これは RxJava Advent Calendar 2015 17日目 の記事です。
空いてたので参加してみました。 普段は Xamarin(C#) + Reactive Extensions + ReactiveProperty で、Reactive + MVVM な Android/iOS両対応アプリを開発しています。
Cold Observable は「あなただけの」Stream、Hot は「みんなの」Stream 。 (私的にはニコ動かニコ生か、みたいに理解してますが、その話はいいや)
Cold は、あなたが subscribe した瞬間からデータが流れ始めます。 Hot は、あなたが subscribe してもデータは流れ始めません(流れるかも知れません?)。
では Hot Observable はいつからデータが流れ始める?Observable が生成された瞬間から? その答え(の一つ)が ConnectableObservable 。
Cold Observable を Hot化する publish メソッドの返り値は ConnectableObservable。 Hot は必ず ConnectableObservable。(←これ後で否定します)
ConnectableObservable には connect メソッドがあります。 Hot Observable のデータが流れ始めるのは、このメソッドを呼んだ瞬間から。 なので、どれだけ subscriber が居ようとも connect を呼ばなければデータは流れません。逆に subscriber が居なくても connect を呼べばデータが流れ始めます。
connect メソッドの返り値は Subscription です。 Subscription の unsubscribe メソッドを呼ぶと、データの放流が停止します。これも subscriber が居ようが居まいが停止します。 再度 connect すると、 最初から データが流れ出します。再開ではありません。
Observable.interval は、一定時間置きにインクリメントされた値を流す Cold Observable。 なので、複数の subscriber が居たら、各々に独立した値を流します。
Android の画面にボタンが2つ(buttonSubscribe1 と buttonSubscribe2)並んでるだけのサンプルです。
final Observable<Long> tickObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
// 可視性向上の為のなんちゃってラムダ
findViewById(R.id.buttonSubscribe1).setOnClickListener(v -> {
Log.d(TAG, "buttonSubscribe1 click!");
tickObservable.subscribe(x -> Log.d(TAG, "subscriber1 - onNext - " + x));
});
findViewById(R.id.buttonSubscribe2).setOnClickListener(v -> {
Log.d(TAG, "buttonSubscribe2 click!");
tickObservable.subscribe(x -> Log.d(TAG, "subscriber2 - onNext - " + x));
});
結果:
D/MainActivity: buttonSubscribe1 click! D/MainActivity: subscriber1 - onNext - 0 D/MainActivity: subscriber1 - onNext - 1 D/MainActivity: subscriber1 - onNext - 2 D/MainActivity: subscriber1 - onNext - 3 D/MainActivity: subscriber1 - onNext - 4 D/MainActivity: subscriber1 - onNext - 5 D/MainActivity: buttonSubscribe2 click! D/MainActivity: subscriber1 - onNext - 6 D/MainActivity: subscriber2 - onNext - 0 D/MainActivity: subscriber1 - onNext - 7 D/MainActivity: subscriber2 - onNext - 1 D/MainActivity: subscriber1 - onNext - 8 D/MainActivity: subscriber2 - onNext - 2
buttonSubscribe1 を押すとデータ(0から連番)が流れ始めます。 しばらくして buttonSubscribe2 を押すと、1 とは関係なく、また 0 から流れ始めます。
publish で Hot 化します。 connect と unsubscribe を呼ぶためのボタン(buttonConnect, buttonDisConnect)を画面に追加してます。
private Subscription _connection; // field です
final ConnectableObservable<Long> tickObservable =
Observable.interval(1000, TimeUnit.MILLISECONDS).publish(); // publish で Hot化
findViewById(R.id.buttonSubscribe1).setOnClickListener(v -> {
Log.d(TAG, "buttonSubscribe1 click!");
tickObservable.subscribe(x -> Log.d(TAG, "subscriber1 - onNext - " + x));
});
findViewById(R.id.buttonSubscribe2).setOnClickListener(v -> {
Log.d(TAG, "buttonSubscribe2 click!");
tickObservable.subscribe(x -> Log.d(TAG, "subscriber2 - onNext - " + x));
});
findViewById(R.id.buttonConnect).setOnClickListener(v -> {
Log.d(TAG, "buttonConnect click!");
_connection = tickObservable.connect(); // データ放流開始
});
findViewById(R.id.buttonDisConnect).setOnClickListener(v -> {
Log.d(TAG, "buttonDisConnect click!");
if (_connection != null) {
_connection.unsubscribe(); // データ放流停止
_connection = null;
}
});
結果:
D/MainActivity: buttonSubscribe1 click! D/MainActivity: buttonConnect click! // ←数秒経過している D/MainActivity: subscriber1 - onNext - 0 D/MainActivity: subscriber1 - onNext - 1 D/MainActivity: subscriber1 - onNext - 2 D/MainActivity: subscriber1 - onNext - 3 D/MainActivity: buttonSubscribe2 click! D/MainActivity: subscriber1 - onNext - 4 D/MainActivity: subscriber2 - onNext - 4 D/MainActivity: subscriber1 - onNext - 5 D/MainActivity: subscriber2 - onNext - 5 D/MainActivity: subscriber1 - onNext - 6 D/MainActivity: subscriber2 - onNext - 6 D/MainActivity: buttonDisConnect click! -これ以降 onNext は出力されない-
buttonSubscribe1 を押しても、まだデータは流れてきません。 数秒後、buttonConnect を押すとデータが流れ始めます。 buttonSubscribe2 を押すと、subscriber2 が増えますが、Hot(みんなの)Observable なので、流れてくる値とタイミングは subscriber1 と全く同じです。
buttonDisConnect を押すと、データの放流が停止されます。(ちなみにもう一度 CONNECT すると、また 0 から値が流れます) subscriber1, subscriber2 にはもう onNext は呼ばれません。
※サンプルでは onNext しか受信していませんが、 buttonDisConnect を押しても、 subscriber1, subscriber2 の onComplete や onError も呼ばれません。つまり、 「データの放流が停止されても、 subscriber はそれに気付けない」 ということになります。これはこれでいいんだろか、という感じです。
Hot は必ず ConnectableObservable。(←これ後で否定します)
否定始めます。
ConnectableObservable では、データ放流の開始と停止は、 connect と unsubscribe に委ねられていました。
refCount() を使うとそれを自動化できます。(refCount?参照カウントを返すメソッド?そう思っていましたが全然違いました。) どういうことかと言うと、最初の subscriber が現れたらデータ放流を開始し、誰も subscriber が居なくなったら放流を停止する、というものです。 refCount() の返値はただの Observable です、でも Hot です。はい否定しましたー。
publish した Hot Observable を refCount してデータ放流を自動制御してもらいます。 画面には、 buttonConnect, buttonDisConnect に代わり、buttonUnsubscribe1, buttonUnsubscribe2 を用意します。
private Subscription _subscription1; // field です
private Subscription _subscription2; // field です
private Subscription _connection; // field です
final Observable<Long> tickObservable =
Observable.interval(1000, TimeUnit.MILLISECONDS).publish().refCount(); // 返値は Connectable ではない
findViewById(R.id.buttonSubscribe1).setOnClickListener(v -> {
Log.d(TAG, "buttonSubscribe1 click!");
_subscription1 = tickObservable.subscribe(x -> Log.d(TAG, "subscriber1 - onNext - " + x));
});
findViewById(R.id.buttonSubscribe2).setOnClickListener(v -> {
Log.d(TAG, "buttonSubscribe2 click!");
_subscription2 = tickObservable.subscribe(x -> Log.d(TAG, "subscriber2 - onNext - " + x));
});
findViewById(R.id.buttonUnsubscribe1).setOnClickListener(v -> {
Log.d(TAG, "buttonUnsubscribe1 click!");
if (_subscription1 != null) {
_subscription1.unsubscribe(); // 1購読終了
_subscription1 = null;
}
});
findViewById(R.id.buttonUnsubscribe2).setOnClickListener(v -> {
Log.d(TAG, "buttonUnsubscribe2 click!");
if (_subscription2 != null) {
_subscription2.unsubscribe(); // 2購読終了
_subscription2 = null;
}
});
結果:
D/MainActivity: buttonSubscribe1 click! D/MainActivity: subscriber1 - onNext - 0 D/MainActivity: subscriber1 - onNext - 1 D/MainActivity: subscriber1 - onNext - 2 D/MainActivity: buttonSubscribe2 click! D/MainActivity: subscriber1 - onNext - 3 D/MainActivity: subscriber2 - onNext - 3 D/MainActivity: subscriber1 - onNext - 4 D/MainActivity: subscriber2 - onNext - 4 D/MainActivity: subscriber1 - onNext - 5 D/MainActivity: subscriber2 - onNext - 5 D/MainActivity: subscriber1 - onNext - 6 D/MainActivity: subscriber2 - onNext - 6 D/MainActivity: subscriber1 - onNext - 7 D/MainActivity: subscriber2 - onNext - 7 D/MainActivity: buttonUnsubscribe1 click! D/MainActivity: subscriber2 - onNext - 8 D/MainActivity: subscriber2 - onNext - 9 D/MainActivity: subscriber2 - onNext - 10 D/MainActivity: subscriber2 - onNext - 11 D/MainActivity: buttonUnsubscribe2 click! -これ以降 onNext は出力されない-
buttonSubscribe1 を押すと、その時点でデータが流れ始めます(refCount による自動制御)。 buttonSubscribe2 を押すと、subscriber1 と同じタイミングで、同じ値を受信できます(Hot だから)。 buttonUnsubscribe1 を押すと、 subscriber1 は購読をやめますが、subscriber2 はまだ受信しています。 buttonUnsubscribe2 を押すと、subscriber2 も購読をやめ、この時点でデータ放流が停止します(refCount による自動制御)。
※ほんとにデータ放流終わってんの?を確認するには、 tickObservable に doOnNext を繋げて確認するとよいと思います。
Hot Observable は、ほとんどの場合(publish により生成されるので) ConnectableObservable。 ConnectableObservable は、購読者の有無に関係なく connect でデータ放流開始、Subscription.unsubscribe でデータ放流停止。 refCount により購読者の有無に連動したデータ放流の自動制御が可能。この場合 Hot だけど普通の Observable型。
実際に Hot Observable を使う場合は、refCount() しとくのが無難かなー、と思いました。(購読者の unsubscribe を厳密に管理しておけば、という前提で)