MV* の「つなぎ」に RxJava を使うのをやめたい

ここ数年、特にモバイルアプリ開発で流行ってるUIデザインパターンならなんでもですが、MVVM を例にすると、Usecase における Repository からの結果の受信、ViewModel における Usecase からの通知、あるいは View の変更の通知に RxJava の Observable<T> を使用する例は多いと思います(かくいう自分もそう作ってきました)。

ここ数年、特にモバイルアプリ開発で流行ってるUIデザインパターンならなんでもですが、MVVM を例にすると、Usecase における Repository からの結果の受信、ViewModel における Usecase からの通知、あるいは View の変更の通知に RxJava の Observable<T> を使用する例は多いと思います(かくいう自分もそう作ってきました)。

DroidKaigi 2018 のアプリもそうですね。

image.png

via DroidKaigi 2018 official Android app

しかし最近、この「つなぎ」の役割に RxJava を使うのはやり過ぎでは?と思うようになっています。その理由を次に書きます。

RxJava を使うのをやめたい理由

1. Rx は、できることが多すぎる

RxJava の学習コストが高いことは知られています。 つなぎの型が Observable<T> であるだけで、多くの機能が使えてしまい、利用者(=開発者)を混乱させるでしょう。

Rx の真髄は豊富な Operator を組合せて、ストリーミングデータをエレガントに扱うことなので、たた単に「pub して sub するだけ」ならオレの出番じゃねぇよ、なんですよね。

2. 依存モジュールは最小にすべきだ

「つなぎ」に Observable<T> を使うことで、つながっている両者は RxJava に依存してしまいます。

Google I/O 2018 で発表された Android App Bundle では、機能をモジュール化して、モジュール毎の(オンデマンドな)動的配信が可能になります。1

モジュール化するにあたり、各モジュールが依存するモジュールはできるだけ最小にすべきで、「つなぎ」の役割のためだけに決して小さくはない RxJava に依存することには疑問があります。

3. Java に対する不安、Pure Kotlin への期待

の通り、今後の "疑似Java" の使用には幾ばくかの不安があります。

一方で、もはや Android アプリ開発の標準言語となった Kotlin は、言語自体は JVM が必須ではありません。例えば Kotlinマルチプラットフォームプロジェクト では、複数プラットフォームで使いまわせる「共有モジュール」は Kotlin の標準APIsのみを使って開発します。

RxJava はその名の通り、Java向けのライブラリであり JVM に依存します。

不安のあるJava成分を削除し、来るべきマルチプラットフォーム時代のためにモジュールを共通化しようと考えたら、JVM に依存している RxJava は足枷になるはずです(Pure Kotlin で書かれた Rx-Kotlin2 には期待したいですがそれは別な話で)。

とはいえ一番マッチしたのが RxJava だったんです

Java や Android の API にいわゆる Promise/Future や、ストリームを扱う共通インターフェースがなかったところに、「Rx(RxJava)なんか便利だぞ」って流行りだして、そのまま便利に使われちゃってる、というのが現状だと思います(自分も Android で Rx を使い始めたきっかけは Promise/Future の代わりでした)。

それはそれでベストな選択だったわけで何も間違ってはいないです。 ただ今はもっとベストな選択があるんじゃないか?と。

RxJava を Kotlin Coroutines に置き換えたら良いんじゃないかな?

前述の通り Kotlin はもはや Android アプリ開発では必須ですし、もしかしたら私の知っている以上にサーバーサイドでも使われているのかも知れません。

Kotlin には Coroutines という、非同期に特化したAPIセットがあります。まだ experimental(実験段階)だけど、プロダクトにぶっこんでる人も多いのではないかと。

そしてこの記事

では、RxJava と Coroutines の対比が解説されているではありませんか。

代表的なものにしぼって Rx → Coroutine の対応を挙げると、

  • 1件だけ値を受信する Single<T>Continuation<T>
  • 完了したかのみを受信する CompletableContinuation<Unit>
  • 複数の値を通知する Subject<T>SendChannel<T>
  • 複数の値を受信する Observable<T>/Flowable<T>ReceiveChannel<T>
  • 処理を実行する疑似スレッド SchedulerCoroutineContext

となります。 このように置き換えれば RxJava への依存は切ることができそうです(ただし現在の Coroutines は JVM に依存してるみたいなので共通モジュールでは使えなさそう:cry:)。

RxJava は局所的に、本当に必要な場所だけで使おう

