Celery を試してみる

Redash を介してお世話になることはあれど、単体で使ったことがなかったので改めて Celery を試してみる。

Homepage | Celery: Distributed Task Queue

Python のバージョンは 3.6.5。Celery のバージョンは 4.1.1 を使用した。

Celery とは

Celery: Distributed Task Queue

分散タスクキュー。とのこと。

Celery で使えるメッセージブローカ

Brokers — Celery 4.1.0 documentation

RabbitMQ, Redis, Amazon SQS あたりが使えるとのこと。今回は Redash の理解を深めることも目的のひとつなので、Redis を使うことにする。

結果を取得するための Result Backend

First Steps with Celery — Celery 4.1.0 documentation

実行されたタスクの結果を利用したい場合、結果の保存には Result Backend という仕組みをつかうらしい。

保存先は SQLAlchemy, Memcached, Redis などが使用できるようだが、今回は Result Backend も Redash にあわせて Redis を使う。

試してみる

worker 上で動作するタスクの実装

非同期で動くことを確認するため、与えられたメッセージ文字列を0-2秒待ったあとに、そのまま結果として返す簡単なタスクを用意した。

from celery import Celery
from time import sleep
from random import randint

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')

@app.task
def hello(m):
    sleep(randint(0, 2))
    return m

非同期タスクを呼び出すアプリケーションの実装

FizzBuzz ループの中で 3 Fizz のような文字列を先程のタスクで実行する。

すべて非同期実行をしたあとに、処理結果を確認、表示する。

from collections import deque

import tasks

results = deque([])

for i in range(1, 42):
    if i % 15 == 0:
        results.append(tasks.hello.delay('{} FizzBuzz'.format(i)))
    elif i % 3 == 0:
        results.append(tasks.hello.delay('{} Fizz'.format(i)))
    elif i % 5 == 0:
        results.append(tasks.hello.delay('{} Buzz'.format(i)))
    else:
        results.append(tasks.hello.delay('{}'.format(i)))

print('すべてのタスクがキューに入りました')

while len(results) > 0:
    result = results.popleft()
    if result.ready():
        print(result.get())
        continue

    results.append(result)

実行してみる

まずは worker を起動する。

$ celery --app=tasks worker --loglevel=info -c 2

--app で非同期実行されるタスクが書かれたスクリプト名を渡し、 --loglevel は文字通りログレベル、 -c で並列実行のための子プロセス数を指定する。

次に、アプリケーションを実行する。

これはただの Python スクリプトなので、 python コマンドで実行するだけ。

$ python main.py
すべてのタスクがキューに入りました
1
2
4
3 Fizz
5 Buzz

...省略...

39 Fizz
40 Buzz
41

実行すると、「すべてのタスクがキューに入りました」というメッセージのあとに、1から41までの数字と、条件にあった数値は「Fizz」などの文字列を付加した結果が表示される。

上の例では、 2 の次の結果が 4 になっているが、これはタスク側でランダムに待ちをいれているためなので問題なし。

一方、アプリケーション実行時の worker 側の出力はこんな感じ。

[2018-05-28 20:08:15,640: INFO/MainProcess] Received task: tasks.hello[cbf77878-c2b4-48ec-8214-491155479188]
[2018-05-28 20:08:15,643: INFO/MainProcess] Received task: tasks.hello[f1808ca7-f663-4c9e-8343-be5581b464da]
[2018-05-28 20:08:15,647: INFO/MainProcess] Received task: tasks.hello[74ade9ad-8136-4ed9-8ee1-029cb9a003a9]
[2018-05-28 20:08:15,652: INFO/MainProcess] Received task: tasks.hello[02925313-3346-41a8-bab8-9b4506151fc4]
[2018-05-28 20:08:15,657: INFO/MainProcess] Received task: tasks.hello[e0558aef-3206-4feb-a09e-97bc48e9496d]
[2018-05-28 20:08:15,661: INFO/MainProcess] Received task: tasks.hello[3dae0719-daa8-4c42-b703-1a2bf7293bae]
[2018-05-28 20:08:15,667: INFO/MainProcess] Received task: tasks.hello[94a9a252-2794-413f-8aac-fd781a893cab]
[2018-05-28 20:08:15,674: INFO/MainProcess] Received task: tasks.hello[84d9a82a-571a-477b-8911-4010083cc0b6]

