毎度同じ図を張り付けてますが、以下のような構成で CAN メッセージとユーザ入力を処理するサーバプログラムを書いているわけです。基本機能がそろってきて最低限の動作をするようになってきましたが、ソフトウェアの構成が気に入りません。そろそろ規模が 2000行に届きそうで、ぼちぼち最後のチャンスかもな?と、この二日ほど全面書き直しをしていました。この程度の規模なのにすでに二回目。いけませんね。一回目は C++ から Rust への移行でしたが、今回は Rust の中での書き直しです。まだ README も書いていないので見ても何が何だか、かもしれませんが、一応ソースコードはこちら
https://github.com/naokiiwakami/mission-control

書き直しの動機は二つ
- すでにソフトウェアの構成がごちゃごちゃになっていて、書けば書くほどひどくなってくる。設計の方針を間違ったようだ。構成を根本的に直したい
- 全体の動作はメッセージ駆動の非同期なのに同期関数系だけで組んでいる。そろそろ手に負えなくなってきた
ということで、考え直したソフトウェア構成を Tokio を使って組みなおしました。Tokio とは、非同期プログラミングを支援するためのライブラリみたいなもので、Rust で非同期関数を書くならなくてはならないものと思われます。なにしろ Rust 公式の Rust 本で非同期の章を見ると結局中では Tokio が使われているぐらいです。
まあそれはともかく、このサーバプログラムはメッセージ駆動という手法で書かれています。イベント駆動ともいいます。下の図がそうなんですけれども、

