rx.Observable<T>
のオペレータは、通常は非同期で、並列に処理されます。
例えば以下のような場合:
public void start() {
Observable.range(1, 5)
.flatMap(x -> fatTask(x))
.subscribe(x -> Log.d(TAG, "onNext - " + x));
}
private final Random rand = new Random();
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
// ランダムにスリープした後 x を onNext する
private Observable<Integer> fatTask(final int x) {
return Observable.create(subscriber -> {
long sleep = (long) (rand.nextDouble() * 10000L);
Log.d(TAG, "fatTask(" + x + ") - start.");
executor.schedule(() -> {
subscriber.onNext(x);
subscriber.onCompleted();
}, sleep, TimeUnit.MILLISECONDS);
});
}
このプログラムの出力はこうなります。
出力: fatTask(1) - start.
fatTask(2) - start. fatTask(3) - start. fatTask(4) - start. fatTask(5) - start. onNext - 3 onNext - 5 onNext - 4 onNext - 2 onNext - 1
fatTask は 1,2,3,4,5 の順で 完了を待たずに 呼びだされます。
が、それぞれ処理にかかる時間が異なるので、 onNext
が呼ばれる順は 1〜 とは限りません。
ソースとなる Stream の順番を崩したくない場合は、 fatTask(1)
が完了してから fatTask(2)
を開始する、というように直列化しなければなりません。
これを行うのが Observable.Concat
です(RxJava では Observable.concatWith
のようですね)。
複数の Observable
を順に(完了してから次へ)処理していきます。
toList
で一旦ただの List
にしてから、concatWith
で数珠つなぎにします。
public void start() {
Observable.range(1, 5)
.toList()
.flatMap(list -> {
// fatTask(1).contat(fatTask(2)).contat(fatTask(3))...
// にする(fold 使えれば…)
Observable<Integer> task = null;
for (int x : list) {
if (task == null) {
task = fatTask(x);
} else {
task = task.concatWith(fatTask(x));
}
}
return task;
})
.subscribe(x -> Log.d(TAG, "onNext - " + x));
}
このプログラムの出力はこうなります。
出力
fatTask(1) - start. onNext - 1 fatTask(2) - start. onNext - 2 fatTask(3) - start. onNext - 3 fatTask(4) - start. onNext - 4 fatTask(5) - start. onNext - 5
fatTask(1)
の完了を待ってから、次の fatTask(2)
が実行されています。
※ Rx.NET では、
static IObservable<T> Concat<T>(IEnumerable<IObservable<T>> sources)
で、複数の IObservable
を一括で渡せるのですが、 RxJava にはないようで、、、。
static <T> Observable<T> concatEager(Iterable<? extends Observable<? extends T>> sources)
というのがあったんですが、期待通りうごいてくれず、 Eager? なんでしょう?
toList
で一旦ただの List にしているのが非常に気に入らないですね。
range(1, 5)
が interval(1, TimeUnit.SECONDS)
のように無限の Stream だったら使えません。
そこで、 concat
には、こんな overload もあります。
static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables)
Observable
public void start() {
// 2. を concat する
Observable.concat(
// 1. Observable<Long>
Observable.interval(1, TimeUnit.SECONDS)
// 2. Long を Observable<Integer> に変換
// → Observable<Observable<Integer>> になる
.map(x -> fatTask(x.intValue())))
.subscribe(x -> Log.d(TAG, "onNext - " + x));
}
このプログラムの出力はこうなります。
出力
fatTask(0) - start. onNext - 0 fatTask(1) - start. onNext - 1 fatTask(2) - start. onNext - 2 …つづく
無限リストながら、並列処理せずに順序通り動いてくれます。
interval
の値を単純に map
で Observable<Integer>
に変換してやります。するとこれは Observable<Observable<Integer>>
になり、concat
可能になります。 flatMap
だと平坦化されちゃうのでただの map
です。
Observable は普通は非同期で並列処理。
非同期ながら直列化したい場合は Observable.concat
でできます。
みたいな処理をするとき 3. を 1. の順序と同じにしたいのでこれを使います。
はじめ自分は flatMap
で繋いでいくだけですべて直列化されているのかなーと勘違いしていたので、これを知った時は目からウロコでした。