これ の通り、Rx には本来の目的のみを遂行してもらいたいので考えみました。
下記に書いてある以上の説明はないです(圧倒的感謝っ)
Channel<T>
と BroadcastChannel<T>
Channel
は「キューのようなもの」と例えられます、BroadcastChannel
も同じく。
Channel<T>
は、送信者と受信者が 1:1 で、
BroadcastChannel<T>
は、送信者と受信者が 1:n です。 Broadcast と言われるように。
両方使ってみます。
// Channel の例
val c = ArrayChannel<String>(2) // capacity=2 は具体的に何に効くのか不明…
launch(CommonPool) {
// delay(2000) // キューに追加されてから受信してもok
c.consumeEach {
Log.d("KT","${it}")
}
}
launch(UI) {
c.send("a")
c.send("b")
c.send("c")
}
// BroadcastChannel の例
launch(UI) {
val bc = ArrayBroadcastChannel<String>(10)
bc.send("A")
bc.send("B")
// 受信者1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
// ここに何か書いても実行されないよん
}
// 受信者2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
// ここに何か書いても実行されないよん
}
delay(2000) // 受信側のために少し待ってから
bc.send("C")
bc.send("D")
}
上記の出力:
KT a
b
c
KT-BROADCAST-1 C
KT-BROADCAST-2 C
D
KT-BROADCAST-1 D
Channel
は send
されたものを consumeEach { }
で受信します。1:1 です。
ここで使っている ArrayChannel
は単純なキューなので、send が先、あとから consume でも問題なくすべて受信できます(capacity=2
は??)。
一方 BroadcastChannel
は、. openSubscription()
を呼ぶことで複数の受信者を持てます。受信者1と2が、それぞれ同じ値を受信できることがわかるでしょう。
ArrayBroadcastChannel
では、受信登録前に追加した "A", "B"
が受信されていません。
これは BroadcastChannel というよりは ArrayBroadcastChannel
の特性で、 受信者が誰も居ない状態で send()
された値はそのまま捨てられます。
send()
と offer()
どちらも「キューに要素を追加する」という役割ですが、次の違いがあります。
launch(xx) { }
)でしか使えないArrayCannel
では追加できてしまった)false
が返却される)参考 − Channel - kotlinx-coroutines-core
BroadcastChannel には ArrayBroadcastChannel
の他に ConflatedBroadcastChannel
というものがあります。
これは、 最近 send()
された値をひとつだけキャッシュしておく BroadcastChannel です、どっかで聞いたことありますね。
launch(UI) {
val bc = ConflatedBroadcastChannel<String>()
bc.send("A")
bc.send("B")
// 受信者1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
// ここに何か書いても実行されないよん
}
// 受信者2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
// ここに何か書いても実行されないよん
}
delay(2000)
bc.send("C")
bc.send("D")
Log.d("KT-BROADCAST-LATEST","${bc.valueOrNull}")
}
上記の出力:
KT-BROADCAST-1 B
KT-BROADCAST-2 B
KT-BROADCAST-1 C
KT-BROADCAST-2 C
D
KT-BROADCAST-LATEST D
KT-BROADCAST-1 D
上記のように、受信登録前に send
された A, B
の内、最後に送信された B
が受信できています。
また、最後に送信された値を .valueOrNull
プロパティで取り出すことができます。
さてさて、 Channel と BroadcastChannel を次のようにまとめてみました。
次に Rx の Observable/Subject、Hot/Cold について次のようにまとめます(Observable/Flowable/Single/Completable
などの違いについては述べません)。
onNext
された値を subscriber に配信する(subscriber が居なければ無視される)onNext
された最新の値をキャッシュして新しい subscriber に配信するそうすると、Channel と Rx は次のように対比できると考えます。
具体的には、
Observable<T>
を使っている → Channel<T>
に替えるObservable<T>
(実体は Subject<T>
) を用意している → BroadcastChannel<T>
に替えることができると思います。おまけですが、
Single<T>
や Completable
を使っている → Continuation<T>
に替えるも。
適当にリポジトリクラスを書いてみると、こんな感じかなと。
// 「アドレス帳」 のリポジトリクラスのインターフェース
interface AddressRepository {
// アドレスリスト帳を通知する(1件のAddressの変更通知をするものなら完璧)
val addresses: BroadcastChannel<Array<Address>>
// なんかの条件でアドレス帳を非同期で検索して結果を返す
fun filter(criteria: String): Channel<Array<Address>>
// 特定の名前の人がアドレス帳に存在するかを非同期で調べて居るなら true を返す
suspend fun exists(name: String): Boolean
}
MV* の「つなぎ」に RxJava を使うのをやめたい からは少し前進できたかなと。もっと勉強が必要です。