これは RxJava Advent Calendar 2015 17日目 の記事です。

空いてたので参加してみました。
普段は Xamarin(C#) + Reactive Extensions + ReactiveProperty で、Reactive + MVVM な Android/iOS両対応アプリを開発しています。

Cold vs Hot

Cold Observable は「あなただけの」Stream、Hot は「みんなの」Stream 。
(私的にはニコ動かニコ生か、みたいに理解してますが、その話はいいや)

Cold は、あなたが subscribe した瞬間からデータが流れ始めます。
Hot は、あなたが subscribe してもデータは流れ始めません(流れるかも知れません?)。

では Hot Observable はいつからデータが流れ始める?Observable が生成された瞬間から?
その答え(の一つ)が ConnectableObservable

ConnectableObservable のデータ放流の開始と停止

Cold Observable を Hot化する publish メソッドの返り値は ConnectableObservable。
Hot は必ず ConnectableObservable。(←これ後で否定します)

ConnectableObservable には connect メソッドがあります。
Hot Observable のデータが流れ始めるのは、このメソッドを呼んだ瞬間から。
なので、どれだけ subscriber が居ようとも connect を呼ばなければデータは流れません。逆に subscriber が居なくても connect を呼べばデータが流れ始めます。

connect メソッドの返り値は Subscription です。
Subscription の unsubscribe メソッドを呼ぶと、データの放流が停止します。これも subscriber が居ようが居まいが停止します。
再度 connect すると、 最初から データが流れ出します。再開ではありません。

実例

Cold Observable

Observable.interval は、一定時間置きにインクリメントされた値を流す Cold Observable。
なので、複数の subscriber が居たら、各々に独立した値を流します。

Android の画面にボタンが2つ(buttonSubscribe1 と buttonSubscribe2)並んでるだけのサンプルです。

final Observable<Long> tickObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);

// 可視性向上の為のなんちゃってラムダ
findViewById(R.id.buttonSubscribe1).setOnClickListener(v -> { 
    Log.d(TAG, "buttonSubscribe1 click!");
    tickObservable.subscribe(x -> Log.d(TAG, "subscriber1 - onNext - " + x));
});

findViewById(R.id.buttonSubscribe2).setOnClickListener(v -> {
    Log.d(TAG, "buttonSubscribe2 click!");
    tickObservable.subscribe(x -> Log.d(TAG, "subscriber2 - onNext - " + x));
});

結果:
D/MainActivity: buttonSubscribe1 click!
D/MainActivity: subscriber1 - onNext - 0
D/MainActivity: subscriber1 - onNext - 1
D/MainActivity: subscriber1 - onNext - 2
D/MainActivity: subscriber1 - onNext - 3
D/MainActivity: subscriber1 - onNext - 4
D/MainActivity: subscriber1 - onNext - 5
D/MainActivity: buttonSubscribe2 click!
D/MainActivity: subscriber1 - onNext - 6
D/MainActivity: subscriber2 - onNext - 0
D/MainActivity: subscriber1 - onNext - 7
D/MainActivity: subscriber2 - onNext - 1
D/MainActivity: subscriber1 - onNext - 8
D/MainActivity: subscriber2 - onNext - 2

buttonSubscribe1 を押すとデータ(0から連番)が流れ始めます。
しばらくして buttonSubscribe2 を押すと、1 とは関係なく、また 0 から流れ始めます。

Hot(Connectable) Observable

publish で Hot 化します。
connect と unsubscribe を呼ぶためのボタン(buttonConnect, buttonDisConnect)を画面に追加してます。

private Subscription _connection; // field です


final ConnectableObservable<Long> tickObservable = 
    Observable.interval(1000, TimeUnit.MILLISECONDS).publish(); // publish で Hot化

findViewById(R.id.buttonSubscribe1).setOnClickListener(v -> { 
    Log.d(TAG, "buttonSubscribe1 click!");
    tickObservable.subscribe(x -> Log.d(TAG, "subscriber1 - onNext - " + x));
});

findViewById(R.id.buttonSubscribe2).setOnClickListener(v -> {
    Log.d(TAG, "buttonSubscribe2 click!");
    tickObservable.subscribe(x -> Log.d(TAG, "subscriber2 - onNext - " + x));
});

findViewById(R.id.buttonConnect).setOnClickListener(v -> {
    Log.d(TAG, "buttonConnect click!");
    _connection = tickObservable.connect(); // データ放流開始
});

findViewById(R.id.buttonDisConnect).setOnClickListener(v -> {
    Log.d(TAG, "buttonDisConnect click!");
    if (_connection != null) {
        _connection.unsubscribe(); // データ放流停止
        _connection = null;
    }
});

結果:
D/MainActivity: buttonSubscribe1 click!
D/MainActivity: buttonConnect click! // ←数秒経過している
D/MainActivity: subscriber1 - onNext - 0
D/MainActivity: subscriber1 - onNext - 1
D/MainActivity: subscriber1 - onNext - 2
D/MainActivity: subscriber1 - onNext - 3
D/MainActivity: buttonSubscribe2 click!
D/MainActivity: subscriber1 - onNext - 4
D/MainActivity: subscriber2 - onNext - 4
D/MainActivity: subscriber1 - onNext - 5
D/MainActivity: subscriber2 - onNext - 5
D/MainActivity: subscriber1 - onNext - 6
D/MainActivity: subscriber2 - onNext - 6
D/MainActivity: buttonDisConnect click!
-これ以降 onNext は出力されない-

