Python Celery

来自 ArchWiki

引用项目作者的话

Celery 是“一个基于分布式消息传递的异步任务队列/作业队列。它专注于实时操作,但也支持调度。(...) 任务可以异步(在后台)或同步(等待就绪)执行。”

安装

安装 软件包 python-celery。与大多数基于 Python 的软件包一样,您将获得与 Python 3.x 兼容的软件包。

Celery 文档:“Celery 需要一个发送和接收消息的解决方案”——其中一个选项是 rabbitmq,也可以从官方仓库安装。

配置

Celery

对于配置文件,需要创建目录 /etc/celery/,并在其中创建一个名为 app.conf 的配置文件,其中 app 是您的应用程序的名称。Celery 文档中提供了示例配置文件。

启动/启用 celery@app.service

要在 virtualenv 中运行 celery,请在 /etc/systemd/system 中复制 celery@.service 以进行自定义,并将 celery 二进制文件的路径更改为 virtualenv 中的副本。

RabbitMQ

RabbitMQ 将其配置存储在 /etc/rabbitmq/rabbitmq-env.conf

/etc/rabbitmq/rabbitmq-env.conf
NODENAME=rabbit@rakieta
NODE_IP_ADDRESS=0.0.0.0
NODE_PORT=5672
    
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia

您可能想要将 0.0.0.0 替换为 127.0.0.1,RabbitMQ 不支持 Unix 套接字。

对于简单的配置,您可能还想添加 HOME=/var/lib/rabbitmq。请阅读 RabbitMQ 文档以了解有关环境变量的更多信息

启动/启用 rabbitmq.service

注意: rabbitmq-service 以 rabbitmq 用户身份启动,其主文件夹存储在 /var/lib/rabbitmq 中 - 您可能需要确保 rabbitmq 用户拥有此文件夹及其所有子文件夹

请按照 RabbitMQ 文档添加您的用户和虚拟主机

$ cd /var/lib/rabbitmq
$ su rabbitmq -c 'rabbitmqctl add_user myuser mypassword'
$ su rabbitmq -c 'rabbitmqctl add_vhost myvhost'
$ su rabbitmq -c 'rabbitmqctl set_user_tags myuser mytag'
$ su rabbitmq -c 'rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"'

请阅读 RabbitMQ 管理指南以理解以上内容。

如果执行 su rabbitmq -c "rabbitmqctl status" 导致 badrpc,nodedown,请访问 此博客文章以获取有关如何解决问题的更多信息。

注意: 您可能还想运行 su rabbitmq -c "erl",结果您应该得到一个没有错误的 erlang 提示符

安全

您可能需要阅读相关 Celery 文档中的安全部分

示例任务

Celery 应用

请按照 Celery 文档 创建 一个 python 示例任务

test.py
from celery import Celery
    
    app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')
    
    @app.task
    def add(x, y):
        return x + y

amqp://myuser:mypassword@localhost:5672/myvhost

使用您在配置 RabbitMQ 时创建的相同凭据/vhost

backend='amqp' - 此参数是可选的,因为 RabbitMQ 是 celery 使用的默认 broker。

测试运行

当在与您的 test.py 相同的目录中时,您可以运行

$ celery -A task worker --loglevel=info

然后从另一个控制台(但在同一目录中) 创建

call.py
from test import add
    
    add.delay(4, 4)

运行它

$ python call.py

首先,控制台应记录一些信息,表明 worker 已被调用

Received task: task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e]
Task task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8

为 Celery 服务准备模块

以下步骤与您在 Celery 文档中找到的略有不同

要以 root 身份创建 test_task 模块,请首先创建 /lib/python3.5/site-packages/test_task 目录,一个空白的 /lib/python3.5/site-packages/test_task/__init__.py,并将以下文件 创建 在其中

/lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

if __name__ == '__main__':
 app.start()
/lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import

from test_task.celery import app

@app.task
def add(x, y):
 return x + y

此时,如果您在控制台中执行 python,您应该能够无错误地执行以下操作

>>> from test_task import celery

/etc/celery/celery.conf 中替换

CELERY_APP="proj"

为以下行

CELERY_APP="test_task"

重启 celery@celery.service

定期运行任务

任务可以通过 Celery Beat 定期运行,基本设置在相关的 Celery 文档页面中描述。一个例子

如果您想在您的 celery.py 中指定 CELERYBEAT_SCHEDULE,那么您需要添加 app.conf 前缀,以使 celery 识别您的计划任务。之后,当您启动 celery 守护程序时,您需要添加 --beat --schedule=/var/lib/celery/celerybeat-schedule 参数。此外,/var/lib/celery 目录必须存在于 celery 相关环境中,并且归运行 celery 的用户所有。