python使用celery-group优化大数据处理速度
celery-group的使用
配置
- 为子任务配置独立的celery进程
1 | from kombu import Exchange, Queue |
使用
- 在异步任务中使用指定的进程,分割出多个子任务
1 | from celery import group |
- g.apply_async()支持操作
命令 | 说明 |
---|---|
successful() | 返回True如果全部顺利完成子任务(例如,没有提出一个例外)。 |
failed() | True如果任何子任务失败,则返回。 |
waiting() | True如果任何子任务尚未准备就绪,则返回。 |
ready() | True如果所有子任务都准备就绪,则返回。 |
completed_count() | 返回完成的子任务数。 |
revoke() | 撤消所有子任务。 |
join() | 收集所有子任务的结果,并以与调用它们相同的顺序(作为列表)返回它们。 |
需要注意的问题
异步数据量达到上限
- celery-task的result默认类型为BLOB, 而blob类型最大能容纳65KB的数据, 异步任务的数据量过大时会导致异步任务报错
1 | # 返回的数据自动被截取 |
- 所以使用celery进程的时候尽量在独立的进程中处理完数据,不要让返回值超出范围