buttonSubscribe1 を押しても、まだデータは流れてきません。
数秒後、buttonConnect を押すとデータが流れ始めます。
buttonSubscribe2 を押すと、subscriber2 が増えますが、Hot(みんなの)Observable なので、流れてくる値とタイミングは subscriber1 と全く同じです。

buttonDisConnect を押すと、データの放流が停止されます。(ちなみにもう一度 CONNECT すると、また 0 から値が流れます)
subscriber1, subscriber2 にはもう onNext は呼ばれません。

※サンプルでは onNext しか受信していませんが、 buttonDisConnect を押しても、 subscriber1, subscriber2 の onComplete や onError も呼ばれません。つまり、 「データの放流が停止されても、 subscriber はそれに気付けない」 ということになります。これはこれでいいんだろか、という感じです。

ConnectableObservable.refCount について

Hot は必ず ConnectableObservable。(←これ後で否定します)

否定始めます。

ConnectableObservable では、データ放流の開始と停止は、 connect と unsubscribe に委ねられていました。

refCount() を使うとそれを自動化できます。(refCount?参照カウントを返すメソッド?そう思っていましたが全然違いました。)
どういうことかと言うと、最初の subscriber が現れたらデータ放流を開始し、誰も subscriber が居なくなったら放流を停止する、というものです。
refCount() の返値はただの Observable です、でも Hot です。はい否定しましたー。

実例

publish した Hot Observable を refCount してデータ放流を自動制御してもらいます。
画面には、 buttonConnect, buttonDisConnect に代わり、buttonUnsubscribe1, buttonUnsubscribe2 を用意します。

private Subscription _subscription1; // field です
private Subscription _subscription2; // field です
private Subscription _connection;    // field です


final Observable<Long> tickObservable = 
    Observable.interval(1000, TimeUnit.MILLISECONDS).publish().refCount(); // 返値は Connectable ではない

findViewById(R.id.buttonSubscribe1).setOnClickListener(v -> { 
    Log.d(TAG, "buttonSubscribe1 click!");
    _subscription1 = tickObservable.subscribe(x -> Log.d(TAG, "subscriber1 - onNext - " + x));
});

findViewById(R.id.buttonSubscribe2).setOnClickListener(v -> {
    Log.d(TAG, "buttonSubscribe2 click!");
    _subscription2 = tickObservable.subscribe(x -> Log.d(TAG, "subscriber2 - onNext - " + x));
});

findViewById(R.id.buttonUnsubscribe1).setOnClickListener(v -> {
    Log.d(TAG, "buttonUnsubscribe1 click!");
    if (_subscription1 != null) {
        _subscription1.unsubscribe(); // 1購読終了
        _subscription1 = null;
    }
});

findViewById(R.id.buttonUnsubscribe2).setOnClickListener(v -> {
    Log.d(TAG, "buttonUnsubscribe2 click!");
    if (_subscription2 != null) {
        _subscription2.unsubscribe(); // 2購読終了
        _subscription2 = null;
    }
});

結果:
D/MainActivity: buttonSubscribe1 click!
D/MainActivity: subscriber1 - onNext - 0
D/MainActivity: subscriber1 - onNext - 1
D/MainActivity: subscriber1 - onNext - 2
D/MainActivity: buttonSubscribe2 click!
D/MainActivity: subscriber1 - onNext - 3
D/MainActivity: subscriber2 - onNext - 3
D/MainActivity: subscriber1 - onNext - 4
D/MainActivity: subscriber2 - onNext - 4
D/MainActivity: subscriber1 - onNext - 5
D/MainActivity: subscriber2 - onNext - 5
D/MainActivity: subscriber1 - onNext - 6
D/MainActivity: subscriber2 - onNext - 6
D/MainActivity: subscriber1 - onNext - 7
D/MainActivity: subscriber2 - onNext - 7
D/MainActivity: buttonUnsubscribe1 click!
D/MainActivity: subscriber2 - onNext - 8
D/MainActivity: subscriber2 - onNext - 9
D/MainActivity: subscriber2 - onNext - 10
D/MainActivity: subscriber2 - onNext - 11
D/MainActivity: buttonUnsubscribe2 click!
-これ以降 onNext は出力されない-

buttonSubscribe1 を押すと、その時点でデータが流れ始めます(refCount による自動制御)。
buttonSubscribe2 を押すと、subscriber1 と同じタイミングで、同じ値を受信できます(Hot だから)。
buttonUnsubscribe1 を押すと、 subscriber1 は購読をやめますが、subscriber2 はまだ受信しています。
buttonUnsubscribe2 を押すと、subscriber2 も購読をやめ、この時点でデータ放流が停止します(refCount による自動制御)。

※ほんとにデータ放流終わってんの?を確認するには、 tickObservable に doOnNext を繋げて確認するとよいと思います。

まとめ

Hot Observable は、ほとんどの場合(publish により生成されるので) ConnectableObservable。
ConnectableObservable は、購読者の有無に関係なく connect でデータ放流開始、Subscription.unsubscribe でデータ放流停止。
refCount により購読者の有無に連動したデータ放流の自動制御が可能。この場合 Hot だけど普通の Observable型。

実際に Hot Observable を使う場合は、refCount() しとくのが無難かなー、と思いました。(購読者の unsubscribe を厳密に管理しておけば、という前提で)

参考

RxJava
Android
Java
ReactiveX

published

Ads

Read more!

amay077

Microsoft MVP(Xamarin). フルリモートワーカー. Geospatial Mobile app developer. Love C#.

amay077 amay077