Pythonのスレッド機能は並行処理を実現する重要な仕組みです。本記事ではスレッドの基本概念から実践的な活用方法まで、具体例を交えて詳細に解説します。さらに、効率的な並行処理を実現するThreadPoolについても深く掘り下げていきます。
Pythonのthreading
モジュールは軽量な並行処理を実装するための標準ライブラリです。スレッドを使用することで、I/O待ち時間の長い処理を効率的に実行できます。
主な特徴:
典型的な実装例:
import threading
def task():
print("スレッド実行中")
thread = threading.Thread(target=task)
thread.start()
Thread
クラスを直接使用する基本形。メインスレッドの終了後も実行を継続します。
class CustomThread(threading.Thread):
def run(self):
print("カスタムスレッド実行")
daemon=True
を設定するとメインスレッド終了時に自動停止。バックグラウンド処理に適しています。
daemon_thread = threading.Thread(target=task, daemon=True)
Lock
オブジェクトを使用した排他制御。共有リソースへの安全なアクセスを実現します。
lock = threading.Lock()
with lock:
# クリティカルセクション
ThreadPoolExecutor
による効率的なスレッド管理。タスクキューイングとリソース制御が可能です。
ThreadPoolはconcurrent.futures
モジュールのThreadPoolExecutor
クラスを中心に、タスクの非同期実行とリソース管理を最適化します。
ThreadPoolExecutor
は次の特徴を持ちます:
max_workers
数で同時実行数を制御基本実装例:
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * 2
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(10)]
results = [f.result() for f in futures]
submit()
メソッドで個別タスクを登録:
future = executor.submit(pow, 323, 1235) # 323^1235を計算[3]
イテラブルオブジェクトを一括処理:
with ThreadPoolExecutor() as executor:
results = executor.map(task, [1, 2, 3, 4]) # 各要素にtask関数を適用[15]
非同期結果のハンドリング:
def callback(future):
print(f"Result: {future.result()}")
future.add_done_callback(callback) # タスク完了時に自動実行[1]
特徴 | スレッド | プロセス |
---|---|---|
メモリ共有 | 同一プロセス内で共有 | 独立 |
起動速度 | 高速(約1ms) | 低速(約10ms) |
リソース消費 | 少ない | 多い |
データ通信 | 共有メモリで直接アクセス可能 | IPCが必要 |
GILの影響 | あり | なし |
適用例 | I/Oバウンド処理 | CPUバウンド処理 |
import requests
from threading import Thread
def fetch_url(url):
response = requests.get(url)
print(f"{url}: {len(response.content)} bytes")
urls = ["https://example.com", "https://example.org"]
threads = [Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
urls = ["https://example.com"] * 10
with ThreadPoolExecutor() as executor:
futures = {executor.submit(requests.get, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
print(f"{url}: {len(future.result().content)} bytes")[2]
def progress_updater():
while True:
print(f"処理進捗: {current_progress}%")
time.sleep(1)
progress_thread = Thread(target=progress_updater, daemon=True)
progress_thread.start()
max_workers=min(32, os.cpu_count() + 4)
が目安try:
result = future.result()
except Exception as e:
print(f"Error: {e}")[1]
Lock
やQueue
の活用PythonのスレッドとThreadPoolはI/Oバウンド処理の最適化に有効ですが、GILの制約を理解することが重要です。デーモンスレッドによるバックグラウンド処理やロック機構を活用した安全な並行処理の実装が可能です。ThreadPoolExecutor
を活用することで、以下のメリットが得られます:
大規模なCPU集中型処理にはProcessPoolExecutor
、高頻度の軽量タスクにはasyncio
との併用を検討しましょう。適切な並行処理手法の選択がパフォーマンス向上の鍵となります。