怎么在Django中使用celery处理异步任务
怎么在Django中使用celery处理异步任务?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
什么是Celery?
Celery是 一个专注于实时处理的任务队列,它还支持任务调度。 Celery快速,简单,高度可用且灵活。
Celery需要消息传输来发送和接收消息,这可以由Redis或RabbitMQ完成。
入门
让我们开始在您的virtualenv中安装Celery软件包。
安装Celery
<spanclass="nv">$</span>pip<spanclass="nb">install</span>celery pipinstallcelery
安装Redis
我们将Message Broker用作Redis,所以我们安装
Linux / Mac用户
您可以从这里下载最新版本
$wgethttp://download.redis.io/releases/redis-4.0.8.tar.gz $tarxzfredis-4.0.8.tar.gz $cdredis-4.0.8 $make
Windows用户
对于Windows用户,您可以从此处获取redis的可执行文件。
安装后,请尝试是否正确安装。
$ redis-cli ping
它应该显示:
pong
同时安装redis的python包
$ pip install redis
Django的第一步
现在您已经成功安装了软件包,现在就开始学习Django Project
settings.py Add some of the setting configuration in your settings.py CELERY_BROKER_URL = 'redis://localhost:6379' CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = "YOUR_TIMEZONE"
确保您已从YOUR_TIMEZONE更改时区。 您可以从这里获取时区
主Django项目目录中创建celery.py文件
- src/ - manage.py - celery_project/ - __init__.py - settings.py - urls.py - celery.py celery_project/celery.py
在celery.py模块中添加以下代码。 该模块用于定义celery实例。
确保已使用django项目名称更改了项目名称(<your project name>)
from__future__importabsolute_import,unicode_literals importos fromceleryimportCelery #setthedefaultDjangosettingsmoduleforthe'celery'program. os.environ.setdefault('DJANGO_SETTINGS_MODULE','<yourprojectname>.settings') app=Celery('<yourprojectname>') #Usingastringheremeanstheworkerdoesn'thavetoserialize #theconfigurationobjecttochildprocesses. #-namespace='CELERY'meansallcelery-relatedconfigurationkeys #shouldhavea`CELERY_`prefix. app.config_from_object('django.conf:settings',namespace='CELERY') #LoadtaskmodulesfromallregisteredDjangoappconfigs. app.autodiscover_tasks() @app.task(bind=True) defdebug_task(self): print('Request:{0!r}'.format(self.request)) celery_project/__init__.py
然后,我们需要将定义celery.py的应用程序导入到主项目目录的__init__.py。 这样,我们可以确保在Django项目启动时已加载应用
from__future__importabsolute_import,unicode_literals #Thiswillmakesuretheappisalwaysimportedwhen #Djangostartssothatshared_taskwillusethisapp. from.celeryimportappascelery_app __all__=['celery_app']
创建任务
现在创建一些任务
在您在INSTALLED_APPS中注册的任何应用程序中创建一个新文件
my_app/tasks.py from__future__importabsolute_import,unicode_literals fromceleryimportshared_task @shared_task(name="print_msg_with_name") defprint_message(name,*args,**kwargs): print("Celeryisworking!!{}haveimplementeditcorrectly.".format(name)) @shared_task(name="add_2_numbers") defadd(x,y): print("Addfunctionhasbeencalled!!withparams{},{}".format(x,y)) returnx+y
开始程序
打开一个NEW终端并运行以下命令以运行celery的worker实例,并将目录更改为您的主项目目录所在的位置,即,将manage.py文件放置的目录,并确保您已经 激活您的virtualenv(如果已创建)。
用您的项目名称更改项目名称
$ celery -A <your project name> worker -l info
输出:
-------------- celery@root v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-4.13.0-32-generic-x86_64-with-Ubuntu-17.10-artful 2018-02-17 08:09:37 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: celery_project:0x7f9039886400 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost:6379/ - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . add_2_numbers . celery_project.celery.debug_task . print_msg_with_name [2018-02-17 08:09:37,877: INFO/MainProcess] Connected to redis://localhost:6379// [2018-02-17 08:09:37,987: INFO/MainProcess] mingle: searching for neighbors [2018-02-17 08:09:39,084: INFO/MainProcess] mingle: all alone [2018-02-17 08:09:39,121: WARNING/MainProcess] /home/jai/Desktop/demo/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never ' [2018-02-17 08:09:39,121: INFO/MainProcess] celery@root ready.
注意:检查上面的[tasks],它应该包含您在task.py模块中创建的任务的名称。
有关更多信息和日志,您还可以在DEBUG MODE中运行worker实例
celery <span class="nt">-A</span> <your project name> worker <span class="nt">-l</span> info <span class="nt">--loglevel</span><span class="o">=</span>DEBUG celery -A <your project name> worker -l info --loglevel=DEBUG
注意:请勿关闭此终端,应保持打开状态!!
测试任务
现在让我们从django shell运行任务打开Django shell
$ python3 manage.py shell
用delay方法运行函数:
>>>frommy_app.tasksimportprint_message,add >>>print_message.delay("JaiSinghal") <AsyncResult:fe4f9787-9ee4-46da-856c-453d36556760> >>>add.delay(10,20) <AsyncResult:ca5d2c50-87bc-4e87-92ad-99d6d9704c30>
当检查您的celery worker实例正在运行的第二个终端时,您将获得此类型的输出,显示您的任务已收到且任务已成功完成
[2018-02-17 08:12:14,375: INFO/MainProcess] Received task: my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760] [2018-02-17 08:12:14,377: WARNING/ForkPoolWorker-4] Celery is working!! Jai Singhal have implemented it correctly. [2018-02-17 08:12:14,382: INFO/ForkPoolWorker-4] Task my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760] succeeded in 0.004476275000342866s: None [2018-02-17 08:12:28,344: INFO/MainProcess] Received task: my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30] [2018-02-17 08:12:28,349: WARNING/ForkPoolWorker-3] Add function has been called!! with params 10, 20 [2018-02-17 08:12:28,358: INFO/ForkPoolWorker-3] Task my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30] succeeded in 0.010077004999857309s: 30
看完上述内容,你们掌握怎么在Django中使用celery处理异步任务的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注恰卡编程网行业资讯频道,感谢各位的阅读!