RustのAsync/Await徹底解説:Tokio、Futures、非同期並行処理の仕組みと実践

RustのAsync/Awaitの仕組みをTokioランタイム、Futureトレイト、タスクスポーン、構造化された並行処理、実践的なパターンまで深掘り解説します。

Rust async await concurrency with Tokio runtime and futures execution flow

Rustの非同期プログラミングは、他のプログラミング言語とは根本的に異なるアプローチを採用しています。JavaScriptやPythonでは、async/await構文を使用すると、ランタイムが自動的にタスクをスケジューリングしますが、Rustでは「ゼロコスト抽象化」の原則に基づき、Futureは明示的にポーリングされるまで何も実行しません。この設計により、メモリ安全性を保ちながら、C言語に匹敵するパフォーマンスを実現することができます。

本記事では、Rustの非同期エコシステムの中心であるTokioランタイムを使用して、Futureの仕組みから実践的な並行処理パターンまでを体系的に解説します。

Rustの非同期は遅延評価

Rustのasync fnは呼び出し時点では何も実行せず、Futureを返すだけです。実際の処理は.awaitまたはランタイムによるポーリング時に初めて開始されます。これにより、不要な計算を避け、リソースを効率的に使用できます。

Rust Futuresと他言語の違い

Rustの非同期システムの核心はFutureトレイトです。このトレイトは、将来的に値を生成する可能性のある計算を表現します。

rust
// core::future::Future
trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pollメソッドはPoll::Ready(value)またはPoll::Pendingを返します。Pendingの場合、Futureはまだ完了しておらず、後で再度ポーリングする必要があることを示します。重要なのは、Futureはポーリングされない限り進行しないという点です。

JavaScriptのPromiseは作成された瞬間から実行を開始しますが、RustのFutureは完全に受動的です。この設計は、キャンセレーションの実装を容易にし、メモリ使用量を最小限に抑えます。Futureをドロップするだけで、関連するすべてのリソースがクリーンアップされます。

Tokioランタイムのセットアップ

Tokioは、Rustで最も広く使用されている非同期ランタイムです。マルチスレッドタスクスケジューラ、非同期I/O、タイマー、同期プリミティブなど、本番環境で必要なすべての機能を提供します。

toml
# Cargo.toml
[dependencies]
tokio = { version = "2", features = ["rt-multi-thread", "macros", "net", "time"] }

rt-multi-threadフィーチャーは、ワークスティーリングスケジューラを有効にし、複数のCPUコアを活用できるようにします。macrosフィーチャーは#[tokio::main]および#[tokio::test]属性マクロを提供します。

基本的な非同期プログラムは以下のように記述します。

main.rsrust
#[tokio::main]
async fn main() {
    let result = fetch_data().await;
    println!("Got: {result}");
}

async fn fetch_data() -> String {
    // Simulate async I/O
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    String::from("data loaded")
}

#[tokio::main]マクロは、main関数をTokioランタイム内で実行するように変換します。内部的には、ランタイムを構築し、提供された非同期ブロックを完了まで実行します。

タスクのスポーンと構造化された並行処理

tokio::spawnを使用すると、独立したタスクをランタイム上で並行実行できます。スポーンされたタスクは、スポーン元のタスクとは独立してスケジュールされ、異なるスレッドで実行される可能性があります。

concurrent_tasks.rsrust
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Spawn two independent tasks
    let handle_a: JoinHandle<u32> = tokio::spawn(async {
        expensive_computation("dataset_a").await
    });

    let handle_b: JoinHandle<u32> = tokio::spawn(async {
        expensive_computation("dataset_b").await
    });

    // Await both results
    let (result_a, result_b) = (
        handle_a.await.expect("task A panicked"),
        handle_b.await.expect("task B panicked"),
    );

    println!("Results: {result_a}, {result_b}");
}

async fn expensive_computation(name: &str) -> u32 {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("{name} done");
    42
}
スポーンされたタスクの要件

tokio::spawnに渡すFutureはSend + 'staticを満たす必要があります。これは、タスクが異なるスレッドに移動される可能性があり、スポーン元よりも長く存続する可能性があるためです。ローカル参照をキャプチャする場合は、所有権を移動するか、Arcでラップする必要があります。