RxJava が完全に不要になるかといえばそうでもなく、そのオペレータはやっぱり便利です。

例えば、

とか。

これら「RxJava でしかできない機能」が必要なら使うべきで、ただし「局所的に」するのがよいと思います。 DDD よく知らないけどカッコつけて言うなら「Cohesive Mechanisms(凝集されたメカニズム)パターン」でしょうか、Rx は What じゃなくて How の領域なのでそこだけ分離する、と。

幸い、

を使うと、 RxJava2 と Kotlin Coroutines の相互変更ができるようです。つなぎは Continuation<T>ReceiveChannel<T> を使い、必要な箇所で Single<T>Flowable<T> に変換して使えばよさそうです。

実際にやってみた

冒頭の DroidKaigi 2018 のアプリから RxJava を追い出して、代わりに Kotlin Coroutine を使ってみました。はじめは「DroidKaigiApp から Rx 全部抜く!」の意気込みで取り掛かりましたが、意外と RxJava にガッツリ依存していたのであきらめ :sweat_smile: 、一つの画面だけやってみました。

イメージ的にはこんな感じです。

image.png

やってみたのはスタッフ一覧、NavDrawer → Staff で出てくる画面です。

image.png

この画面は StaffViewModelStaffDataRepository を使ってスタッフ一覧データを読み、それを画面に表示させています。

StaffDataRepository から RxJava を追い出す

こちらの修正前のソースが以下です。見やすさ向上のため関係のない一部のコードは省いています。

class StaffDataRepository @Inject constructor(
        private val context: Context,
        private val schedulerProvider: SchedulerProvider
) : StaffRepository {
    
    override fun loadStaff(): Completable = getStaff()
            .subscribeOn(schedulerProvider.io())
            .toCompletable()

    override val staff: Flowable<List<Staff>>
        get() = getStaff().toFlowable().subscribeOn(schedulerProvider.io())

    private fun getStaff(): Single<List<Staff>> {
        return Single.create { emitter ->
            try {
                val asset = LocalJsonParser.loadJsonFromAsset(context, "staff.json")
                emitter.onSuccess(StaffJsonMapper.mapToStaffList(asset))
            } catch (e: Exception) {
                Timber.e(e)
                emitter.onError(e)
            }
        }
    }
}

staff: Flowable<List<Staff>> が、読み出したスタッフリストを外部へ通知する Observable ですね。 そして loadStaff() が、読み出しを非同期で実行するメソッドです(これ自体も戻り値が Completable になっていますが、あまり関係ないので省略します)。

ところで上記のコード、初見でも違和感があって、ちゃんと調べたら問題を2つほど見つけたのですが、どこかわかるでしょうか? DroidKaigiApp の repo にプルリクを出してマージしてもらったので、気になる方はそちらを見てみてください。

さてここから RxJava をやめて代わりに Coroutine を使ってみたのが次です。

class StaffDataRepository @Inject constructor(
        private val context: Context,
        private val schedulerProvider: SchedulerProvider
) : StaffRepository {

    private val sender = ConflatedBroadcastChannel<List<Staff>>();
    override val staff: ReceiveChannel<List<Staff>> = sender.openSubscription()

    override fun loadStaff() {
        launch(CommonPool) {
            try {
                val asset = LocalJsonParser.loadJsonFromAsset(
                        this@StaffDataRepository.context, "staff.json")
                sender.offer(StaffJsonMapper.mapToStaffList(asset))
                yield()
            } catch (e: Exception) {
                Timber.e(e)
                sender.close(e)
            }
        }
    }
}

まず staff プロパティが ReceiveChannel<List<Staff>> になりました。 そしてそれは ConflatedBroadcastChannel である senderopenSubscription() して得ています。 sender : ConflatedBroadcastChannel は値を送信する側、staff: ReceiveChannel は値を受信するためだけのインターフェースです。これは Rx の Subject<T>Observable<T> に似ていますね。

loadStaff() は少し簡略化しました。 launch(CommonPool) { } で非同期処理を開始し、Json を読み出した後、sender.offer() でそれを送信します。 注目なのは、その次に yield() を呼び出している点で、これにより非同期処理を開始したコルーチンに処理を戻します。yield() を忘れると通知が受信できないので要注意です。

StaffViewModel から RxJava を追い出す

次は ViewModel ですね。StaffDataRepository からスタッフリストを受信してそれを LiveData<T> に変換します。LiveData はリストとデータバインドされているので一覧に表示される仕組みです。

