欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

有关celery中task对象使用多线程时,动态更新问题

程序员文章站 2022-07-15 15:38:07
...

背景

有一个需求,是celery异步任务中使用多线程,同时需要对总体数量进度的更新。

描述

import celery
import threading


def run_subtask(celery_task, i):
    lock.acquire()
    #Error raises here, when update_state calls
    celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
    lock.release()


@celery.task(bind=True)
def get_info(self, user):

    for i in xrange(4):
        worker = threading.Thread(target=run_subtask, args=(self, i))
        worker.start()

调整

def run_subtask(celery_task, i, task_id):
    lock.acquire()
    #Error raises here, when update_state calls
    # celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
    # 之前的方法无法在多线程获取到指定的对象,很诡异的无法更新
    celery_task.update_state(task_id=task_id, state=states.SUCCESS, meta={'subtask_id': i})
    lock.release()

@celery.task(bind=True)
def get_info(self, user):
    task_id = self.request.id
    for i in xrange(4):
        worker = threading.Thread(target=run_subtask, args=(self, i, task_id))
        worker.start()

参考链接:https://*.com/questions/42595426/python-multithreading-into-celery-task-celery-task-update-state-error