分類
网站

使用 Django Q 方便地執行耗時任務

Django Q 是一個使用 Python 多進程製作的原生 Django 任务队列、调度器和 worker 应用。它具有很多優異特性但我只是粗淺使用了如下幾點:

  • 多進程 worker 池
  • 異步任務
  • 定時任務、cron 和重複的任務
  • 把失敗和成功結果保存到數據庫或緩存
  • 自動集成到 Django Admin,就可以在後臺添加和管理任務
  • 支持 Redis, Disque, IronMQ, SQS, MongoDB 或 ORM 這麼多種隊列代理方式,最方便的當然是 ORM
  • 注意:Django Q 的任務間隔粒度是 30 秒,如果你的任務頻率或精準度要求高於 30 秒,你需要嘗試修改源碼或使用其他任務隊列

安裝 Django Q

pip install django-q
#如果要使用 cron 規則,則也要安裝
pip install croniter
#在項目的 settings.py 文件 INSTALLED_APPS 裏加入 django_q
INSTALLED_APPS = (
    # other apps
    'django_q',
)
#設置代理方式,我選擇 ORM,
#只需把下面字段也加入 settings.py

#更多設置請參見 https://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
    'name': 'djangtasks',
    'workers': 2,
    'timeout': 180,
    'retry': 200,
    'queue_limit': 50,
    'bulk': 10,
    'orm': 'default'
}

#執行數據庫遷移來創建數據庫表
python manage.py migrate
#運行 Django Q 來處理任務隊列
python manage.py qcluster

我是使用 screen 來在後臺運行 qcluster,很方便的。

使用 Django Q 在 Django 後臺執行耗時任務

# file: views.py
import datetime
from django.http import HttpResponse
from django.utils import timezone
from django_q.tasks import async_task, schedule
from django_q.models import Schedule

def a_longtime_task(arg):
    time.sleep(30)
    return arg

def scheduled_task(arg):
    time.sleep(30)
    return arg

def a_longtime_task_request(request):
    #立即在後臺執行
    async_task(a_longtime_task,'args for the function')
    return HttpResponse('The longtime task has been started.')

def another_longtime_task_request(request):
    #三分鐘後執行
    schedule('YOURAPP.views.scheduled_task',
            'args for the function',
            schedule_type=Schedule.ONCE,
            next_run=timezone.now() + datetime.timedelta(minutes=3))
    return HttpResponse('The task has been scheduled.')

在 Django Admin 佈置定時任務

這個就比較直接,比如要每天 10 點都執行上面例子中的 scheduled_task,就點擊 Admin 頁面 Scheduled tasks 旁邊的「新增」。

Func(填入完整的函數路徑):YOURAPP.views.scheduled_task
Args:'args for the function'
Schedule Type:Daily
Next Run(設置要運行的時間如):2022-08-27 10:00

最後點擊「儲存」就可以了。

同樣的,如果選擇 Cron 類型,就需要在 Cron 輸入框填入 Cron 計劃。比如在 9 點到 23 點期間,每 5,15,25,35,45,55 各執行一次:
5,15,25,35,45,55 9-23 * * *
更多 Cron 用法和組合可以參考 crontab.guru 這個網站。

等待執行的任務、失敗的任務和成功的任務都可以方便的在Admin頁面查看和操作,非常方便。

管理隊列任務與已完成任務 Schedule Task

# file: tasks.py
from django_q.models import Schedule, Task
from django.db.models import Q
#your model to be checked
from app1.models import Race
from django.utils import timezone

import datetime
import operator
from functools import reduce

# Task to delete successful old tasks
def delete_old_tasks():
    len_task_20 = 0
    len_task_gen = 0
    now = datetime.datetime.now()
    days_passed_2 = timezone.utc.localize(now - datetime.timedelta(days=2))
    days_passed_7 = timezone.utc.localize(now - datetime.timedelta(days=7))
    task_q_list = []
    task_q_list.append(Q(group__startswith='20'))
    task_q_list.append(Q(started__lt=days_passed_7))
    task_q_list.append(Q(success__exact=True))
    task_20_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
    len_task_20 = len(task_20_queryset)
    for task in task_20_queryset:
        task.delete()

    task_q_list = []
    task_q_list.append(Q(group__exact='generate_0_task'))
    task_q_list.append(Q(started__lt=days_passed_2))
    task_q_list.append(Q(success__exact=True))
    task_gen_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
    len_task_gen = len(task_gen_queryset)
    for task in task_gen_queryset:
        task.delete()

    return f'{len_task_20} checked tasks and {len_task_gen} checking tasks have been removed.'

#generate 0 task every 5 minuts
def generate_0_task():
    res=[]
    now = datetime.datetime.now()
    dto_now = timezone.utc.localize(now)
    start_datetime_6 = now + datetime.timedelta(minutes=6)
    dto_plus6 = timezone.utc.localize(start_datetime_6)
    current_races = Race.objects.filter(post_time_live__lte=dto_plus6,
                                          post_time_live__gte=dto_now)
    if len(current_races) == 0:
        res.append('no current races')
        return res
    for race in current_races:
        if race.race_conditions.find('SIMULCAST')>-1:
            res.append(str(race)+': task passed SIMULCAST')
            continue
        tasks = Schedule.objects.filter(name=str(race))
        if len(tasks)==0:
            schedule('app1.views.zero_mtp_task',
                    race.id, race.track_id, race.race_number, race.race_date, race.post_time_live.isoformat(),
                    schedule_type=Schedule.ONCE,
                    next_run=race.post_time_live,
                    name=str(race))
            res.append(str(race)+': task added'+' post time:'+str(race.post_time_live))
        else:
            res.append(str(race)+': task already added'+' post time:'+str(race.post_time_live))
    return res

本文更新於 2023/01/27。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *