Experiments Never Fail

Task→Observable 変換でハマったこと

.NET の Task<T> は、Reactive Extensions が提供する拡張メソッド ToObservable()IObservable<T> に変換できます。

なにも考えずに ToObservable() を連発していたら、盛大にハマったのでメモ。

Task.Run().ToObservable() とか、意味ないっしょ #

ダメなコード。

var i = 0;
IObservable<int> incrementObservable = Task.Run (() => {
i++;
Debug.WriteLine($"increment! - {i}");
return i;
})
.ToObservable ();

Debug.WriteLine("Ready...");

incrementObservable // インクリメント
.Repeat(3) // 3回繰り返す
.Subscribe(
x => Debug.WriteLine($"OnNext({x})"),
ex => Debug.WriteLine($"OnError({ex.ToString()})"),
() => Debug.WriteLine("OnCompleted"));

incrementObservable は、副作用ありありですが、外部変数 i を +1 して後続に流す IObservable<int> です。
 これを .Repeat(3) して .Subscribe してますから、

Ready...
increment! - 1
OnNext(1)
increment! - 2
OnNext(2)
increment! - 3
OnNext(3)
OnCompleted

という出力を期待してました。
が、実際の出力はこう。

increment! - 1
Ready...
OnNext(1)
OnNext(1)
OnNext(1)
OnCompleted

Subscribe する前に Task が実行されてるし、 repeat してるのに increment されない。。。

「・・・ん? Task.Run().ToObservable() って、タスクを実行した結果を IObservable 化してるだけじゃね?」

コード見たまんまなんですが、これに気づくのに1時間かかりました。。。

期待通り動くのはこう↓。

var i = 0;
IObservable<int> incrementObservable = Observable.FromAsync(()=>Task.Run(() => {
i++;
Debug.WriteLine($"increment! - {i}");
return i;
}));

Debug.WriteLine("Ready...");

incrementObservable // インクリメント
.Repeat(3) // 3回繰り返す
.Subscribe(
x => Debug.WriteLine($"OnNext({x})"),
ex => Debug.WriteLine($"OnError({ex.ToString()})"),
() => Debug.WriteLine("OnCompleted"));

Observable.FromAsync で Task の実行そのものを IObservable 化します。
これの結果は正しくこう↓なりました。

Ready...
increment! - 1
OnNext(1)
increment! - 2
OnNext(2)
increment! - 3
OnNext(3)
OnCompleted

Task は1回しか実行できない #

ところで、 Task<T> は一度実行すると、2度目は実行できません。(Furure や Promise もそうだっけ)

var i = 0;
Task<int> incrementTask = new Task<int>(() => {
i++;
Debug.WriteLine($"increment! - {i}");
return i;
});

incrementTask.RunSynchronously();
incrementTask.RunSynchronously();

このコードは2回目の RunSynchronously() で例外がでます。

となると、 incrementTask.ToObservable() したとしても、期待通り動いてくれなさそうです。
(そもそも Task は Start などしないと実行されないので、Observable のチェインの中でいつ呼ぶの?)

というわけで、 Task.ToObservable() は、どういう時に使えばいいのかよくわかりませんでした。だれか教えて下さい。(汗)

published at tags: ReactiveX C#