Pythonでイベント駆動プログラミング: select文の使い方

私は普段組み込みシステムのエンジニアです。RTOSを使った組み込みシステムを開発しているとOSの機能を使って、イベント駆動アプリを制作するのが一般的です。

ですが、PythonではLinux等のように基本的に同期関数となっていて、一つのスレッドで複数のイベントを受け付けるのが標準のAPIで、非常に実装しずらくなっています。

同期関数とは、呼び出したらその中が完了するまで処理が返ってこない関数のことです。例としていうのであれば、Socket関数のrecv関数等がそういう作りになっています。

こういったことを解決するために、LinuxではSelect関数を使って、複数のIO入力を取得する方法があります。そして、実はPythonにもSelectライブラリが組み込まれていて、同じような使い方ができるようになっているんです。

Select文サンプルコード

ですので、今回はPythonでのSelect文の使い方を紹介します。

では、まずいつも通りコードの全体をまず記載します。

import time
import threading
import socket
import select

Portnum = 12345
stop_application = False

def create_server(ip,port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind( (ip,port) )
    server.listen()
    return server


def create_client(ip,port):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect( (ip,port) )
    return client


def tm_callback( con ):
    if stop_application == False:
        snd = bytearray([5, 6, 7, 8])
        con.send( snd )


def th1_func():
    global stop_application
    server = create_server('127.0.0.1',Portnum)
    (con, client) = server.accept()
    
    fd_set = set()
    fd_set.add(con)
    
    while True:
        try:
            if stop_application == False:
                r, w, x = select.select(list(fd_set), [], [], 10)
                for fd in r:
                    data = fd.recv(1024) 
                    print(data)
                    time.sleep(0.1)
                    snd = bytearray([1, 2, 3, 4])
                    fd.send( snd )
            else:
                con.close()
                break
        except:
            con.close()
            break

def th2_func():
    global stop_application
    
    con = create_client('127.0.0.1',Portnum)
    
    fd_set = set()
    fd_set.add(con)
    
    tm = threading.Timer( 1 , tm_callback,args=[con,]  )
    tm.start()
    
    while True:
        try:
            if stop_application == False:
                r, w, x = select.select(list(fd_set), [], [], 10)
                for fd in r:
                    data = fd.recv(1024) 
                    print(data)
                    time.sleep(0.1)
                    snd = bytearray([9, 10, 11, 12])
                    fd.send( snd )
            else:
                con.close()
                break
        except:
            con.close()
            break

def main_func( ):
    global stop_application
    thread1 = threading.Thread(target=th1_func)
    thread1.start()

    thread2 = threading.Thread(target=th2_func)
    thread2.start()
    
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            print("stop")
            stop_application = True
            thread1.join()
            thread2.join()
            break
    
if __name__ == '__main__':
 result = main_func() 

続いてはプログラムの解説ですが、解説は次ページにて記載します。

プログラム解説

まず、Thread1 をServer Thread2をClientとして立ちあげて、Thread1 と Thread2 両方で、データを受信したら応答としてデータを送信するようにしてあります。
そして、Thread2 側では、通信のきっかけを与えるために、Timerを起動して1秒後にデータを送信しています。

##マルチスレッド・Socket通信・タイマーを使っていますが、以前の記事で紹介していますので そもそも マルチスレッド・Socket通信 。タイマーてどう使うの?って方は以下を確認ください。

Pythonでのマルチスレッド処理について詳しくはこちら

Socket通信を使ったプログラムの例はこちら

Pythonでタイマーを使った周期処理を行う方法はこちら

Select使い方解説

Selectの関係部分を抜き出してして、以下に記述します。

import select

    fd_set = set()
    fd_set.add(con)

    while True:
     r, w, x = select.select(list(fd_set), [], [], 10)
     for fd in r:
         data = fd.recv(1024) 
         print(data)
         time.sleep(0.1)
         snd = bytearray([1, 2, 3, 4])
         fd.send( snd )

まず、selectを使用するためにライブラリをインポートします。
以下です。標準ライブラリですので、何も考えずに以下をインポートしてください。

import select

続いて、selectに与えるリストを生成します。

    fd_set = set()            # インスタンスを取得
    fd_set.add(con)           # 受信したソケットをセット

複数取得したい場合は、 fd_set.add( )を複数回呼べば登録可能です。

続いて、肝心のSelect関数です。以下のように記載すれば複数のIO入力を待ってくれます。
※Select自体は同期関数ですので、イベントとして処理したいものはすべてここに登録してください。

第1引数に先ほど生成したリスト第4引数にタイムアウト時間を設定してください。
※第2と第3の引数の使い方は、すいません詳しくは知りません。。。多分、Write完了となにかだと思います。

r, w, x = select.select(list(fd_set), [], [], 10)

そして、以下のようにfor文でループさせれば、イベントが発生したものをfdに引き渡ししてくれますので、そのままrecv()やsend()を呼び出せば使用できます。

for fd in r:
    fd.recv()
    fd.send(data)

実行結果

では、本コードの実行結果です。

出力されているデータとしては以下の通りで、それぞれ正しくデータを受信して送信できることを確認できました。

b’\x05\x06\x07\x08’というのがタイマコールバックから送信
b’\x01\x02\x03\x04’がThread1からの送信(Thread1が受信したことをトリガに送信)
b’\t\n\x0b\x0c’が Thread2からの送信 (Thread2が受信したことをトリガに送信)

まとめ

ちょっと、Selectを使用するまでにひと手間あったり、Select受信あとにひと手間いりますが、結構簡単にイベント駆動っぽい処理ができるようになりました。

複雑な処理を使用とすればするほど、こういった知識が重要になってくると思いますので、みなさんも使い方を覚えて使用してみてください。

Pythonについて勉強したい人は以下がおすすめです。私も持っていてたまに眺めて勉強していますものですのでぜひ購入して学習してみてください。

にいやん

出身 : 関西 居住区 : 関西 職業 : 組み込み機器エンジニア (エンジニア歴13年) 年齢 : 38歳(2022年11月現在) 最近 業務の効率化で噂もありPython言語に興味を持ち勉強しています。 そこで学んだことを記事にして皆さんとシェアさせていただければと思いブログをはじめました!! 興味ある記事があれば皆さん見ていってください!! にほんブログ村