...省略...

[2018-05-28 20:08:35,937: INFO/ForkPoolWorker-2] Task tasks.hello[7c86fa11-a1c4-4261-900a-bcd6f261d76d] succeeded in 0.0011464759991213214s: '36 Fizz'
[2018-05-28 20:08:36,940: INFO/ForkPoolWorker-2] Task tasks.hello[8f6658a9-2fcc-47a7-98e8-cae9a8096546] succeeded in 1.0012103780027246s: '37'
[2018-05-28 20:08:37,813: INFO/ForkPoolWorker-1] Task tasks.hello[af3513f5-c6af-4e64-b324-7c4b1f8634bc] succeeded in 2.0011870139969687s: '35 Buzz'
[2018-05-28 20:08:37,945: INFO/ForkPoolWorker-2] Task tasks.hello[38ecd7d3-2897-43ba-8e0c-1f36f24d7acb] succeeded in 1.0030830889991194s: '38'
[2018-05-28 20:08:38,836: INFO/ForkPoolWorker-1] Task tasks.hello[f58921ba-5322-419a-b3d9-5d6f6d27e2ba] succeeded in 1.0212826329989184s: '39 Fizz'
[2018-05-28 20:08:38,947: INFO/ForkPoolWorker-2] Task tasks.hello[1fff8e54-ded8-4a85-86f5-9f68fd9f2aa5] succeeded in 1.0010614519997034s: '40 Buzz'
[2018-05-28 20:08:40,861: INFO/ForkPoolWorker-1] Task tasks.hello[2ac6cfeb-e204-427b-9ea6-7ddbd6fcfc6f] succeeded in 2.0224582139999256s: '41'

キューにタスクが積まれて、その後徐々に処理されていく様子がわからなくもない。

タスクには一意の ID が振られていて、ぱっと見で UUID っぽく見える。

worker を2つにしてみる

コードは変えず、 Celery の worker プロセスをもう一つ起動してみる。

$ celery --app=tasks worker --loglevel=info -c 2

この状態でアプリケーションを実行すると。

$ python main.py
すべてのタスクがキューに入りました
3 Fizz
1
2
4
5 Buzz

...省略...

31
36 Fizz
38
34
40 Buzz

worker が増えたため、先程の実行結果よりさらに処理順序にばらつきが出ている。

worker 側の出力は割愛するが、 どちらの worker でも実行ログが出ているので、おそらく均一に処理されているものと思われる。

まとめ

タスクキューってだいたいこんな感じだよね。という使い心地だった。

Redash のコードを追う際の基礎知識として、触ってみてよかったと思う。

今度は Redash のクエリワーカを複数にしても問題なく動作するのか?という検証をしてみるつもり。

余談: Celery のタスクはどのような形で メッセージブローカ(Redis)に保存されているのか?

気になったので redis-cli で見てみる。

$ redis-cli
127.0.0.1:6379> keys *

...省略...

231) "celery-task-meta-6dae9f06-bfb5-497a-8926-2b03dfb8a1a3"
232) "celery"
233) "celery-task-meta-68ab5751-0fa7-45d6-9c97-9aee871b410b"

...省略...

celery というキーがあったので型を確認したら list 型だった。

127.0.0.1:6379> type celery
list

1件取り出してみる。

127.0.0.1:6379> LRANGE celery 0 0
1) "{\"body\": \"W1siNDEiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"tasks.hello\", \"id\": \"b69ad207-308f-4c73-988a-8321a640dbdc\", \"eta\": null, \"expires\": null, \"group\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"b69ad207-308f-4c73-988a-8321a640dbdc\", \"parent_id\": null, \"argsrepr\": \"('41',)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen14561@ariarijp.local\"}, \"properties\": {\"correlation_id\": \"b69ad207-308f-4c73-988a-8321a640dbdc\", \"reply_to\": \"e3c01706-26cb-3330-98aa-b5551bd6e56b\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"0c5d61f8-ae4a-492e-904f-cbd4b551c3ca\"}}"