JoinHandleは、タスクの結果を取得するために使用します。awaitすることで、タスクが完了するまで待機し、結果またはJoinError(パニック時)を取得できます。

tokio::join!とtokio::select!による複数Futureの結合

複数の非同期操作を効率的に組み合わせるために、Tokioはjoin!select!マクロを提供します。

join!は、すべてのFutureを並行して実行し、すべてが完了するまで待機します。

join_example.rsrust
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // All three run concurrently, total time ~200ms (not 600ms)
    let (users, orders, inventory) = tokio::join!(
        fetch_users(),
        fetch_orders(),
        fetch_inventory()
    );

    println!("Users: {}, Orders: {}, Stock: {}", users.len(), orders.len(), inventory);
}

async fn fetch_users() -> Vec<String> {
    sleep(Duration::from_millis(200)).await;
    vec!["Alice".into(), "Bob".into()]
}

async fn fetch_orders() -> Vec<String> {
    sleep(Duration::from_millis(150)).await;
    vec!["ORD-001".into()]
}

async fn fetch_inventory() -> u32 {
    sleep(Duration::from_millis(100)).await;
    84
}

この例では、3つのフェッチ操作が並行して実行されるため、合計実行時間は最も遅い操作(200ms)とほぼ同じになります。

Rustの面接対策はできていますか?

インタラクティブなシミュレーター、flashcards、技術テストで練習しましょう。

select!は、複数のFutureの中で最初に完了したものを処理します。これは、タイムアウトの実装やレース条件の処理に有用です。

select_example.rsrust
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        val = fetch_from_cache() => {
            println!("Cache hit: {val}");
        }
        val = fetch_from_database() => {
            println!("DB result: {val}");
        }
    }
}

async fn fetch_from_cache() -> String {
    sleep(Duration::from_millis(5)).await;
    "cached_value".into()
}

async fn fetch_from_database() -> String {
    sleep(Duration::from_millis(50)).await;
    "db_value".into()
}

この場合、キャッシュからの取得が先に完了するため、データベースクエリは自動的にキャンセルされます。

join!とselect!の使い分け

join!はすべての結果が必要な場合(例:複数のAPIからデータを集約)に使用します。select!は最初の結果だけが必要な場合(例:タイムアウト、キャッシュフォールバック)に使用します。間違った選択はパフォーマンス低下やリソースリークの原因になります。

非同期Rustにおけるエラーハンドリング

非同期コードでも、Rustの標準的なエラーハンドリングパターンが適用されます。Result型と?演算子を使用して、エラーを伝播させることができます。

error_handling.rsrust
use std::io;

#[derive(Debug)]
enum AppError {
    Network(reqwest::Error),
    Parse(serde_json::Error),
    Io(io::Error),
}

async fn load_config(url: &str) -> Result<Config, AppError> {
    let response = reqwest::get(url)
        .await
        .map_err(AppError::Network)?;

    let text = response.text()
        .await
        .map_err(AppError::Network)?;

    let config: Config = serde_json::from_str(&text)
        .map_err(AppError::Parse)?;

    Ok(config)
}

#[derive(serde::Deserialize)]
struct Config {
    db_url: String,
    port: u16,
}

非同期関数内での?演算子の動作は同期コードと同じですが、.awaitの後にチェーンする点に注意が必要です。エラー型の変換にはmap_errを使用するか、thiserrorクレートでFrom実装を自動生成することもできます。

タスク間の非同期通信チャネル

複数のタスク間でデータを安全にやり取りするために、Tokioはチャネルを提供します。mpsc(multiple producer, single consumer)チャネルは最も一般的なパターンです。

channel_example.rsrust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel with capacity 32
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // Producer task
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            tx.send(format!("message-{i}")).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
        // tx dropped here, closing the channel
    });

    // Consumer reads until channel closes
    while let Some(msg) = rx.recv().await {
        println!("Received: {msg}");
    }

    producer.await.unwrap();
}

