Rustのスレッド間で変数を共有する ー 有識者レビュー (連載18)
前回書いたコードについて、レビューをして頂きました。
そのフィードバック内容について記載します。
また、フィードバッグ内容に基づいて、一つ実験をしてみましたので、そちらについても記載します。
1.スレッドに渡すデータをまとめる
Mutexとして定義し、各スレッドに渡すデータは以下のように二つあります。
let databuf = Arc::new(Mutex::new([0u8; 4096]));
let pos = Arc::new(Mutex::new(0));
ここは、一つにまとめるべきです。
構造体にまとめましょう。
struct AccelSampleBuffer {
bytes: [u8; 4096],
num_bytes: usize,
}
let state = Arc::new(Mutex::new(AccelSampleBuffer {bytes:[0u8; 4096], num_bytes:0}));
state構造体をMutex化しました。
次に、以下のようにクローンを作り、各スレッドに渡して処理をします。
・加速度センサ値を配列にためていくスレッド用
let state_in = Arc::clone(&state);
・TCP/IP送信スレッド用
let state_in = Arc::clone(&state);
これらに伴う変更箇所を細々と修正し、ビルドが通り、無事動作しました。
本件は別にRustに限った話ではなく、プログラムを作成する上での一般的な注意事項ですね。
2.lock()後にunwrap()することが多い理由
以下に記述があります。
Mutex in std::sync - Rust (rust-lang.org)
有識者の方に意訳して頂きました。
要は、
ロックされたまま如何ともしがたい状態になったMutexがあっても、ロック解除待ちで永遠に他スレッドが待たされずに済む方法。
さらに、Mutexを持ったままパニックを起こした場合にそのMutexを自動的に開放する、というアプローチも考えられるが、その場合にも「それは汚染している」というマークを付けることで警鐘を鳴らす。
という事ですかね。
Rustでは、Mutexもより安全に使えるよう、考えられているという事ですね。
3.別の共有方法を検討する
別の共有方法があると教えて頂きました。
全部試しているととても時間がかかりそうなので、今回はご紹介のみでご容赦ください。
3.1 グローバル変数 (static) に Mutex<_> を置く
Mutex<>は、それ自体を各スレッドに参照してもらうため、クローンが不要です。
このため、実行速度も速く使用メモリも少なくて済みます。
今回のプログラムに適用する場合、
static STATE: Mutex<AccelMeasureState> = Mutex::new(AccelMeasureState { ... });
のように定義して使うとよい、と助言頂きました。
3.2 アトミック変数 (std::sync::atomic::Atomic*) に状態を格納する
以下のように、構造体の各要素をアトミック化する方法です。
struct AccelSampleBuffer {
samples: [[AtomicU32; 3]; 4096],
num_samples: AtomicUsize,
}
let state = Arc::new(AccelSampleBuffer { ... });
let i = 0;
state.samples[i][0].store(axis_array[0].to_bits(), Ordering::Relaxed);
let x_axis = f32::from_bits(state.samples[i][0].load(Ordering::Relaxed));
3.3 チャンネルを使う
チャンネルを使ってバッファ制御をする手もあります。
1.SenderとRecieverを備えたMPSC(multiple producer, single consumer)チャネル
2.SPSC (single producer, single consumer)チャネルであるリングバッファを使う
今回のサンプルの場合、最初からリングバッファを使うのが一番正解だったような。。。
4.【実験】アサインしたバッファをオーバーフローした場合の動作
今回、加速度センサからの値を保持しておくバッファを4096バイト分アサインさいています。
これがオーバーフローした場合、どう動くのか見てみました。
Rustはメモリ安全でに設計されていますが、アーキテクチャまで保証しているわけではありません。それはプログラマが考える事です。
そのプログラマが失敗した場合、どのように動くのか?
4.1 オーバーフローさせてみる
加速度センサからの値をバッファに格納しているコードは以下です。
let mut state_in_1 = state_in.lock().unwrap();
if state_in_1.num_bytes>(4095-12){
state_in_1.num_bytes = 0;
}
let count = state_in_1.num_bytes;
for n in 0..4 {
state_in_1.bytes[count + n] = retbytes_x[n];
state_in_1.bytes[count + n + 4] = retbytes_y[n];
state_in_1.bytes[count + n + 8] = retbytes_z[n];
}
state_in_1.num_bytes+=12;
この中の以下の部分。
if state_in_1.num_bytes>(4095-12){
state_in_1.num_bytes = 0;
}
この部分が、バッファオーバーフローを防いでいる部分です。
これを以下のように変更し、わざとオーバーフローさせてみましょう。
if state_in_1.num_bytes>(4095){
state_in_1.num_bytes = 0;
}
実行してみました。
超えたからパニックになったよ、と表示されました。
CPUの例外に飛んで暴走したりはしませんでした。
ちなみに、SOLID-IDEの操作で、パニック時にブレークさせることが可能です。
シンボル名 rust_panic を指定してブレークポイントを設定すればOKです。
すなわち、配列のバッファオーバーランなどの実行時エラーについて、デバッグ時には rust_panic()にブレークポイント設定しておけば、それが発生したときに捕まえることができる、という事ですね。
やらかした!をデバッグするのが容易になりそうです。
さらに、ソースコード上で std::panic::catch_unwind 関数を使って、パニックのキャッチコードを書くことができます。ですのでパニックを捕捉し復帰を試みる等が可能です。
加えて、パニックを復帰させるより、システムをリセットしてしまいたい場合もあります。
この場合、パニックの捕捉を無効化し、常にアボートするようにするコンパイルオプションもあります。
4.2 他スレッドからアクセスしてみる
このバッファはArc<Mutex<_>>型で各スレッドに渡されています。
せっかくなので、この状態のままTCP/IP送信スレッドからアクセスする実験をしてみましょう。
PCアプリからTCP/IP送信してみます。
unwrap()でPoisonErrorが帰ってきました!
そして特にデッドロックはしていない様子です。
もう一度PCアプリからTCP/IP送信してみます。
同じエラーがもう一つ増えました。という事はデッドロックはしていないという事です。
今回はここまで。
今回まで作成したプログラムをzip化しました。
以下からダウンロードできます。
PC側C#プログラム(Visual Studio 2019系)
(bin\Releaseフォルダ内にexecutableファイルがあります)
Raspberry pi4があれば試してみることが可能です。
何かのお役に立てば幸いです。
#PC側プログラムは何のエラーチェックもしていないので、データが返信されてこない場合等例外が発生します。
次回は、SPI制御に戻り、割り込み処理の実装について考えていきたいと思います。