RxJS_非同期データストリームの扱い例 #386
Angularで開発をしていて、RxJS(Reactive Extensions for JavaScript)というライブラリが提供する非同期データストリームの処理に触れました。今回はその例を示しながら整理したいと思います。
RxJSはAngularと非常によく統合されており、Angularの公式ドキュメントでも頻繁に参照されています。特に、AngularのHTTPクライアントやフォームコントロール、ルーティングなど、多くの機能がRxJSのObservableとともに動作するように設計されています。
以下は15秒ごとにGETリクエストを送信し、myappDataを最新化してセットするプログラムです。
public subscription = new Subscription();
this.subscription.add(
interval(15000)
.pipe(
startWith(-1),
flatMap(x => this.myappService.getMyappData(x >= 0))
)
.subscribe(myappData => {
this.myappData = myappData;
this.setData();
})
);
前提として、this.myappService.getMyappData(x >= 0)でGETリクエストを投げています。
1つずつ解説していきます。
Subscription
冒頭のSubscription()は、RxJSのObservableからデータを受け取るためのものです。Observableからデータを受け取るには、そのObservableに対して「購読」(subscribe)する必要があり、それがSubscriptionです。
Observableは後述するintervalやstartWithで作成され、.subscribeで購読を行うと、Observableはデータを発行し始めます。
サンプルコード上で最終的に発行されるのは、getMyappData(x >= 0)で投げたGETリクエストの結果です。発行されたデータを.subscribe()で受け取り、ここではthis.myappDataの値を更新してセットしています。
また、これらの一連の処理をsubscriptionというSubscriptionオブジェクトに追加していますが、これは後で処理を停止するためです。例えばユーザがページを離れる時などに、subscription.unsubscribe()を呼び出すことで、15秒ごとの更新を停止できます。
Subscriptionのaddメソッド
上記の通り、このプログラムではsubscriptionという名前でSubscriptionオブジェクトを保持しています。
このオブジェクトにはaddメソッドで複数の購読を追加でき、その後一度に全ての購読を止めることができます。コンポーネントが破棄される際など、メモリリークを防ぐために全ての購読を止めるのが一般的です。
interval()
ミリ秒単位で数字を発行するObservableを作成します。発行する数値は0から始まり、1ずつ増えていきます(0, 1, 2, 3, ...)。ここではinterval(15000)となっているので、15秒毎に数字が発行されます。
以下のイメージです。
開始直後: 数値を発行しない
15秒後: 0を発行
30秒後: 1を発行
45秒後: 2を発行
60秒後: 3を発行
・・・以下同様に続く
pipe()
pipeはObservableに対して一連のオペレーター(関数)を順次適用するためのメソッドです。それぞれのオペレーターの出力は次のオペレーターの入力となります。
pipe()メソッドの役割はオペレータにObservableを渡すことです。
startWith()
startWithは最初のオペレーターとして用いられ、指定された値(この場合は-1)をすぐに発行する新しいObservableを作成します。
サンプルコードにおいて最初にObservableを作成するのはinterval()です。そのObservableは15秒ごとに値を発行しますが、開始直後は何も発行しません。
interval()で作成した、そのObservableに対してpipeを使用し、その中でstartWith(-1)を使用すると、startWithは新しいObservableを作成します。この新しいObservableは、元のObservableが発行する値の前に指定した値(ここでは-1)を発行します。
つまり、startWithは既存のObservableに対して動作し、そのObservableが発行する値のシーケンスに追加の値を挿入した新しいObservableを生成します。
flatMap()
Observableが発行する各値に対して関数を適用し、その結果をフラットな(ネストされていない)Observableに結合します。つまり関数の処理結果が新しいObservableとして発行されます。
ここではintervalとstartWithで発行された数値(最初は-1、その後は0、1、2...)を取り、getMyappData(x >= 0)に渡してGETリクエストを送信し、その結果を新たなObservableとして作成します。
そのObservableが最初に説明した.subscribeで購読され、最終的な処理が実施されている形です。
結果として、15秒ごとにmyappDataの取得処理が実行されます。
なお、getMyappData(x >= 0)の中でx >= 0のチェックを行っているのは、最初の-1が関数に渡されて実行した時と、その後の15秒間隔で実行した時とで処理を分けるためです。
これらの結果、このコードは開始直後に一度、そしてその後15秒ごとにgetMyappData関数を呼び出し、得られたキャンペーンのデータをthis.myappDataに設定し、setData()を呼び出すという動作をします。
↓再掲
public subscription = new Subscription();
// pollingしてstatus更新する
// リクエスト毎に-1からカウントアップされる引数を使って、初回表示時のリクエストとstatus更新用のリクエストを区別する
this.subscription.add(
interval(15000)
.pipe(
startWith(-1),
flatMap(x => this.myappService.getMyappData(x >= 0))
)
.subscribe(myappData => {
this.myappData = myappData;
this.setData();
})
);
ここまでお読みいただきありがとうございました!
この記事が気に入ったらサポートをしてみませんか?