バウンドチャネルは、プロデューサーがコンシューマーを圧倒するのを防ぐバックプレッシャーメカニズムを提供します。キャパシティに達すると、sendは空きができるまで待機します。

実践パターン:レート制限付きHTTPリクエスト

実際のアプリケーションでは、同時実行数を制限する必要があることがよくあります。Semaphoreを使用すると、並行タスク数を制御できます。

rate_limited_fetcher.rsrust
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn fetch_all(urls: Vec<String>, max_concurrent: usize) -> Vec<Result<String, String>> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let mut handles = Vec::new();

    for url in urls {
        let sem = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {
            // Acquire permit before making request
            let _permit = sem.acquire().await.unwrap();
            reqwest::get(&url)
                .await
                .map(|r| r.status().to_string())
                .map_err(|e| e.to_string())
            // permit dropped here, allowing next task to proceed
        });
        handles.push(handle);
    }

    let mut results = Vec::new();
    for handle in handles {
        results.push(handle.await.unwrap());
    }
    results
}

このパターンは、外部APIのレート制限を尊重したり、システムリソースの枯渇を防いだりするのに役立ちます。セマフォの許可はRAIIガードとして機能し、スコープを抜けると自動的に解放されます。

PinとUnpin:非同期Rustに必要な理由

Future::pollのシグネチャに現れるPin<&mut Self>は、自己参照構造体を安全に扱うために必要です。async/awaitによって生成されるステートマシンは、.awaitポイント間で保持される参照を含む可能性があります。

Pinは、値がメモリ内で移動されないことを保証します。これにより、自己参照が無効になることを防ぎます。ほとんどの場合、開発者が直接Pinを扱う必要はありませんが、手動でFutureを実装する場合や、低レベルのライブラリを作成する場合には理解が必要です。

Unpinトレイトを実装する型は、Pinの制約を受けずに移動できます。プリミティブ型やほとんどの標準ライブラリ型はUnpinを実装しています。

パフォーマンス特性と非同期の使い所

非同期プログラミングは、すべての状況で最適な選択ではありません。CPU集約型のタスクには、spawn_blockingを使用して専用のスレッドプールで実行します。

spawn_blocking_example.rsrust
#[tokio::main]
async fn main() {
    let hash = tokio::task::spawn_blocking(|| {
        // CPU-intensive work runs on a blocking thread
        compute_hash(b"large dataset")
    })
    .await
    .unwrap();

    println!("Hash: {hash}");
}

fn compute_hash(data: &[u8]) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let mut hasher = DefaultHasher::new();
    data.hash(&mut hasher);
    format!("{:x}", hasher.finish())
}

非同期ランタイムのワーカースレッドで長時間実行されるCPUバウンドの処理を行うと、他のタスクがブロックされ、全体的なスループットが低下します。spawn_blockingは、このような処理を別のスレッドプールにオフロードし、非同期ワーカーを解放します。

まとめ

Rustの非同期プログラミングは、学習曲線がありますが、正しく使用すれば優れたパフォーマンスとリソース効率を実現できます。以下のポイントを押さえておくことが重要です。

  • Futureは遅延評価される: .awaitまたはランタイムによるポーリングがなければ、処理は開始されません
  • Tokioが標準的な選択: マルチスレッドランタイム、タイマー、I/O、同期プリミティブを包括的に提供します
  • spawnには制約がある: スポーンされるFutureはSend + 'staticを満たす必要があります
  • join!select!を使い分ける: 全結果が必要か、最初の結果だけが必要かで選択します
  • チャネルでタスク間通信: mpscチャネルはバックプレッシャーを提供し、安全なデータ共有を可能にします
  • セマフォで並行数を制限: 外部APIのレート制限やリソース保護に有用です
  • CPU集約型タスクはspawn_blocking: 非同期ワーカーをブロックしないようにします

これらの概念を組み合わせることで、高スループットで信頼性の高い非同期アプリケーションを構築できます。

今すぐ練習を始めましょう!

面接シミュレーターと技術テストで知識をテストしましょう。

タグ

#rust
#async
#tokio
#futures
#concurrency

共有

関連記事