class StaffViewModel @Inject constructor(
        private val repository: StaffRepository,
        private val schedulerProvider: SchedulerProvider
) : ViewModel(), LifecycleObserver {

    private val compositeDisposable: CompositeDisposable = CompositeDisposable()

    val staff: LiveData<Result<List<Staff>>> by lazy {
        repository.staff
                .toResult(schedulerProvider)
                .toLiveData()
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun onCreate() {
        repository.loadStaff()
                .subscribeBy(onError = defaultErrorHandler())
                .addTo(compositeDisposable)
    }

    override fun onCleared() {
        super.onCleared()
        compositeDisposable.clear()
    }
}

staff: LiveData<> の宣言で StaffRepository.staff : Flowable<> を LiveData に変換しています、宣言だけで完結する、いいコードですね。

あとは画面の表示時である onCreateStaffRepository.loadStaff() を呼び出します。

で、こちらも RxJava をやめて代わりに Coroutine を使ってみたのが次です。

class StaffViewModel @Inject constructor(
        private val repository: StaffRepository,
        private val schedulerProvider: SchedulerProvider
) : ViewModel(), LifecycleObserver {

    private val compositeDisposable: CompositeDisposable = CompositeDisposable()

    val staff: LiveData<Result<List<Staff>>> by lazy {
        val liveData = MutableLiveData<Result<List<Staff>>>()

        launch(Unconfined) {
            liveData.postValue(Result.inProgress())
            repository.staff.consumeEach {
                liveData.postValue(Result.success(it))
            }
        }
        liveData
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun onCreate() {
        repository.loadStaff()
    }

    override fun onCleared() {
        super.onCleared()
        compositeDisposable.clear()
    }
}

staff: LiveData<> の定義が少し長くなってしまいましたが、これは .toResult().toLiveData() の拡張関数に頼れなかったためです。

launch(Unconfined) { } で、呼び出し元と同じスレッドで処理を開始し、repository.staff.consumeEach { } で値を受信しつづけます。値を受信したら liveData.postValue(it) で値をViewに通知します。

この処理は ReceiveChannel<T>.toLiveData() な拡張関数が欲しいですね。

さて、これらの修正で StaffFragment - StaffViewModel - StaffDataRepository のラインでは RxJava を使わず Kotlin の Coroutine で完結させることができました。

修正前後の完全な差分は、

を見てください。

  • Channel を使うために kotlinx-coroutines-core を追加
  • ConflatedBroadcastChannel でデータを送信するとき、エラーが発生したら .close(throwable) を呼ぶ
  • StaffDataRepository クラスは StaffRepository インターフェースの実装なので、StaffRepository も修正

などをしています。

疑問

これでよいんだろか?と思う点、いくつもあります。

sender.offer() の後、 yield() を呼ぶしかないのか

通知を受信するために yield() が必須!とは言ったものの、絶対忘れそう…。 あと「スレッドを呼び出し元に戻す」ことで受信が可能になるという仕組みもなんだかハマりそうな予感。

SendChannel.openSubscription() したら、誰が・いつ Close するの?

SendChannel.openSubscription() で得られる ReceiverChannel には close メソッドがあります。 StaffDataRepository で Open したんだから、StaffDataRepository で Close すべき? なら StaffDataRepository は Disposable であるべき?

CoroutineContext の扱い

launch(xxx) { } の xxx に与える CoroutineContext、Rx では Scheduler にあたるわけですが、これはアプリ全体で統一感を持たせて管理すべきでしょう。修正前のコードでは SchedulerProviderui/computation/io などが用意されていました。同じようにアプリ UI/計算実行用/IO処理 など個別に CoroutineContext を用意して、SchedulerProvider に持たせるとよいのかな、と思います。 すべて CommonPool に頼るとどこかで衝突・デッドロックが発生しそうです。

おわり

とりあえずこんな感じで、次に Android アプリをスクラッチで開発するときには、つなぎに RxJava を使わない方向でやってみようかなーと思っています。

識者のコメント、お待ちしております。 :bow_tone3:


  1. @red_fat_daruma さんによると、App Bundle はマルチモジュールとは違う性質を持つようです、ちょっとこの理由にはそぐわないのかもしれません。
  2. @shiraj_i さんのいわれる通り、 RxKotlin というライブラリは既にあって、それは単なるRxJavaのKotlin向け拡張です。