celery-group的使用

配置

  • 为子任务配置独立的celery进程
1
2
3
4
5
6
7
8
9
10
11
from kombu import Exchange, Queue
default_exchange = Exchange('default', type='direct')
concurrency_exchange = Exchange('concurrency', type='direct')

app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('concurrency', concurrency_exchange, routing_key='concurrency'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

使用

  • 在异步任务中使用指定的进程,分割出多个子任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from celery import group
from celery.result import allow_join_result
a = [1, 2, 3, 4]

@app.task(bind=True, queue='concurrency')
def add(a):
if len(a) > 1:
g1 = add2.s(a[0:int(len(a) / 2)])
g2 = add2.s(a[int(len(a) / 2):])
g = group([g1, g2])
res = g.apply_async()
if result.ready(): # 是否执行完毕
if result.successful(): # 是否所有子任务都成功了
with allow_join_result():
r = res.get() # 获取所有值, 返回值为一个列表 [[1, 2], [3, 4]]


@app.task(bind=True, queue='concurrency')
def add2(a):
return a
  • g.apply_async()支持操作
命令 说明
successful() 返回True如果全部顺利完成子任务(例如,没有提出一个例外)。
failed() True如果任何子任务失败,则返回。
waiting() True如果任何子任务尚未准备就绪,则返回。
ready() True如果所有子任务都准备就绪,则返回。
completed_count() 返回完成的子任务数。
revoke() 撤消所有子任务。
join() 收集所有子任务的结果,并以与调用它们相同的顺序(作为列表)返回它们。

需要注意的问题

异步数据量达到上限

  • celery-task的result默认类型为BLOB, 而blob类型最大能容纳65KB的数据, 异步任务的数据量过大时会导致异步任务报错
1
2
3
4
# 返回的数据自动被截取
Warning: (1265, "Data truncated for column 'result' at row 1")
# 数据不完整导致解析报错
EOFError('Ran out of input',)
  • 所以使用celery进程的时候尽量在独立的进程中处理完数据,不要让返回值超出范围