RustのAsync/Await徹底解説:Tokio、Futures、非同期並行処理の仕組みと実践
RustのAsync/Awaitの仕組みをTokioランタイム、Futureトレイト、タスクスポーン、構造化された並行処理、実践的なパターンまで深掘り解説します。

Rustの非同期プログラミングは、他のプログラミング言語とは根本的に異なるアプローチを採用しています。JavaScriptやPythonでは、async/await構文を使用すると、ランタイムが自動的にタスクをスケジューリングしますが、Rustでは「ゼロコスト抽象化」の原則に基づき、Futureは明示的にポーリングされるまで何も実行しません。この設計により、メモリ安全性を保ちながら、C言語に匹敵するパフォーマンスを実現することができます。
本記事では、Rustの非同期エコシステムの中心であるTokioランタイムを使用して、Futureの仕組みから実践的な並行処理パターンまでを体系的に解説します。
Rustのasync fnは呼び出し時点では何も実行せず、Futureを返すだけです。実際の処理は.awaitまたはランタイムによるポーリング時に初めて開始されます。これにより、不要な計算を避け、リソースを効率的に使用できます。
Rust Futuresと他言語の違い
Rustの非同期システムの核心はFutureトレイトです。このトレイトは、将来的に値を生成する可能性のある計算を表現します。
// core::future::Future
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}pollメソッドはPoll::Ready(value)またはPoll::Pendingを返します。Pendingの場合、Futureはまだ完了しておらず、後で再度ポーリングする必要があることを示します。重要なのは、Futureはポーリングされない限り進行しないという点です。
JavaScriptのPromiseは作成された瞬間から実行を開始しますが、RustのFutureは完全に受動的です。この設計は、キャンセレーションの実装を容易にし、メモリ使用量を最小限に抑えます。Futureをドロップするだけで、関連するすべてのリソースがクリーンアップされます。
Tokioランタイムのセットアップ
Tokioは、Rustで最も広く使用されている非同期ランタイムです。マルチスレッドタスクスケジューラ、非同期I/O、タイマー、同期プリミティブなど、本番環境で必要なすべての機能を提供します。
# Cargo.toml
[dependencies]
tokio = { version = "2", features = ["rt-multi-thread", "macros", "net", "time"] }rt-multi-threadフィーチャーは、ワークスティーリングスケジューラを有効にし、複数のCPUコアを活用できるようにします。macrosフィーチャーは#[tokio::main]および#[tokio::test]属性マクロを提供します。
基本的な非同期プログラムは以下のように記述します。
#[tokio::main]
async fn main() {
let result = fetch_data().await;
println!("Got: {result}");
}
async fn fetch_data() -> String {
// Simulate async I/O
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
String::from("data loaded")
}#[tokio::main]マクロは、main関数をTokioランタイム内で実行するように変換します。内部的には、ランタイムを構築し、提供された非同期ブロックを完了まで実行します。
タスクのスポーンと構造化された並行処理
tokio::spawnを使用すると、独立したタスクをランタイム上で並行実行できます。スポーンされたタスクは、スポーン元のタスクとは独立してスケジュールされ、異なるスレッドで実行される可能性があります。
use tokio::task::JoinHandle;
#[tokio::main]
async fn main() {
// Spawn two independent tasks
let handle_a: JoinHandle<u32> = tokio::spawn(async {
expensive_computation("dataset_a").await
});
let handle_b: JoinHandle<u32> = tokio::spawn(async {
expensive_computation("dataset_b").await
});
// Await both results
let (result_a, result_b) = (
handle_a.await.expect("task A panicked"),
handle_b.await.expect("task B panicked"),
);
println!("Results: {result_a}, {result_b}");
}
async fn expensive_computation(name: &str) -> u32 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("{name} done");
42
}tokio::spawnに渡すFutureはSend + 'staticを満たす必要があります。これは、タスクが異なるスレッドに移動される可能性があり、スポーン元よりも長く存続する可能性があるためです。ローカル参照をキャプチャする場合は、所有権を移動するか、Arcでラップする必要があります。
JoinHandleは、タスクの結果を取得するために使用します。awaitすることで、タスクが完了するまで待機し、結果またはJoinError(パニック時)を取得できます。
tokio::join!とtokio::select!による複数Futureの結合
複数の非同期操作を効率的に組み合わせるために、Tokioはjoin!とselect!マクロを提供します。
join!は、すべてのFutureを並行して実行し、すべてが完了するまで待機します。
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// All three run concurrently, total time ~200ms (not 600ms)
let (users, orders, inventory) = tokio::join!(
fetch_users(),
fetch_orders(),
fetch_inventory()
);
println!("Users: {}, Orders: {}, Stock: {}", users.len(), orders.len(), inventory);
}
async fn fetch_users() -> Vec<String> {
sleep(Duration::from_millis(200)).await;
vec!["Alice".into(), "Bob".into()]
}
async fn fetch_orders() -> Vec<String> {
sleep(Duration::from_millis(150)).await;
vec!["ORD-001".into()]
}
async fn fetch_inventory() -> u32 {
sleep(Duration::from_millis(100)).await;
84
}この例では、3つのフェッチ操作が並行して実行されるため、合計実行時間は最も遅い操作(200ms)とほぼ同じになります。
Rustの面接対策はできていますか?
インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。
select!は、複数のFutureの中で最初に完了したものを処理します。これは、タイムアウトの実装やレース条件の処理に有用です。
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
tokio::select! {
val = fetch_from_cache() => {
println!("Cache hit: {val}");
}
val = fetch_from_database() => {
println!("DB result: {val}");
}
}
}
async fn fetch_from_cache() -> String {
sleep(Duration::from_millis(5)).await;
"cached_value".into()
}
async fn fetch_from_database() -> String {
sleep(Duration::from_millis(50)).await;
"db_value".into()
}この場合、キャッシュからの取得が先に完了するため、データベースクエリは自動的にキャンセルされます。
join!はすべての結果が必要な場合(例:複数のAPIからデータを集約)に使用します。select!は最初の結果だけが必要な場合(例:タイムアウト、キャッシュフォールバック)に使用します。間違った選択はパフォーマンス低下やリソースリークの原因になります。
非同期Rustにおけるエラーハンドリング
非同期コードでも、Rustの標準的なエラーハンドリングパターンが適用されます。Result型と?演算子を使用して、エラーを伝播させることができます。
use std::io;
#[derive(Debug)]
enum AppError {
Network(reqwest::Error),
Parse(serde_json::Error),
Io(io::Error),
}
async fn load_config(url: &str) -> Result<Config, AppError> {
let response = reqwest::get(url)
.await
.map_err(AppError::Network)?;
let text = response.text()
.await
.map_err(AppError::Network)?;
let config: Config = serde_json::from_str(&text)
.map_err(AppError::Parse)?;
Ok(config)
}
#[derive(serde::Deserialize)]
struct Config {
db_url: String,
port: u16,
}非同期関数内での?演算子の動作は同期コードと同じですが、.awaitの後にチェーンする点に注意が必要です。エラー型の変換にはmap_errを使用するか、thiserrorクレートでFrom実装を自動生成することもできます。
タスク間の非同期通信チャネル
複数のタスク間でデータを安全にやり取りするために、Tokioはチャネルを提供します。mpsc(multiple producer, single consumer)チャネルは最も一般的なパターンです。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Bounded channel with capacity 32
let (tx, mut rx) = mpsc::channel::<String>(32);
// Producer task
let producer = tokio::spawn(async move {
for i in 0..5 {
tx.send(format!("message-{i}")).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
// tx dropped here, closing the channel
});
// Consumer reads until channel closes
while let Some(msg) = rx.recv().await {
println!("Received: {msg}");
}
producer.await.unwrap();
}バウンドチャネルは、プロデューサーがコンシューマーを圧倒するのを防ぐバックプレッシャーメカニズムを提供します。キャパシティに達すると、sendは空きができるまで待機します。
実践パターン:レート制限付きHTTPリクエスト
実際のアプリケーションでは、同時実行数を制限する必要があることがよくあります。Semaphoreを使用すると、並行タスク数を制御できます。
use std::sync::Arc;
use tokio::sync::Semaphore;
async fn fetch_all(urls: Vec<String>, max_concurrent: usize) -> Vec<Result<String, String>> {
let semaphore = Arc::new(Semaphore::new(max_concurrent));
let mut handles = Vec::new();
for url in urls {
let sem = Arc::clone(&semaphore);
let handle = tokio::spawn(async move {
// Acquire permit before making request
let _permit = sem.acquire().await.unwrap();
reqwest::get(&url)
.await
.map(|r| r.status().to_string())
.map_err(|e| e.to_string())
// permit dropped here, allowing next task to proceed
});
handles.push(handle);
}
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
results
}このパターンは、外部APIのレート制限を尊重したり、システムリソースの枯渇を防いだりするのに役立ちます。セマフォの許可はRAIIガードとして機能し、スコープを抜けると自動的に解放されます。
PinとUnpin:非同期Rustに必要な理由
Future::pollのシグネチャに現れるPin<&mut Self>は、自己参照構造体を安全に扱うために必要です。async/awaitによって生成されるステートマシンは、.awaitポイント間で保持される参照を含む可能性があります。
Pinは、値がメモリ内で移動されないことを保証します。これにより、自己参照が無効になることを防ぎます。ほとんどの場合、開発者が直接Pinを扱う必要はありませんが、手動でFutureを実装する場合や、低レベルのライブラリを作成する場合には理解が必要です。
Unpinトレイトを実装する型は、Pinの制約を受けずに移動できます。プリミティブ型やほとんどの標準ライブラリ型はUnpinを実装しています。
パフォーマンス特性と非同期の使い所
非同期プログラミングは、すべての状況で最適な選択ではありません。CPU集約型のタスクには、spawn_blockingを使用して専用のスレッドプールで実行します。
#[tokio::main]
async fn main() {
let hash = tokio::task::spawn_blocking(|| {
// CPU-intensive work runs on a blocking thread
compute_hash(b"large dataset")
})
.await
.unwrap();
println!("Hash: {hash}");
}
fn compute_hash(data: &[u8]) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
data.hash(&mut hasher);
format!("{:x}", hasher.finish())
}非同期ランタイムのワーカースレッドで長時間実行されるCPUバウンドの処理を行うと、他のタスクがブロックされ、全体的なスループットが低下します。spawn_blockingは、このような処理を別のスレッドプールにオフロードし、非同期ワーカーを解放します。
まとめ
Rustの非同期プログラミングは、学習曲線がありますが、正しく使用すれば優れたパフォーマンスとリソース効率を実現できます。以下のポイントを押さえておくことが重要です。
- Futureは遅延評価される:
.awaitまたはランタイムによるポーリングがなければ、処理は開始されません - Tokioが標準的な選択: マルチスレッドランタイム、タイマー、I/O、同期プリミティブを包括的に提供します
spawnには制約がある: スポーンされるFutureはSend + 'staticを満たす必要がありますjoin!とselect!を使い分ける: 全結果が必要か、最初の結果だけが必要かで選択します- チャネルでタスク間通信:
mpscチャネルはバックプレッシャーを提供し、安全なデータ共有を可能にします - セマフォで並行数を制限: 外部APIのレート制限やリソース保護に有用です
- CPU集約型タスクは
spawn_blockingで: 非同期ワーカーをブロックしないようにします
これらの概念を組み合わせることで、高スループットで信頼性の高い非同期アプリケーションを構築できます。
今すぐ練習を始めましょう!
面接シミュレーターと技術テストで知識をテストしましょう。
タグ
共有