イベント駆動の基本的な仕組み
イベント駆動とはどういう仕組みかというと
- 何かしら事象が起きそうな場所にはプログラムが張り付いている。上の図では CAN バス(CAN メッセージ受信) とかネットワーク/イーサネット (ユーザコマンド受信) あたりがそうです。そしてこれらの「見張り役」はことが起きるまでは寝ています。
- 何か事象が起きると、例えば CAN メッセージを受信した場合、見張り役が起きてきてメッセージを取得します。そしてメッセージをリクエスト処理部に投げます。図では人の手のラインです。
- メッセージを受信して送ったことをリクエスト処理部に通知します。図では猫の手のラインです。
- リクエスト処理部は普段は寝ていますが、通知を受け取ると起きてきて、通知に従って「受け手」に届いている CAN メッセージを拾い上げて処理します。
こういう風にメモ用紙をやり取りする感じでメッセージを投げ合いながら情報を処理していきます。メッセージを一度送っているのに「猫の手」はいるのか?という問題もありますが、それはいつか別記事で。この仕組みは一見単純です。メッセージ一通を処理するだけでよいなら実際に単純です。なんでこの方式が良いのかに話が行くとすごく長くなるのでそれは置いておいて、この方式の欠点について書くと、処理がメッセージ一通で収まらないととても面倒になるという問題があります。
イベント駆動での処理の流れ
例えば、ミッションコントロールには ping という機能があります。これは、CAN ネットワーク上にいるモジュールが応答するかどうか確かめる機能で、こんな動作になります。
プログラム上では
- ユーザがミッションコントロールに ping 依頼を出す
- ミッションコントロールが CAN インタフェースに ping を送るように依頼
- CAN インタフェースは ping を CAN バスに流す
- ping を拾ったモジュールは ping back を CAN バスに流す
- CAN インタフェースは ping back を拾う。拾ったメッセージをミッションコントロールに渡して通知する
- ミッションコントロールは通知を受けて起きだしてユーザに ping の完了処理を行う→(ユーザに通知)
一見単純ですけれども、ミッションコントロールが行う「ping の完了処理」とは何か?これが実はやっかいです。ミッションコントロールが完了処理を行うには、以下のことを覚えておく必要があります
- 自分が ping を出したこと
- 誰に ping を出したか
- 完了したら何をすればよいか
この記憶があってはじめてミッションコントロールはユーザに完了通知が送れます。一方で戻ってくる Ping back の CAN メッセージには何が含まれているか?
- 誰が ping back を出したか (誰に ping を出したかと同じ)
これしかありません。じゃあ、自分が ping を出したことと応答が返ってきたら何をすれば良いかの記憶はどこに?ということで、どこかに覚えておく必要があります。この「どこに何を覚えさせる?」という問題がとても奥深くて、最初の版では同期関数を使ってこの動きを実装したのですが、同期関数は「待つ」と他の処理を止めてしまうため(例えば ping 応答を止まって待っていたら他の ping 依頼を受け付けられない)、待ちが必要になったらすべてを投げ出して速やかに処理を終了しないといけません。その時点で今までの処理に関する記憶は全て失うので、ミッションコントロールは ping が戻ってきても全く何も覚えていません。そういうわけで、ping が戻ってきたときのためにこんな感じのレシピを作ってなくさないように保存しておく必要があります。
- メモ:モジュール A さんへ送った ping について
- ping が戻ってきたら
- 待っているユーザがいるからそのユーザに
- 終わった旨通知しなさい
そしてこれをとある記憶領域にモジュール A に紐づけて格納しておきます。そして A からの ping 応答が返ってきたら、ミッションコントロールは何も覚えていませんけど、取り出してきたこのメモをもとにやるべき処理をするわけです。
ちゃんと動きそうですね。実際ちゃんと動きます。ですが、ここでソフトウェア構成上の問題が持ち上がります。ping 依頼は何もユーザからだけとは限りません。モジュールの状態を把握するためにミッションコントロールが自発的に ping をするかもしれません。その場合もミッションコントロールはしっかり何をしたか忘れてしまいます。この場合のメモはこんな風になってしまうはず
- メモ:モジュール A さんへ送った ping について
- ping が戻ってきたら
- 自分自身が持っているモジュールの状態リストに行って
- 最終生存確認時刻を更新しなさい
やることがぜーんぜん違いますね。これをソフトウェア上で同じ記憶領域を使って覚えさせるというのは大変に面倒です。そしてさらにやっかいなことに、ping は戻ってこないかもしれない。時間切れを設定する必要があります。でも同期関数ミッションコントロールは「待つ」ことができません。どうするか?時間切れ処理メモが必要です。こんな
- メモ:モジュール A さんへ送った ping についての特別処理
- このメモは ping が送られてから 10 秒後に自動的に開かれます
- このメモが開かれたら
- 自分自身が持っているモジュールの状態リストに行って
- モジュール A を死亡状態に変えなさい
- そして保存してある本来のメモを消去しなさい
- ところで、このメモが開かれた瞬間にも ping はもう返ってきているかもしれない。双方同時に消去して事故ったりしないようにね
いやーごちゃついてます。プログラムがどんどん複雑になる方向に行きそうです。
この辺の問題が見えてきたところで僕はもう書き直そうかな、と考え始めました。
非同期関数の出番
ここで設計の分かれ道に立つわけです。どうしてプログラミングが変な方向へ行くかというと、「同期関数は待つことができない」という縛りがあるからです。でも本当でしょうか?実はやりようもあるにはあります。スレッドを使えば同期関数であっても他の人を止めずに待つことができます。しかしスレッドとイベント駆動方式はかなり相性が悪いです。そちらの方へ行くと別の不幸が待っていそうです。ここで待つことができない同期関数の縛りから逃れる別の方法が見えてきます。非同期関数の出番です。非同期関数は名前からの印象とうらはらに「待つ」ことができます。細部の正確さには目をつぶって、こんな感じのプログラムになります
async fn ping(module_id: u8) {
let reply_receiver = send_ping(module_id);
reply_receiver.await; // 応答が返ってくるまで待っている
}
async fn user_ping(module_id: u8, user_reply_tx: Sender<String>) {
ping(module_id).await; // ping が完了するまで待っている
user_reply_tx.send("ok".to_string()).await.unwrap(); // ユーザに終わった旨通知
}
async fn auto_ping(&mut self, module_id: u8, modules_tx: Sender<Command>) {
ping(module_id).await; // ping が完了するまで待っている
modules_tx.send(Command {
action: Action::OkNotice,
module_id,
}).await.unwrap(); // モジュールリストに生存確認通知を送る
}
Rust途中でちょいちょい待ちが入りますが、止まって待っている間、実は他の処理をブロックしているのではなく、別の処理に CPU を譲っています。中での CPU の働き方は同期関数で書いた場合とそれほどは違っていません。ですがプログラムは圧倒的に単純になります。そしてこれが重要なのですが、以前には「起きてきたときにやることメモ」に書いてあった「ユーザに通知しなさい」や「最終生存確認時刻を更新しなさい」は、そのままここに書いてあります。やることメモをよそに記録しておくのと比べるとなんという楽さでしょう。ま、メモ自体はそれでもまだ必要なんですけど情報が多すぎるのでここでは省略。
さっきややこしいことになったタイムアウト処理はどうなるでしょうか?こんな感じに書けます。
async fn ping(module_id: u8) {
let reply_receiver = send_ping(module_id);
reply_receiver.await; // 応答が返ってくるまで待っている
}
async fn user_ping(module_id: u8, user_reply_tx: Sender<String>) {
let reply = match timeout(Duration::from_secs(10), ping(module_id)).await { // ping 10秒まで待つ
Ok(_) => "ok", // ユーザに終わった旨通知
Err(_) =>"timed out", // ユーザに時間切れを通知
};
user_reply_tx.send(reply.to_string()).await.unwrap();
}
async fn auto_ping(&mut self, module_id: u8, modules_tx: Sender<Command>) {
let command = match timeout(Duration::from_secs(10), ping(module_id)).await { // 10秒までね
Ok(_) => Command { // 生存確認通知
action: Action::OkNotice,
module_id,
},
Err(_) => Command { // 死亡届
action: Action::MarkDead,
module_id,
}
};
modules_tx.send(command).await.unwrap();
}
Rust拍子抜けするぐらい簡単です。まあ中ではさっき上げた特別メモに相当することをやってはいるんですけれども、自分でやらなくて良いのはありがたいです。
というわけで、ミッションコントロールは、非同期関数へ移行しました。tokio についても書きたかったけれども長くなりすぎたのでまた別の記事にできたらと思います。