RxJava のスケジューラの中に TrampolineScheduler
というのがあり、なんじゃこれ?とつぶやいたところ、 Rx.NET の CurrentThreadScheduler
と同じっぽい と教えてもらいました。
その流れで、類似の Scheduler である ImmediateScheduler との違いについて語られているトピックを紹介してもらいました。
@amay077 この辺読みとくと良いと思います(tranpolineというキーワードも登場します) https://t.co/A5TzOiobsC
— Atsushi Eno (@atsushieno) 2016, 1月 30
RxJava では CurrentThreadScheduler
を TrampolineScheduler
Hi ImmediateScheduler's Schedule method is pretty straightforward - it simply invokes the action.
In contrast, CurrentThreadScheduler seems more involved - it creates something called a trampoline, which in turn iterates over an action queue, sleeping between invocations of items in the queue and so forth
対照的に、 CurrentThreadScheduler
I've been trying to follow the code with reflector but I'm having a hard time understanding the difference. As far as I can tell, CurrentThreadSchedule's schedule method calls Trampoline 's Run method, which will end up blocking the current thread until the queued action is performed (on the current thread as well) - apparently just like in the case of ImmediateScheduler
私はコードを追ってみましたが、理解するのに苦労しています。分かる範囲では、 CurrentThreadSchedule.Schedule
メソッドは Trampoline.Run
メソッドを呼び出しています。これは現在のスレッドを、キューのアクションが実行されるまで(カレントスレッドも同様に)ブロックしようとします。 - どうも ImmediateScheduler
I realize I'm missing something, so an explanation would be really appreciated Thanks !
EDIT - In the meantime I've found a couple of resources that may shed light on the subject, if anyone's interested:
The trampoline seems to serve three purposes:
やあ、 トランポリンは3つの目的を持っているように見えます。
1- Prevents dead-locks from scheduler reentrancy.
1- スケジューラーの割り込みからデッドロックを防ぎます。
2- Prevents infinite loops in observables that require recursion through scheduler reentrancy.
2- スケジューラーの割り込みを使った再帰が必要な Observable の無限ループを防ぎます。
3- Cooperative single-threaded multitasking; I guess it's similar to the proposed async/await feature in C# 5.0. Calling CurrentThreadScheduler.Schedule is sort of like using await when the currently executing code was also scheduled via CurrentThreadScheduler.
3- シングルスレッドでの「協調的マルチタスキング」; 私は C# 5.0 に提案されている async/await に近いものだと思います。CurrentThreadScheduler.Schedule
の呼び出しは、現在実行中のコードも CurrentThreadScheduler
でスケジュールされていたときに await を使用するようなものです。(訳注: C# の async/await は協調的マルチタスキングではないと思います。これは async/await 登場以前に予想で書かれたものかと。
In the observable world, calling Subscribe should be an asynchronous operation. There's a problem if the scheduling of an observable dead-locks or blocks the current thread indefinitely because it attempts to execute immediately and never completes.
Observable の世界では、Subscribe
の呼び出しは、非同期処理で行わなければなりません。Observable のスケジューリングがデッドロックまたはカレントスレッドを無期限にブロックする場合、すぐに実行しようとしても完了しないので、問題になります。
Ignore the type of scheduler for a moment and consider a scheduled action that eventually, through some sequence of method calls, uses the same scheduler to schedule another action.
With the ImmediateScheduler, the inner action is executed immediately.
では、”内側のアクション” はすぐに実行されます。
- If the outer action acquires some resource on which the inner action depends, and the inner action cannot acquire this resource until it's released by the outer action, then these actions dead-lock.
- If the outer action depends upon the inner action, and the inner action depends upon the outer action, then this could result in an infinite loop that never yields control to other actions.
For example: Observable.Return(1).Repeat().Take(1)
例: Observable.Return(1).Repeat().Take(1)
By default, Return uses the ImmediateScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return. Because there's no trampoline in Return, this pattern repeats itself, blocking the current thread indefinitely. Calling Subscribe on this observable never returns. See this discussion for more information.
は ImmediateScheduler
を使って OnNext(1)
そして OnCompleted()
を呼び出します。 Repeat
はどんな並列性も使用しません、なのですぐに OnCompleted
を検知して、すぐに Return
を再購読します。なぜなら、 Return
にはトランポリンがないので、このパターンは自分自身を繰り返し、無期限に現在のスレッドをブロックし続けます。この Observable を Subscribe
With the CurrentThreadScheduler, the inner action is scheduled (queued) for execution when the outer action ends. Conceptually, inner actions are bounced on the trampoline until the current thread is ready to execute them.
- If the outer action acquires some resource on which the inner action depends, and the inner action cannot acquire this resource until it's released by the outer action, then these actions do not dead-lock because the inner action is not executed until the outer action completes.
- If the outer action recurses when the inner action completes, then there won't be an immediately infinite loop because the inner action does not complete until the outer action completes first.
For example: Observable.Return(1, Scheduler.CurrentThread).Repeat().Take(1)
例: Observable.Return(1, Scheduler.CurrentThread).Repeat().Take(1)
Here, Return is using the CurrentTheadScheduler to call OnNext(1) then OnCompleted(). Repeat does not introduce any concurrency, so it sees OnCompleted immediately and then immediately resubscribes to Return; however, this second subscription to Return schedules its (inner) actions on the trampoline because it's still executing on the OnCompleted callback from the first scheduled (outer) action, thus the repetition does not occur immediately. This allows Repeat to return a disposable to Take, which eventually calls OnCompleted, cancels the repetition by disposing Repeat, and ultimately the call from Subscribe returns.
ここでは、 Return
は CurrentTheadScheduler
を使って OnNext(1)
そして OnCompleted()
を呼び出します。 Repeat
はどんな並列性も使用しません、なのですぐに OnCompleted
を検知して、すぐに Return
を再購読します。しかし、この2回目の Return
の購読(内側のアクション)はトランポリンの上にあります、なぜなら、最初にスケジュールされたアクション(外側のアクション)の OnCompleted
は Take
に disposable(subscription) を返すことができます、それはやがて OnCompleted
の返値から Repeat
Keep in mind that the examples with Return and Repeat do not introduce any concurrency. When you call Subscribe, it will not return until the observable completes regardless of which of these schedulers you choose. With the ImmediateScheduler, Take calls OnCompleted but it cannot cancel the repetition, so Subscribe blocks indefinitely. Alternatively, the CurrentThreadScheduler allows for cooperative single-threaded multitasking between the Return and Repeat operators, thus allowing Take to cancel the repetition without having to introduce any concurrency.
覚えておいて欲しいのは、 Return
→ Repeat
を呼び出すと、あなたが選択した Scheduler に関係なく、Observable が終了するまで処理を返しません。 ImmediateScheduler
は OnCompleted
を呼び出しますが、繰り返しをキャンセルできません。なので Subscribe
は無期限にブロックしてしまいます。代わりに CurrentThreadScheduler
は Return
と Repeat
が処理をただ単に(割り込んで)実行するだけであるのに対し、 CurrentScheduler
) は、擬似的なマルチタスクを行う(懐かしの VB の DoEvents
Dave 氏の回答にあった例
で、 RxJava でも同じだよねえと、
// just が ImmediateScheduler 使うのか不安だったから subscribeOn しているよ
Log.debug(TAG, "Hoge");
と書いて実行してみたら、処理が帰ってくる! Log.debug
なんだかモヤモヤした終わりかたですが、今回は ImmediateScheduler
In the observable world, calling Subscribe should be an asynchronous operation.
との言葉通り、Rx.NET/RxJava を使うときは非同期にしたい事が全てだと思いますが、オペレータによっては既定で ImmediateScheduler
を使うものもあるので、必ず subscribeOn/observeOn