.NET の Task<T> は、Reactive Extensions が提供する拡張メソッド ToObservable()IObservable<T> に変換できます。

なにも考えずに ToObservable() を連発していたら、盛大にハマったのでメモ。

Task.Run().ToObservable() とか、意味ないっしょ

ダメなコード。

var i = 0;
IObservable<int> incrementObservable = Task.Run (() => {
	i++;
	Debug.WriteLine($"increment! - {i}");
	return i;
})
.ToObservable ();

Debug.WriteLine("Ready...");

incrementObservable // インクリメント
	.Repeat(3) // 3回繰り返す
	.Subscribe(
		x  => Debug.WriteLine($"OnNext({x})"),
		ex => Debug.WriteLine($"OnError({ex.ToString()})"),
		() => Debug.WriteLine("OnCompleted"));

incrementObservable は、副作用ありありですが、外部変数 i を +1 して後続に流す IObservable<int> です。
 これを .Repeat(3) して .Subscribe してますから、
 

Ready…
increment! - 1
OnNext(1)
increment! - 2
OnNext(2)
increment! - 3
OnNext(3)
OnCompleted

という出力を期待してました。
が、実際の出力はこう。

increment! - 1
Ready…
OnNext(1)
OnNext(1)
OnNext(1)
OnCompleted

Subscribe する前に Task が実行されてるし、 repeat してるのに increment されない。。。

「・・・ん? Task.Run().ToObservable() って、タスクを実行した結果を IObservable 化してるだけじゃね?」

コード見たまんまなんですが、これに気づくのに1時間かかりました。。。

期待通り動くのはこう↓。

var i = 0;
IObservable<int> incrementObservable = Observable.FromAsync(()=>Task.Run(() => {
	i++;
	Debug.WriteLine($"increment! - {i}");
	return i;
}));

Debug.WriteLine("Ready...");

incrementObservable // インクリメント
	.Repeat(3) // 3回繰り返す
	.Subscribe(
		x  => Debug.WriteLine($"OnNext({x})"),
		ex => Debug.WriteLine($"OnError({ex.ToString()})"),
		() => Debug.WriteLine("OnCompleted"));

Observable.FromAsync で Task の実行そのものを IObservable 化します。
これの結果は正しくこう↓なりました。

Ready…
increment! - 1
OnNext(1)
increment! - 2
OnNext(2)
increment! - 3
OnNext(3)
OnCompleted

Task は1回しか実行できない

ところで、 Task<T> は一度実行すると、2度目は実行できません。(Furure や Promise もそうだっけ)

var i = 0;
Task<int> incrementTask = new Task<int>(() => {
	i++;
	Debug.WriteLine($"increment! - {i}");
	return i;
});

incrementTask.RunSynchronously();
incrementTask.RunSynchronously(); 

このコードは2回目の RunSynchronously() で例外がでます。

となると、 incrementTask.ToObservable() したとしても、期待通り動いてくれなさそうです。
(そもそも Task は Start などしないと実行されないので、Observable のチェインの中でいつ呼ぶの?)

というわけで、 Task.ToObservable() は、どういう時に使えばいいのかよくわかりませんでした。だれか教えて下さい。(汗)

ReactiveX
C#

published

Ads

Read more!

amay077

Microsoft MVP(Xamarin). フルリモートワーカー. Geospatial Mobile app developer. Love C#.

amay077 amay077