跳转至内容

Python celery

来自 ArchWiki

引用 项目 作者的话

Celery 是“一个基于分布式消息传递的异步任务队列/作业队列。它专注于实时操作,但也支持调度。(...)任务可以异步(在后台)或同步(等待就绪)执行。”

安装

安装 python-celeryAUR 包。与大多数基于 python 的包一样,您可以获得一个兼容 Python 3.x 的包。

引用 Celery 文档:“Celery 需要一种发送和接收消息的解决方案”,其中一个选项是 rabbitmq,也可以从官方仓库安装。

配置

Celery

对于配置文件,需要创建目录 /etc/celery/,并在其中创建一个名为 app.conf 的配置文件,其中 app 是您的应用程序名称。一个示例配置文件可在 Celery 文档 中找到。

启动/启用 celery@app.service

要在虚拟环境中运行 celery,请将 celery@.service 复制到 /etc/systemd/system 目录以进行自定义,并更改 celery 二进制文件的路径以指向您虚拟环境中的副本。

RabbitMQ

RabbitMQ 将其配置存储在 /etc/rabbitmq/rabbitmq-env.conf 中。

/etc/rabbitmq/rabbitmq-env.conf
NODENAME=rabbit@rakieta
NODE_IP_ADDRESS=0.0.0.0
NODE_PORT=5672
    
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia

您可能想将 0.0.0.0 替换为 127.0.0.1,RabbitMQ 不支持 Unix 套接字。

对于简单的配置,您可能还想添加 HOME=/var/lib/rabbitmq。有关环境变量的更多信息,请参阅 RabbitMQ 文档

启动/启用 rabbitmq.service

注意 rabbitmq-service 以 rabbitmq 用户身份启动,其主文件夹存储在 /var/lib/rabbitmq 中 - 您可能需要确保 rabbitmq 用户拥有该文件夹及其所有子文件夹。

请遵循 RabbitMQ 文档 添加您的用户和虚拟主机。

$ cd /var/lib/rabbitmq
[rabbitmq]$ rabbitmqctl add_user myuser mypassword
[rabbitmq]$ rabbitmqctl add_vhost myvhost
[rabbitmq]$ rabbitmqctl set_user_tags myuser mytag
[rabbitmq]$ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

阅读 RabbitMQ 管理指南 来理解上述内容。

如果执行 rabbitmqctl status 命令出现 badrpc,nodedown 错误,请访问 此博客文章 以获取更多关于如何解决此问题的信息。

注意 您可能还想运行 erl 命令,然后您应该会看到一个没有任何错误的 erlang 提示符。

安全

您可能想阅读 相关 Celery 文档 中的安全部分。

示例任务

Celery 应用

请遵循 Celery 文档 创建一个 Python 示例任务。

test.py
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

@app.task
def add(x, y):
    return x + y

amqp://myuser:mypassword@localhost:5672/myvhost - 使用您在配置 RabbitMQ 时创建的相同凭据/vhost。

backend='amqp' - 此参数是可选的,因为 RabbitMQ 是 celery 默认使用的消息代理。

测试运行

在与您的 test.py 相同的目录中,您可以运行:

$ celery -A task worker --loglevel=info

然后,在另一个控制台(但位于同一目录中)创建

call.py
from test import add
    add.delay(4, 4)

运行它

$ python call.py

首先,控制台应该会记录一些信息,表明 worker 已被调用。

Received task: task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e]
Task task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8

准备 Celery 服务的模块

下面的过程与您在 Celery 文档 中找到的内容略有不同。

为了使 test_task 模块成为根模块,首先创建 /lib/python3.5/site-packages/test_task 目录,一个空的 /lib/python3.5/site-packages/test_task/__init__.py 文件,并在其中创建以下文件:

/lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

if __name__ == '__main__':
    app.start()
/lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import

from test_task.celery import app

@app.task
def add(x, y):
    return x + y

此时,如果您在控制台中输入 python,您应该能够执行以下操作而没有任何错误:

>>> from test_task import celery

/etc/celery/celery.conf 中,将

CELERY_APP="proj"

替换为以下行:

CELERY_APP="test_task"

重启 celery@celery.service

定期运行任务

可以通过 Celery Beat 定期运行任务,基本设置在相关的 Celery 文档页面 中进行了描述。一个示例:

如果您想在 celery.py 中指定 CELERYBEAT_SCHEDULE,则需要添加 app.conf 前缀,以便 celery 识别您的计划任务。之后,您需要在启动 celery 守护进程时添加 --beat --schedule=/var/lib/celery/celerybeat-schedule 参数。此外,/var/lib/celery 目录必须存在于 celery 相关环境中,并由运行 celery 的用户拥有。