JSON。ちょっと見にくいので整形するとこんな感じ。

{
    "body": "W1siNDEiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
    "content-encoding": "utf-8",
    "content-type": "application/json",
    "headers": {
        "lang": "py",
        "task": "tasks.hello",
        "id": "b69ad207-308f-4c73-988a-8321a640dbdc",
        "eta": null,
        "expires": null,
        "group": null,
        "retries": 0,
        "timelimit": [
            null,
            null
        ],
        "root_id": "b69ad207-308f-4c73-988a-8321a640dbdc",
        "parent_id": null,
        "argsrepr": "('41',)",
        "kwargsrepr": "{}",
        "origin": "gen14561@ariarijp.local"
    },
    "properties": {
        "correlation_id": "b69ad207-308f-4c73-988a-8321a640dbdc",
        "reply_to": "e3c01706-26cb-3330-98aa-b5551bd6e56b",
        "delivery_mode": 2,
        "delivery_info": {
            "exchange": "",
            "routing_key": "celery"
        },
        "priority": 0,
        "body_encoding": "base64",
        "delivery_tag": "0c5d61f8-ae4a-492e-904f-cbd4b551c3ca"
    }
}

body の中身は Base64 エンコードしてるように見えるのでデコードしてみた。

[["41"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]

celery-task-meta- * みたいなのも気になるので見てみる。

127.0.0.1:6379> type celery-task-meta-e6f5f414-1d8f-42c2-b614-75f9615ec61f
string

文字列型。JSON かな。

127.0.0.1:6379> get celery-task-meta-e6f5f414-1d8f-42c2-b614-75f9615ec61f
"{\"status\": \"SUCCESS\", \"result\": \"41\", \"traceback\": null, \"children\": [], \"task_id\": \"e6f5f414-1d8f-42c2-b614-75f9615ec61f\"}"

JSON だった。Result Backend を使用すると、きっと celery-task-meta-* に結果を書き出すのだろう。

例外を起こしたらどうなるのか?

気になったので、例外が起きるように書き換え、その結果を見てみた。

127.0.0.1:6379> get celery-task-meta-d5024461-52b3-429b-ab71-57c8e76522b3
"{\"status\": \"FAILURE\", \"result\": {\"exc_type\": \"Exception\", \"exc_message\": \"\"}, \"traceback\": \"Traceback (most recent call last):\\n  File \\\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\\", line 375, in trace_task\\n    R = retval = fun(*args, **kwargs)\\n  File \\\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\\", line 632, in __protected_call__\\n    return self.run(*args, **kwargs)\\n  File \\\"/Users/ariarijp/workspace/python/celerytutorial/tasks.py\\\", line 12, in hello\\n    raise Exception()\\nException\\n\", \"children\": [], \"task_id\": \"d5024461-52b3-429b-ab71-57c8e76522b3\"}"

ちょっと長いので整形。

{
    "status": "FAILURE",
    "result": {
        "exc_type": "Exception",
        "exc_message": ""
    },
    "traceback": "Traceback (most recent call last):\\n  File \\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\", line 375, in trace_task\\n    R = retval = fun(*args, **kwargs)\\n  File \\"/Users/ariarijp/.local/share/virtualenvs/celerytutorial-nzo1Q02k/lib/python3.6/site-packages/celery/app/trace.py\\", line 632, in __protected_call__\\n    return self.run(*args, **kwargs)\\n  File \\"/Users/ariarijp/workspace/python/celerytutorial/tasks.py\\", line 12, in hello\\n    raise Exception()\\nException\\n",
    "children": [],
    "task_id": "d5024461-52b3-429b-ab71-57c8e76522b3"
}

ステータスが FAILURE になり、結果に例外の種類やトレースバックが含まれるようになった。

この値を使って結果を受け取るときに例外を発生させている様子。

ここのコードを追っていったら、Python で Promise を実現するための vine というモジュールがあることを知った。Celery プロジェクトの一部っぽい。

github.com