pythonでマルチスレッドプログラミング
pythonは簡単にマルチスレッドで動作するプログラムを簡単に作れる。
javaだと、それ用にクラスを作成する必要があるが、pythonでは関数をマルチスレッドで動かすことができる。
ちょっと作ってみたツールが、性能的に問題がある場合、該当処理を関数に切り出し、それをマルチスレッドで動かせる。
とりあえずプログラムの全貌
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | #!/usr/bin/env python import threading import queue import logging def worker(q): '''pythonでキューを多重処理するスレッド''' while True: item = q.get() if item is None: break logging.info(item) '''どのスレッドで処理されているかどうか見たいので、logging機能を使ってログ出力する''' logging.basicConfig( format = '%(asctime)s [%(levelname)-5s] [%(thread)5d] (%(name)s) %(message)s', level = 'INFO') '''開始メッセージを出力''' logging.info('start') '''処理キューを登録するためのqueueを作成する''' q = queue.Queue() '''キューを処理するスレッドを準備する''' threads = [] for i in range(20): t = threading.Thread(target = worker, args = (q, )) t.start() threads.append(t) '''キューにタスクを登録する''' for item in range(100): q.put(item) '''スレッドの数だけ、キュー終端のNoneをキューに登録する''' for t in threads: q.put(None) '''スレッドが終了することを待ち合わせる''' for t in threads: t.join() '''終了メッセージを出力''' logging.info('end') |
処理をする関数を作成する
1 2 3 4 5 6 7 | def worker(q): '''pythonでキューを多重処理するスレッド''' while True: item = q.get() if item is None: break logging.info(item) |
while Trueの無限ループを作成しておき、queueからキューを1つポップする。
キューからポップした値がNoneのときはループを抜ける。
キューの最後にNoneを入れるのは、後のキュー登録の後処理で行う。
検証用にloggingを準備する
1 2 | [%(thread)5d] (%(name)s) %(message)s', level = 'INFO') |
loggingを利用するとログにスレッドのIDを出力できるようになる。
これを利用して、マルチスレッドで処理されていることを確認する。
空のキューを用意し、マルチスレッドで動作させるスレッドを用意する
1 2 3 4 5 6 7 | q = queue.Queue() '''キューを処理するスレッドを準備する''' threads = [] for i in range(20): t = threading.Thread(target = worker, args = (q, )) t.start() threads.append(t) |
先に空のキューを作成しておく。
そのキューを引数に取る、マルチスレッドで実行するスレッドを作成する。
実行したスレッドは、後で処理の待ち合わせをするために、リストに保存しておく。
キューにタスクを登録する
1 2 | for item in range(100): q.put(item) |
このサンプルではシンプルな数値を設定しているが、キューにはなんでも登録できるので、
DBから取得したデータや、DTOなど、なんでも自由に設定できる。
キューの終端を登録する
1 2 | for t in threads: q.put(None) |
マルチスレッドで実行される関数は、キューからNoneを取得した時点でスレッドを終了するように定義している。
なので、キューに実行すべきキューを登録した後に、スレッドの数だけ終端の合図(None)を設定しておく。
各スレッドが終了することを確認する
1 2 | for t in threads: t.join() |
スレッドのjoin()は、スレッドが完了したときに戻ってくる。
これを利用して、すべてのスレッドのjoin()が完了するまで処理を待つ。
まとめ
これらの応用することで、簡単にマルチスレッドで処理することができる。
私の場合は、処理は他のシステムのAPIを呼ぶことが多い。
他のシステムAPIがネックで、待ちが多く発生しているときなどは、その部分をマルチスレッドにし、
幾つか多重でリクエストをかけている。
同じデータを複数処理してももったいないので、キューに一度に一気に溜め込んでいる。
また、結果出力はsqliteを使うことが多いが、sqlite3はマルチに書き込むと思わぬエラーが発生することがあるので、
sqliteへの書き込みもスレッド化(シングル)し、各マルチ処理は書き込みキューへキューイングすることで実装している。
javaで作ればいくつもクラスを作って冗長になりがちな実装が、pythonだと1ファイルで簡単に作成できてしまう。
- BOM付きUTF-8からBOMを除去するpython
- jythonのヒープメモリ設定
- Kotlin1.2.50がリリースされていた
- macOSをHigh Sierraに更新した。スクリプト実行環境のバージョンは変わったのか
- macOSをMojaveに更新した。スクリプト実行環境のバージョンは変わったか
- pathlib.Pathを調べる
- pyenvでpython環境をインストール
- pyenvとvirtualenvでpython環境を構築
- python unpack
- python3で数値のlistを文字列のlistに変換