Django Q 是一個使用 Python 多進程製作的原生 Django 任务队列、调度器和 worker 应用。它具有很多優異特性但我只是粗淺使用了如下幾點:
- 多進程 worker 池
- 異步任務
- 定時任務、cron 和重複的任務
- 把失敗和成功結果保存到數據庫或緩存
- 自動集成到 Django Admin,就可以在後臺添加和管理任務
- 支持 Redis, Disque, IronMQ, SQS, MongoDB 或 ORM 這麼多種隊列代理方式,最方便的當然是 ORM
- 注意:Django Q 的任務間隔粒度是 30 秒,如果你的任務頻率或精準度要求高於 30 秒,你需要嘗試修改源碼或使用其他任務隊列
安裝 Django Q
pip install django-q
#如果要使用 cron 規則,則也要安裝
pip install croniter
#在項目的 settings.py 文件 INSTALLED_APPS 裏加入 django_q
INSTALLED_APPS = (
# other apps
'django_q',
)
#設置代理方式,我選擇 ORM,
#只需把下面字段也加入 settings.py
#更多設置請參見 https://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
'name': 'djangtasks',
'workers': 2,
'timeout': 180,
'retry': 200,
'queue_limit': 50,
'bulk': 10,
'orm': 'default'
}
#執行數據庫遷移來創建數據庫表
python manage.py migrate
#運行 Django Q 來處理任務隊列
python manage.py qcluster
我是使用 screen 來在後臺運行 qcluster,很方便的。
使用 Django Q 在 Django 後臺執行耗時任務
# file: views.py
import datetime
from django.http import HttpResponse
from django.utils import timezone
from django_q.tasks import async_task, schedule
from django_q.models import Schedule
def a_longtime_task(arg):
time.sleep(30)
return arg
def scheduled_task(arg):
time.sleep(30)
return arg
def a_longtime_task_request(request):
#立即在後臺執行
async_task(a_longtime_task,'args for the function')
return HttpResponse('The longtime task has been started.')
def another_longtime_task_request(request):
#三分鐘後執行
schedule('YOURAPP.views.scheduled_task',
'args for the function',
schedule_type=Schedule.ONCE,
next_run=timezone.now() + datetime.timedelta(minutes=3))
return HttpResponse('The task has been scheduled.')
在 Django Admin 佈置定時任務
這個就比較直接,比如要每天 10 點都執行上面例子中的 scheduled_task,就點擊 Admin 頁面 Scheduled tasks 旁邊的「新增」。
Func(填入完整的函數路徑):YOURAPP.views.scheduled_task
Args:'args for the function'
Schedule Type:Daily
Next Run(設置要運行的時間如):2022-08-27 10:00
最後點擊「儲存」就可以了。
同樣的,如果選擇 Cron 類型,就需要在 Cron 輸入框填入 Cron 計劃。比如在 9 點到 23 點期間,每 5,15,25,35,45,55 各執行一次:
5,15,25,35,45,55 9-23 * * *
更多 Cron 用法和組合可以參考 crontab.guru 這個網站。
等待執行的任務、失敗的任務和成功的任務都可以方便的在Admin頁面查看和操作,非常方便。
管理隊列任務與已完成任務 Schedule Task
# file: tasks.py
from django_q.models import Schedule, Task
from django.db.models import Q
#your model to be checked
from app1.models import Race
from django.utils import timezone
import datetime
import operator
from functools import reduce
# Task to delete successful old tasks
def delete_old_tasks():
len_task_20 = 0
len_task_gen = 0
now = datetime.datetime.now()
days_passed_2 = timezone.utc.localize(now - datetime.timedelta(days=2))
days_passed_7 = timezone.utc.localize(now - datetime.timedelta(days=7))
task_q_list = []
task_q_list.append(Q(group__startswith='20'))
task_q_list.append(Q(started__lt=days_passed_7))
task_q_list.append(Q(success__exact=True))
task_20_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
len_task_20 = len(task_20_queryset)
for task in task_20_queryset:
task.delete()
task_q_list = []
task_q_list.append(Q(group__exact='generate_0_task'))
task_q_list.append(Q(started__lt=days_passed_2))
task_q_list.append(Q(success__exact=True))
task_gen_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
len_task_gen = len(task_gen_queryset)
for task in task_gen_queryset:
task.delete()
return f'{len_task_20} checked tasks and {len_task_gen} checking tasks have been removed.'
#generate 0 task every 5 minuts
def generate_0_task():
res=[]
now = datetime.datetime.now()
dto_now = timezone.utc.localize(now)
start_datetime_6 = now + datetime.timedelta(minutes=6)
dto_plus6 = timezone.utc.localize(start_datetime_6)
current_races = Race.objects.filter(post_time_live__lte=dto_plus6,
post_time_live__gte=dto_now)
if len(current_races) == 0:
res.append('no current races')
return res
for race in current_races:
if race.race_conditions.find('SIMULCAST')>-1:
res.append(str(race)+': task passed SIMULCAST')
continue
tasks = Schedule.objects.filter(name=str(race))
if len(tasks)==0:
schedule('app1.views.zero_mtp_task',
race.id, race.track_id, race.race_number, race.race_date, race.post_time_live.isoformat(),
schedule_type=Schedule.ONCE,
next_run=race.post_time_live,
name=str(race))
res.append(str(race)+': task added'+' post time:'+str(race.post_time_live))
else:
res.append(str(race)+': task already added'+' post time:'+str(race.post_time_live))
return res
本文更新於 2023/01/27。