Experiments Never Fail

Kotlin の Channel と Rx の Observable/Subject の対比

これ の通り、Rx には本来の目的のみを遂行してもらいたいので考えみました。

まずはこれを読もう #

下記に書いてある以上の説明はないです(圧倒的感謝っ)

Kotlin の Channel<T>BroadcastChannel<T> #

Channel は「キューのようなもの」と例えられます、BroadcastChannel も同じく。

Channel<T> は、送信者と受信者が 1:1 で、
BroadcastChannel<T> は、送信者と受信者が 1:n です。 Broadcast と言われるように。

両方使ってみます。

// Channel の例
val c = ArrayChannel<String>(2) // capacity=2 は具体的に何に効くのか不明…
launch(CommonPool) {
// delay(2000) // キューに追加されてから受信してもok
c.consumeEach {
Log.d("KT","${it}")
}
}

launch(UI) {
c.send("a")
c.send("b")
c.send("c")
}

// BroadcastChannel の例
launch(UI) {
val bc = ArrayBroadcastChannel<String>(10)
bc.send("A")
bc.send("B")

// 受信者1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
// ここに何か書いても実行されないよん
}

// 受信者2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
// ここに何か書いても実行されないよん
}

delay(2000) // 受信側のために少し待ってから
bc.send("C")
bc.send("D")
}

上記の出力:

KT a
   b
   c

KT-BROADCAST-1  C
KT-BROADCAST-2  C
                D
KT-BROADCAST-1  D

Channelsend されたものを consumeEach { } で受信します。1:1 です。
ここで使っている ArrayChannel は単純なキューなので、send が先、あとから consume でも問題なくすべて受信できます(capacity=2 は??)。

一方 BroadcastChannel は、. openSubscription() を呼ぶことで複数の受信者を持てます。受信者1と2が、それぞれ同じ値を受信できることがわかるでしょう。

ArrayBroadcastChannel では、受信登録前に追加した "A", "B" が受信されていません。
これは BroadcastChannel というよりは ArrayBroadcastChannel の特性で、 受信者が誰も居ない状態で send() された値はそのまま捨てられます。

Channel. send()offer() #

どちらも「キューに要素を追加する」という役割ですが、次の違いがあります。

send() #

offer() #

参考 − Channel - kotlinx-coroutines-core

ArrayBroadcastChannel と ConflatedBroadcastChannel #

BroadcastChannel には ArrayBroadcastChannel の他に ConflatedBroadcastChannel というものがあります。

これは、 最近 send() された値をひとつだけキャッシュしておく BroadcastChannel です、どっかで聞いたことありますね。

launch(UI) {
val bc = ConflatedBroadcastChannel<String>()
bc.send("A")
bc.send("B")

// 受信者1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
// ここに何か書いても実行されないよん
}

// 受信者2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
// ここに何か書いても実行されないよん
}

delay(2000)
bc.send("C")
bc.send("D")

Log.d("KT-BROADCAST-LATEST","${bc.valueOrNull}")
}

上記の出力:

KT-BROADCAST-1  B
KT-BROADCAST-2  B
KT-BROADCAST-1  C
KT-BROADCAST-2  C
                D
KT-BROADCAST-LATEST  D
KT-BROADCAST-1  D

上記のように、受信登録前に send された A, B の内、最後に送信された B が受信できています。
また、最後に送信された値を .valueOrNull プロパティで取り出すことができます。

Rx との対比 #

さてさて、 Channel と BroadcastChannel を次のようにまとめてみました。

次に Rx の Observable/Subject、Hot/Cold について次のようにまとめます(Observable/Flowable/Single/Completable などの違いについては述べません)。

そうすると、Channel と Rx は次のように対比できると考えます。

[^1]: Hot Observable と ConnectableObservable について - Qiita

具体的には、

ことができると思います。おまけですが、

も。

適当にリポジトリクラスを書いてみると、こんな感じかなと。

// 「アドレス帳」 のリポジトリクラスのインターフェース
interface AddressRepository {
// アドレスリスト帳を通知する(1件のAddressの変更通知をするものなら完璧)
val addresses: BroadcastChannel<Array<Address>>

// なんかの条件でアドレス帳を非同期で検索して結果を返す
fun filter(criteria: String): Channel<Array<Address>>

// 特定の名前の人がアドレス帳に存在するかを非同期で調べて居るなら true を返す
suspend fun exists(name: String): Boolean
}

MV* の「つなぎ」に RxJava を使うのをやめたい からは少し前進できたかなと。もっと勉強が必要です。

published at tags: RxJava Kotlin