Django Q 是一個使用 Python 多進程製作的原生 Django 任务队列、调度器和 worker 应用。它具有很多優異特性但我只是粗淺使用了如下幾點:
- 多進程 worker 池
- 異步任務
- 定時任務、cron 和重複的任務
- 把失敗和成功結果保存到數據庫或緩存
- 自動集成到 Django Admin,就可以在後臺添加和管理任務
- 支持 Redis, Disque, IronMQ, SQS, MongoDB 或 ORM 這麼多種隊列代理方式,最方便的當然是 ORM
- 注意:Django Q 的任務間隔粒度是 30 秒,如果你的任務頻率或精準度要求高於 30 秒,你需要嘗試修改源碼或使用其他任務隊列
安裝 Django Q
pip install django-q #如果要使用 cron 規則,則也要安裝 pip install croniter #在項目的 settings.py 文件 INSTALLED_APPS 裏加入 django_q INSTALLED_APPS = ( # other apps 'django_q', ) #設置代理方式,我選擇 ORM, #只需把下面字段也加入 settings.py #更多設置請參見 https://django-q.readthedocs.io/en/latest/configure.html Q_CLUSTER = { 'name': 'djangtasks', 'workers': 2, 'timeout': 180, 'retry': 200, 'queue_limit': 50, 'bulk': 10, 'orm': 'default' } #執行數據庫遷移來創建數據庫表 python manage.py migrate #運行 Django Q 來處理任務隊列 python manage.py qcluster
我是使用 screen 來在後臺運行 qcluster,很方便的。
使用 Django Q 在 Django 後臺執行耗時任務
# file: views.py import datetime from django.http import HttpResponse from django.utils import timezone from django_q.tasks import async_task, schedule from django_q.models import Schedule def a_longtime_task(arg): time.sleep(30) return arg def scheduled_task(arg): time.sleep(30) return arg def a_longtime_task_request(request): #立即在後臺執行 async_task(a_longtime_task,'args for the function') return HttpResponse('The longtime task has been started.') def another_longtime_task_request(request): #三分鐘後執行 schedule('YOURAPP.views.scheduled_task', 'args for the function', schedule_type=Schedule.ONCE, next_run=timezone.now() + datetime.timedelta(minutes=3)) return HttpResponse('The task has been scheduled.')
在 Django Admin 佈置定時任務
這個就比較直接,比如要每天 10 點都執行上面例子中的 scheduled_task,就點擊 Admin 頁面 Scheduled tasks 旁邊的「新增」。
Func(填入完整的函數路徑):YOURAPP.views.scheduled_task
Args:'args for the function'
Schedule Type:Daily
Next Run(設置要運行的時間如):2022-08-27 10:00
最後點擊「儲存」就可以了。
同樣的,如果選擇 Cron 類型,就需要在 Cron 輸入框填入 Cron 計劃。比如在 9 點到 23 點期間,每 5,15,25,35,45,55 各執行一次:
5,15,25,35,45,55 9-23 * * *
更多 Cron 用法和組合可以參考 crontab.guru 這個網站。
等待執行的任務、失敗的任務和成功的任務都可以方便的在Admin頁面查看和操作,非常方便。
管理隊列任務與已完成任務 Schedule Task
# file: tasks.py from django_q.models import Schedule, Task from django.db.models import Q #your model to be checked from app1.models import Race from django.utils import timezone import datetime import operator from functools import reduce # Task to delete successful old tasks def delete_old_tasks(): len_task_20 = 0 len_task_gen = 0 now = datetime.datetime.now() days_passed_2 = timezone.utc.localize(now - datetime.timedelta(days=2)) days_passed_7 = timezone.utc.localize(now - datetime.timedelta(days=7)) task_q_list = [] task_q_list.append(Q(group__startswith='20')) task_q_list.append(Q(started__lt=days_passed_7)) task_q_list.append(Q(success__exact=True)) task_20_queryset = Task.objects.filter(reduce(operator.and_, task_q_list)) len_task_20 = len(task_20_queryset) for task in task_20_queryset: task.delete() task_q_list = [] task_q_list.append(Q(group__exact='generate_0_task')) task_q_list.append(Q(started__lt=days_passed_2)) task_q_list.append(Q(success__exact=True)) task_gen_queryset = Task.objects.filter(reduce(operator.and_, task_q_list)) len_task_gen = len(task_gen_queryset) for task in task_gen_queryset: task.delete() return f'{len_task_20} checked tasks and {len_task_gen} checking tasks have been removed.' #generate 0 task every 5 minuts def generate_0_task(): res=[] now = datetime.datetime.now() dto_now = timezone.utc.localize(now) start_datetime_6 = now + datetime.timedelta(minutes=6) dto_plus6 = timezone.utc.localize(start_datetime_6) current_races = Race.objects.filter(post_time_live__lte=dto_plus6, post_time_live__gte=dto_now) if len(current_races) == 0: res.append('no current races') return res for race in current_races: if race.race_conditions.find('SIMULCAST')>-1: res.append(str(race)+': task passed SIMULCAST') continue tasks = Schedule.objects.filter(name=str(race)) if len(tasks)==0: schedule('app1.views.zero_mtp_task', race.id, race.track_id, race.race_number, race.race_date, race.post_time_live.isoformat(), schedule_type=Schedule.ONCE, next_run=race.post_time_live, name=str(race)) res.append(str(race)+': task added'+' post time:'+str(race.post_time_live)) else: res.append(str(race)+': task already added'+' post time:'+str(race.post_time_live)) return res
本文更新於 2023/01/27。