非同期処理の話❷
こんにちは、株式会社LabBaseでエンジニアをしているミズノです。ちょと時間が空いてしまいましたが、少し前に非同期処理を調べてました。今回はRustでの簡易実装と合わせて説明します。
Rustは非同期処理ランタイムをコミュニティに任せています。エンジニアは最適なランタイムを選ぶことができますし、自身でランタイムを構築することもできます。今回は現状スタンダード感のあるtokioを利用したいと思います。
非同期処理ふりかえり
非同期処理は内部的にはポーリングを行い、Future・Executor・Wakerという登場人物がいます。主にこの3つが通知しあって非同期処理を実現しているようです。
まず、Futureのpollメソッドが実行されます。これがポーリングです。効率よくポーリングをするために、対象処理が完了時に、内部からランタイムに通知します。
それがWakerの役目です。Wakerは処理が完了したことをランタイムに通知します。Wakerからの通知はExecutorに伝わり、pollを再度実行します。この時にpoll()の戻り値がPoll::Ready()を返します。これで処理が完了します。
上の流れを順番にまとめると以下のようになります。
ランタイム → Futureのpoll()実行
FutureはPoll::Pendingを返す。まだ処理が終わっていないことをランタイムに報告
Future内で処理が完了すると、wake()を実行。ランタイムに通知
ランタイムはExecutorにもう一度poll()を実行させる
FutureはPoll::Ready()を返す。
ランタイムは処理完了を理解する。
コードで試す
sleepを利用して擬似的な非同期処理を作ってみます。
struct MyTimer {
expired_time: Instant,
}
impl Future for MyTimer {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.expired_time {
Poll::Ready("MyTimer has completed.".to_string())
} else {
println!("Still Pending...");
let waker = cx.waker().clone(); // wakerはContextから取得できます。
let expired_time = self.expired_time;
// 擬似I/O、別スレッドでSleepさせる
std::thread::spawn(move || {
std::thread::sleep(4);
waker.wake(); // スリープ終わったので、Executorに通知
});
// ランタイムにはPendingを返しておく
Poll::Pending
}
}
}
main()から呼び出します。
#[tokio::main]
async fn main() {
let f = tokio::spawn(async {
let future = MyTimer {
expired_time: Instant::now() + Duration::from_secs(4),
};
let res = future.await; // 1度目のpoll()メソッドの実行
println!("{}", res);
});
let _ = tokio::join!(f);
}
登場人物が多いのですが、それぞれの役目は名前からも理解しやすいですね。RustではCrateによっては非同期処理の未対応もあるので、自身で追加できるスキルがあると困ることはないですし、良いコードを書くことにも役立ちます。
プログラミングRustの書籍では日本語でも詳しく説明されているのでおすすめです。Concurrency周りの知識をつけるにはRustは最適ですね。
余談
ソフトウェアの処理を大きく2つに分類すると、CPUに負荷のかかる処置 と、I/Oに負荷のかかる処理に分けて考えることができますが、上記のコードを考えると、非同期処理は待ち時間が発生するようなI/Oの方が向いてます。
Tokioは非同期処理の実行のために、スレッドをいくつ使うかなど、かなり柔軟に選択できるようになっている。その辺りはお任せできるのでありがたい。
弊社ではRustゴリゴリ描きたいエンジニアを絶賛募集しています。お気軽にカジュアル面談を申し込んでください。カジュアル面談は選考要素0ですので、気軽にご利用いただいて大丈夫です。お待ちしています!