博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
#SORA#celery研究笔记
阅读量:7223 次
发布时间:2019-06-29

本文共 2183 字,大约阅读时间需要 7 分钟。

hot3.png

152459_uQEq_987833.jpg

最近看到celery文档task部分,做一下小结

  1. 实际处理时,我们可以使用一个类似于logging的模块生成日志。

  2. 对于某些任务,你可以设置当满足某些条件时,重试任务、拒绝任务或忽略任务

  3. 在定义task时,@app.task(bind=True)中的bind参数可以让你在task中访问请求中的内容,比如id,group之类的信息

  4. @app.task(AGRS=VALUE),ARGS有好几个参数可以设置,比如name,有些和全局设置(CELERY_xxx_xxx之类的)是一样的配置内容

  5. 可以自定义任务状态(默认有pending,started,success,failure,retry,revoked)

  6. 当你使用pickle作为序列化工具时,你应该定义那些可以被pickle的异常(我用json,直接忽略)

实例化。你可以继承Task类定义新类,定义的__init__方法只会被调用一次,此后将持续存在。当你的task以此新类为基类,后面对此task的调用中,__init__的作用还会存在。(用途:自定义类中维持一个对数据库的连接,task可以不用每次都创建连接,而是对那个存在的属性进行操作)。

e.g:

from celery import Taskclass DatabaseTask(Task):    abstract = True    _db = None    @property    def db(self):        if self._db is None:            self._db = Database.connect()        return self._db
@app.task(base=DatabaseTask)def process_rows():    for row in process_rows.db.table.all():        …

7.定义新类时,设置为抽象类,可以作为task的基类。其中又有四种handle方法(after_return,on_failure,on_retry,on_success)

e.g:

from celery import Taskclass DebugTask(Task):    abstract = True    def after_return(self, *args, **kwargs):        print('Task returned: {0!r}'.format(self.request)@app.task(base=DebugTask)def add(x, y):    return x + y

8.最佳实践:

*忽略不需要的结果,或者设置CELERY_TASK_RESULT_EXPIRES

*若不需要,关闭rate limits

*避免写出同步的task(阻塞),同时执行的同步的task,前面的会令后面的低效,应该写成异步的方法或使用callback 

e.g:

#Bad:@app.taskdef update_page_info(url):    page = fetch_page.delay(url).get()    info = parse_page.delay(url, page).get()    store_page_info.delay(url, info)@app.taskdef fetch_page(url):    return myhttplib.get(url)@app.taskdef parse_page(url, page):    return myparser.parse_document(page)@app.taskdef store_page_info(url, info):    return PageInfo.objects.create(url, info)                    #Good:def update_page_info(url):    # fetch_page -> parse_page -> store_page    chain = fetch_page.s() | parse_page.s() | store_page_info.s(url)    chain()@app.task()def fetch_page(url):    return myhttplib.get(url)@app.task()def parse_page(page):    return myparser.parse_document(page)@app.task(ignore_result=True)def store_page_info(info, url):    PageInfo.objects.create(url=url, info=info)

*使用细粒度的任务,而不是又长又臭的long task

*尽量把需要操作的数据放在本地,或使worker靠近数据所在,降低因IO延迟的影响

暂时如此,以后有需要再补

转载于:https://my.oschina.net/hochikong/blog/399219

你可能感兴趣的文章
掘金广告产品介绍
查看>>
小猪的Python学习之旅 —— 7.Python并发之threading模块(1)
查看>>
腾讯冯宇彦:基于大数据与人工智能的智慧交通云
查看>>
eclipse再见,android studio 新手入门教程(二)项目的导入
查看>>
OpenGL ES 2 0 (iOS) 笔记大纲
查看>>
简述HTTP
查看>>
css笔记
查看>>
SpringBoot应用War包形式部署到外部Tomcat
查看>>
3款你必须知道的爬虫工具
查看>>
NS_OPTIONS枚举的用法
查看>>
智能媒体管理产品文档转换/预览功能介绍(4)--快速搭建
查看>>
5 ReCharts
查看>>
Android app 在线更新那点事儿(适配Android6.0、7.0、8.0)
查看>>
使用 OAuth 2 和 JWT 为微服务提供安全保障 - 基本概念
查看>>
PayPal 概述
查看>>
webpack 3 零基础入门教程 #18 - 构建开发和生产环境-分离配置文件
查看>>
前端技术演进:参考文章
查看>>
架构师眼里的高并发架构
查看>>
网页模板pug基本语法
查看>>
利用 AFN 上传相册或拍照图片
查看>>