【Python】Queue(キュー)でスレッド間のデータ受け渡しを安全にする方法:非同期処理の必須知識

この記事は約5分で読めます。

Pythonでマルチスレッドを実装した際、次に直面するのが「スレッド間でどうやって安全にデータをやり取りするか?」という問題です。

ここで登場するのがqueue.Queueです。これは、RTOS(リアルタイムOS)やLinuxの「メッセージキュー」によく似た仕組みで、非同期処理を支える非常に重要な機能です。

本記事では、スレッド間でデータを受け渡しするための標準的な方法を、初心者にも分かりやすく解説します。

1. なぜスレッド間の通信にQueueが必要なのか?

複数のスレッドが動いているとき、一つの変数に対して同時に読み書きを行うと、データが壊れたり、予期せぬ動作(競合状態)を引き起こしたりします。

そこで活用されるのがQueue(キュー)です。

  • スレッドセーフ: 複数のスレッドから同時にアクセスしてもデータが壊れないよう設計されています。
  • 非同期の仲介役: 「データを送る側(Producer)」と「データを使う側(Consumer)」の処理速度の差を吸収できます。
  • 疎結合: 各スレッドがお互いの内部状態を知らなくても、Queueを介すだけで情報のやり取りが可能になります。

2. Queueの基本構造:FIFO(先入れ先出し)

Pythonのqueue.Queueは、基本的にFIFO (First-In, First-Out)形式です。 最初に投入されたデータが、最初に取り出されます。

主なメソッド

  • put(item): データをキューに入れる。
  • get(): データを取り出す(データが空の場合は、来るまで待機する)。
  • get_nowait(): データが空なら待たずに例外を投げる。

3. 実践コード:受信スレッドからメイン処理へデータを渡す

前回の「スレッド化した通信クラス」を例に、受信したデータをメイン処理に渡して活用するコードを見てみましょう。

import threading
import queue
import time
import random

# --- 1. スレッドクラス(データを作る側:Producer) ---
class ReceiverThread(threading.Thread):
    def __init__(self, data_queue):
        super().__init__()
        self.data_queue = data_queue
        self.is_running = True
        self.daemon = True

    def run(self):
        print("  [Thread] 通信待ち受け中...")
        while self.is_running:
            # 擬似的な受信処理(2〜4秒のランダム)
            time.sleep(random.uniform(2, 4))
            mock_data = f"Sensor-{random.randint(10, 99)}"
            
            # キューにデータを投入
            print(f"  [Thread] データを取得: {mock_data}")
            self.data_queue.put(mock_data)

    def stop(self):
        self.is_running = False

# --- 2. メイン処理(データを使う側:Consumer) ---
if __name__ == "__main__":
    # スレッド間共有のキューを作成
    shared_queue = queue.Queue()

    # スレッド起動
    receiver = ReceiverThread(shared_queue)
    receiver.start()

    print("[Main] メイン処理を開始します。")

    try:
        while True:
            # キューからデータを取り出す
            # get() はデータが来るまでここで「待機」してくれる(重要)
            print("[Main] データ待ち...")
            data = shared_queue.get() 
            
            # 受け取ったデータを処理する
            print(f"[Main] ★受信データを解析:{data} をデータベースに保存しました")
            
            # サンプルのため5回受信したら終了
            if "Sensor-99" in data: break 

    except KeyboardInterrupt:
        pass
    finally:
        receiver.stop()
        receiver.join()
        print("[Main] すべての処理を終了しました。")

4. RTOSやLinuxのMessage Queueとの共通点

組み込み開発(RTOS)やLinuxでのシステムプログラミング経験がある方にとって、このqueue.Queueは非常に親しみやすいはずです。

  • 非同期イベントの通知: 割り込み処理で受け取ったデータをメインタスクに投げるのと全く同じ感覚で使用できます。
  • バッファリング: 短時間に大量のデータが届いても、Queueがバッファとなってメイン側の処理落ちを防ぎます。

筆者も、TCP/UDP通信クラスを作成する際は、必ずと言っていいほどこのQueueをセットで実装します。受信スレッドを「受信に専念」させ、受け取ったデータは即座にQueueへ放り込む。これが最も堅牢で効率的な設計パターンだからです。

5. まとめ:Queueを使えば非同期処理は怖くない

スレッドクラスとQueueを組み合わせることで、Pythonでの非同期処理は一気に実用的になります。

  1. スレッドで「待ち」を発生させない。
  2. Queueで安全にデータを橋渡しする。
  3. メインで必要な時にデータを取り出して処理する。

この3ステップを意識するだけで、複雑な通信プログラムやリアルタイム処理が驚くほどスッキリ記述できるようになります。

コメント

タイトルとURLをコピーしました