Python Celery
引用项目作者的话
- Celery 是“一个基于分布式消息传递的异步任务队列/作业队列。它专注于实时操作,但也支持调度。(...) 任务可以异步(在后台)或同步(等待就绪)执行。”
安装
安装 软件包 python-celery。与大多数基于 Python 的软件包一样,您将获得与 Python 3.x 兼容的软件包。
Celery 文档:“Celery 需要一个发送和接收消息的解决方案”——其中一个选项是 rabbitmq,也可以从官方仓库安装。
配置
Celery
对于配置文件,需要创建目录 /etc/celery/
,并在其中创建一个名为 app.conf
的配置文件,其中 app 是您的应用程序的名称。Celery 文档中提供了示例配置文件。
启动/启用 celery@app.service
。
要在 virtualenv 中运行 celery,请在 /etc/systemd/system
中复制 celery@.service
以进行自定义,并将 celery 二进制文件的路径更改为 virtualenv 中的副本。
RabbitMQ
RabbitMQ 将其配置存储在 /etc/rabbitmq/rabbitmq-env.conf
中
/etc/rabbitmq/rabbitmq-env.conf
NODENAME=rabbit@rakieta NODE_IP_ADDRESS=0.0.0.0 NODE_PORT=5672 LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia
您可能想要将 0.0.0.0
替换为 127.0.0.1
,RabbitMQ 不支持 Unix 套接字。
对于简单的配置,您可能还想添加 HOME=/var/lib/rabbitmq
。请阅读 RabbitMQ 文档以了解有关环境变量的更多信息
启动/启用 rabbitmq.service
。
rabbitmq-service
以 rabbitmq 用户身份启动,其主文件夹存储在 /var/lib/rabbitmq
中 - 您可能需要确保 rabbitmq 用户拥有此文件夹及其所有子文件夹请按照 RabbitMQ 文档添加您的用户和虚拟主机
$ cd /var/lib/rabbitmq $ su rabbitmq -c 'rabbitmqctl add_user myuser mypassword' $ su rabbitmq -c 'rabbitmqctl add_vhost myvhost' $ su rabbitmq -c 'rabbitmqctl set_user_tags myuser mytag' $ su rabbitmq -c 'rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"'
请阅读 RabbitMQ 管理指南以理解以上内容。
如果执行 su rabbitmq -c "rabbitmqctl status"
导致 badrpc,nodedown
,请访问 此博客文章以获取有关如何解决问题的更多信息。
su rabbitmq -c "erl"
,结果您应该得到一个没有错误的 erlang 提示符安全
您可能需要阅读相关 Celery 文档中的安全部分
示例任务
Celery 应用
请按照 Celery 文档 创建 一个 python 示例任务
test.py
from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost') @app.task def add(x, y): return x + y
amqp://myuser:mypassword@localhost:5672/myvhost
使用您在配置 RabbitMQ 时创建的相同凭据/vhost
backend='amqp'
- 此参数是可选的,因为 RabbitMQ 是 celery 使用的默认 broker。
测试运行
当在与您的 test.py
相同的目录中时,您可以运行
$ celery -A task worker --loglevel=info
然后从另一个控制台(但在同一目录中) 创建
call.py
from test import add add.delay(4, 4)
运行它
$ python call.py
首先,控制台应记录一些信息,表明 worker 已被调用
Received task: task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] Task task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8
为 Celery 服务准备模块
以下步骤与您在 Celery 文档中找到的略有不同
要以 root 身份创建 test_task
模块,请首先创建 /lib/python3.5/site-packages/test_task
目录,一个空白的 /lib/python3.5/site-packages/test_task/__init__.py
,并将以下文件 创建 在其中
/lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost') if __name__ == '__main__': app.start()
/lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import from test_task.celery import app @app.task def add(x, y): return x + y
此时,如果您在控制台中执行 python
,您应该能够无错误地执行以下操作
>>> from test_task import celery
在 /etc/celery/celery.conf
中替换
CELERY_APP="proj"
为以下行
CELERY_APP="test_task"
重启 celery@celery.service
。
定期运行任务
任务可以通过 Celery Beat 定期运行,基本设置在相关的 Celery 文档页面中描述。一个例子
如果您想在您的 celery.py
中指定 CELERYBEAT_SCHEDULE
,那么您需要添加 app.conf
前缀,以使 celery 识别您的计划任务。之后,当您启动 celery 守护程序时,您需要添加 --beat --schedule=/var/lib/celery/celerybeat-schedule
参数。此外,/var/lib/celery
目录必须存在于 celery 相关环境中,并且归运行 celery 的用户所有。