Celery を試してみる
Redash を介してお世話になることはあれど、単体で使ったことがなかったので改めて Celery を試してみる。
Homepage | Celery: Distributed Task Queue
Python のバージョンは 3.6.5。Celery のバージョンは 4.1.1 を使用した。
Celery とは
Celery: Distributed Task Queue
分散タスクキュー。とのこと。
Celery で使えるメッセージブローカ
Brokers — Celery 4.1.0 documentation
RabbitMQ, Redis, Amazon SQS あたりが使えるとのこと。今回は Redash の理解を深めることも目的のひとつなので、Redis を使うことにする。
結果を取得するための Result Backend
First Steps with Celery — Celery 4.1.0 documentation
実行されたタスクの結果を利用したい場合、結果の保存には Result Backend という仕組みをつかうらしい。
保存先は SQLAlchemy, Memcached, Redis などが使用できるようだが、今回は Result Backend も Redash にあわせて Redis を使う。
試してみる
worker 上で動作するタスクの実装
非同期で動くことを確認するため、与えられたメッセージ文字列を0-2秒待ったあとに、そのまま結果として返す簡単なタスクを用意した。
from celery import Celery from time import sleep from random import randint app = Celery('tasks', backend='redis://localhost', broker='redis://localhost') @app.task def hello(m): sleep(randint(0, 2)) return m
非同期タスクを呼び出すアプリケーションの実装
FizzBuzz ループの中で 3 Fizz
のような文字列を先程のタスクで実行する。
すべて非同期実行をしたあとに、処理結果を確認、表示する。
from collections import deque import tasks results = deque([]) for i in range(1, 42): if i % 15 == 0: results.append(tasks.hello.delay('{} FizzBuzz'.format(i))) elif i % 3 == 0: results.append(tasks.hello.delay('{} Fizz'.format(i))) elif i % 5 == 0: results.append(tasks.hello.delay('{} Buzz'.format(i))) else: results.append(tasks.hello.delay('{}'.format(i))) print('すべてのタスクがキューに入りました') while len(results) > 0: result = results.popleft() if result.ready(): print(result.get()) continue results.append(result)
実行してみる
まずは worker を起動する。
$ celery --app=tasks worker --loglevel=info -c 2
--app
で非同期実行されるタスクが書かれたスクリプト名を渡し、 --loglevel
は文字通りログレベル、 -c
で並列実行のための子プロセス数を指定する。
次に、アプリケーションを実行する。
これはただの Python スクリプトなので、 python
コマンドで実行するだけ。
$ python main.py すべてのタスクがキューに入りました 1 2 4 3 Fizz 5 Buzz ...省略... 39 Fizz 40 Buzz 41
実行すると、「すべてのタスクがキューに入りました」というメッセージのあとに、1から41までの数字と、条件にあった数値は「Fizz」などの文字列を付加した結果が表示される。
上の例では、 2
の次の結果が 4
になっているが、これはタスク側でランダムに待ちをいれているためなので問題なし。
一方、アプリケーション実行時の worker 側の出力はこんな感じ。
[2018-05-28 20:08:15,640: INFO/MainProcess] Received task: tasks.hello[cbf77878-c2b4-48ec-8214-491155479188] [2018-05-28 20:08:15,643: INFO/MainProcess] Received task: tasks.hello[f1808ca7-f663-4c9e-8343-be5581b464da] [2018-05-28 20:08:15,647: INFO/MainProcess] Received task: tasks.hello[74ade9ad-8136-4ed9-8ee1-029cb9a003a9] [2018-05-28 20:08:15,652: INFO/MainProcess] Received task: tasks.hello[02925313-3346-41a8-bab8-9b4506151fc4] [2018-05-28 20:08:15,657: INFO/MainProcess] Received task: tasks.hello[e0558aef-3206-4feb-a09e-97bc48e9496d] [2018-05-28 20:08:15,661: INFO/MainProcess] Received task: tasks.hello[3dae0719-daa8-4c42-b703-1a2bf7293bae] [2018-05-28 20:08:15,667: INFO/MainProcess] Received task: tasks.hello[94a9a252-2794-413f-8aac-fd781a893cab] [2018-05-28 20:08:15,674: INFO/MainProcess] Received task: tasks.hello[84d9a82a-571a-477b-8911-4010083cc0b6] ...省略... [2018-05-28 20:08:35,937: INFO/ForkPoolWorker-2] Task tasks.hello[7c86fa11-a1c4-4261-900a-bcd6f261d76d] succeeded in 0.0011464759991213214s: '36 Fizz' [2018-05-28 20:08:36,940: INFO/ForkPoolWorker-2] Task tasks.hello[8f6658a9-2fcc-47a7-98e8-cae9a8096546] succeeded in 1.0012103780027246s: '37' [2018-05-28 20:08:37,813: INFO/ForkPoolWorker-1] Task tasks.hello[af3513f5-c6af-4e64-b324-7c4b1f8634bc] succeeded in 2.0011870139969687s: '35 Buzz' [2018-05-28 20:08:37,945: INFO/ForkPoolWorker-2] Task tasks.hello[38ecd7d3-2897-43ba-8e0c-1f36f24d7acb] succeeded in 1.0030830889991194s: '38' [2018-05-28 20:08:38,836: INFO/ForkPoolWorker-1] Task tasks.hello[f58921ba-5322-419a-b3d9-5d6f6d27e2ba] succeeded in 1.0212826329989184s: '39 Fizz' [2018-05-28 20:08:38,947: INFO/ForkPoolWorker-2] Task tasks.hello[1fff8e54-ded8-4a85-86f5-9f68fd9f2aa5] succeeded in 1.0010614519997034s: '40 Buzz' [2018-05-28 20:08:40,861: INFO/ForkPoolWorker-1] Task tasks.hello[2ac6cfeb-e204-427b-9ea6-7ddbd6fcfc6f] succeeded in 2.0224582139999256s: '41'
キューにタスクが積まれて、その後徐々に処理されていく様子がわからなくもない。
タスクには一意の ID が振られていて、ぱっと見で UUID っぽく見える。
worker を2つにしてみる
コードは変えず、 Celery の worker プロセスをもう一つ起動してみる。
$ celery --app=tasks worker --loglevel=info -c 2
この状態でアプリケーションを実行すると。
$ python main.py すべてのタスクがキューに入りました 3 Fizz 1 2 4 5 Buzz ...省略... 31 36 Fizz 38 34 40 Buzz
worker が増えたため、先程の実行結果よりさらに処理順序にばらつきが出ている。
worker 側の出力は割愛するが、 どちらの worker でも実行ログが出ているので、おそらく均一に処理されているものと思われる。
まとめ
タスクキューってだいたいこんな感じだよね。という使い心地だった。
Redash のコードを追う際の基礎知識として、触ってみてよかったと思う。
今度は Redash のクエリワーカを複数にしても問題なく動作するのか?という検証をしてみるつもり。
余談: Celery のタスクはどのような形で メッセージブローカ(Redis)に保存されているのか?
気になったので redis-cli
で見てみる。
$ redis-cli 127.0.0.1:6379> keys * ...省略... 231) "celery-task-meta-6dae9f06-bfb5-497a-8926-2b03dfb8a1a3" 232) "celery" 233) "celery-task-meta-68ab5751-0fa7-45d6-9c97-9aee871b410b" ...省略...
celery
というキーがあったので型を確認したら list
型だった。
127.0.0.1:6379> type celery list
1件取り出してみる。
127.0.0.1:6379> LRANGE celery 0 0 1) "{\"body\": \"W1siNDEiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"tasks.hello\", \"id\": \"b69ad207-308f-4c73-988a-8321a640dbdc\", \"eta\": null, \"expires\": null, \"group\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"b69ad207-308f-4c73-988a-8321a640dbdc\", \"parent_id\": null, \"argsrepr\": \"('41',)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen14561@ariarijp.local\"}, \"properties\": {\"correlation_id\": \"b69ad207-308f-4c73-988a-8321a640dbdc\", \"reply_to\": \"e3c01706-26cb-3330-98aa-b5551bd6e56b\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"0c5d61f8-ae4a-492e-904f-cbd4b551c3ca\"}}"
JSON。ちょっと見にくいので整形するとこんな感じ。
{ "body": "W1siNDEiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": { "lang": "py", "task": "tasks.hello", "id": "b69ad207-308f-4c73-988a-8321a640dbdc", "eta": null, "expires": null, "group": null, "retries": 0, "timelimit": [ null, null ], "root_id": "b69ad207-308f-4c73-988a-8321a640dbdc", "parent_id": null, "argsrepr": "('41',)", "kwargsrepr": "{}", "origin": "gen14561@ariarijp.local" }, "properties": { "correlation_id": "b69ad207-308f-4c73-988a-8321a640dbdc", "reply_to": "e3c01706-26cb-3330-98aa-b5551bd6e56b", "delivery_mode": 2, "delivery_info": { "exchange": "", "routing_key": "celery" }, "priority": 0, "body_encoding": "base64", "delivery_tag": "0c5d61f8-ae4a-492e-904f-cbd4b551c3ca" } }
body
の中身は Base64 エンコードしてるように見えるのでデコードしてみた。
[["41"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]
celery-task-meta- *
みたいなのも気になるので見てみる。
127.0.0.1:6379> type celery-task-meta-e6f5f414-1d8f-42c2-b614-75f9615ec61f string
文字列型。JSON かな。
127.0.0.1:6379> get celery-task-meta-e6f5f414-1d8f-42c2-b614-75f9615ec61f "{\"status\": \"SUCCESS\", \"result\": \"41\", \"traceback\": null, \"children\": [], \"task_id\": \"e6f5f414-1d8f-42c2-b614-75f9615ec61f\"}"
JSON だった。Result Backend を使用すると、きっと celery-task-meta-*
に結果を書き出すのだろう。
例外を起こしたらどうなるのか?
気になったので、例外が起きるように書き換え、その結果を見てみた。
127.0.0.1:6379> get celery-task-meta-d5024461-52b3-429b-ab71-57c8e76522b3 "{\"status\": \"FAILURE\", \"result\": {\"exc_type\": \"Exception\", \"exc_message\": \"\"}, \"traceback\": \"Traceback (most recent call last):\\n File \\\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\\", line 375, in trace_task\\n R = retval = fun(*args, **kwargs)\\n File \\\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\\", line 632, in __protected_call__\\n return self.run(*args, **kwargs)\\n File \\\"/Users/ariarijp/workspace/python/celerytutorial/tasks.py\\\", line 12, in hello\\n raise Exception()\\nException\\n\", \"children\": [], \"task_id\": \"d5024461-52b3-429b-ab71-57c8e76522b3\"}"
ちょっと長いので整形。
{ "status": "FAILURE", "result": { "exc_type": "Exception", "exc_message": "" }, "traceback": "Traceback (most recent call last):\\n File \\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\", line 375, in trace_task\\n R = retval = fun(*args, **kwargs)\\n File \\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\", line 632, in __protected_call__\\n return self.run(*args, **kwargs)\\n File \\"/Users/ariarijp/workspace/python/celerytutorial/tasks.py\\", line 12, in hello\\n raise Exception()\\nException\\n", "children": [], "task_id": "d5024461-52b3-429b-ab71-57c8e76522b3" }
ステータスが FAILURE
になり、結果に例外の種類やトレースバックが含まれるようになった。
この値を使って結果を受け取るときに例外を発生させている様子。
ここのコードを追っていったら、Python で Promise を実現するための vine というモジュールがあることを知った。Celery プロジェクトの一部っぽい。