分類
程序

使用 Python 或/和 Cloudflare Worker 代理 NextDNS 的 DoH 服務

Previously:使用 nginx 代理 NextDNS 的 DoH 服務

使用 Cloudflare Worker 代理 NextDNS 的 DoH 服務

使用 tina-hello 的 doh-cf-workers 項目可以方便的建立一個 DoH 代理。由於只需要一個 index.js 文件,所以我這裡轉載一下。默認的示例是 Cloudflare 的 DoH,切換成 NextDNS 的話只需要修改 doh 和 dohjson 的網址,其他都不用動。

// SPDX-License-Identifier: 0BSD

const doh = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const dohjson = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const contype = 'application/dns-message'
const jstontype = 'application/dns-json'
const r404 = new Response(null, {status: 404});

// developers.cloudflare.com/workers/runtime-apis/fetch-event/#syntax-module-worker
export default {
    async fetch(r, env, ctx) {
        return handleRequest(r);
    },
};

async function handleRequest(request) {
    // when res is a Promise<Response>, it reduces billed wall-time
    // blog.cloudflare.com/workers-optimization-reduces-your-bill
    let res = r404;
    const { method, headers, url } = request
    const searchParams = new URL(url).searchParams
    if (method == 'GET' && searchParams.has('dns')) {
        res = fetch(doh + '?dns=' + searchParams.get('dns'), {
            method: 'GET',
            headers: {
                'Accept': contype,
            }
        });
    } else if (method === 'POST' && headers.get('content-type') === contype) {
        // streaming out the request body is optimal than awaiting on it
        const rostream = request.body;
        res = fetch(doh, {
            method: 'POST',
            headers: {
                'Accept': contype,
                'Content-Type': contype,
            },
            body: rostream,
        });
    } else if (method === 'GET' && headers.get('Accept') === jstontype) {
        const search = new URL(url).search
         res = fetch(dohjson + search, {
            method: 'GET',
            headers: {
                'Accept': jstontype,
            }
        });
    }
    return res;
}
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOURPROJECT.YOURWORKER.workers.dev/dns-query?name=ft.shaman.eu.org&type=A'
#POST請求由於需要構造 DNS 參數,所以就不測了

使用 Python 代理 NextDNS 的 DoH 服務

輕量級的 API 框架我不熟悉,所以這裡還是用 Django。

#file:app1/views.py
from django.http import HttpResponse
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import requests

@csrf_exempt
def forward_to_doh(request):
    cloudflare_doh_url = "https://YOURPROJECT.YOURWORKER.workers.dev/"
    contype = 'application/dns-message'
    jstontype = 'application/dns-json'

    try:
        if request.method == 'GET':
            if 'dns' in request.GET:
                params = {'dns': request.GET['dns']}
                headers = {'Accept': contype}
                response = requests.get(cloudflare_doh_url, params=params, headers=headers)
                return JsonResponse(response.json(), status=response.status_code)
            elif request.headers.get('Accept') == jstontype:
                response = requests.get(cloudflare_doh_url + '?' + request.GET.urlencode(), headers={'Accept': jstontype})
                return JsonResponse(response.json(), status=response.status_code)
        elif request.method == 'POST':
            if request.headers.get('content-type') == contype:
                headers = {'Accept': contype, 'Content-Type': contype}
                data = request.body
                response = requests.post(cloudflare_doh_url, data=data, headers=headers)
                return HttpResponse(response.content, content_type=response.headers['Content-Type'], status=response.status_code)
        else:
            return JsonResponse({'error': 'Unsupported request method'}, status=405)
        return JsonResponse({'error': 'Not Found'}, status=404)

    except requests.exceptions.RequestException as e:
        return JsonResponse({'error': str(e)}, status=500)
#file:mysite/urls.py
from django.urls import include,path
from app1 import views

urlpatterns = [
    path('YourSecNextDns/', views.forward_to_doh, name='forward_to_doh'),
    path('i/', include('i.urls')),
]
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOUR_DOMAIN.LTD/YourSecNextDns/?name=ft.shaman.eu.org&type=A'

上面的請求經過三次請求,速度(高達秒級)自然是比不了直連。但是為了體驗 DoH,還是可以用用看。反正網速本身也不快,DNS 慢一點完全可以怪到網速上。

分類
程序

在 VS Code 中調試 Django Q 任務

在 .vscode 文件夾中創建一個 lanch.json 文件, 內容如下:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Django Q",
            "type": "python",
            "request": "launch",
            "program": "${workspaceFolder}/manage.py",
            "args": [
                "qcluster"
            ],
            "django": true,
            "env": {
                "DJANGO_SETTINGS_MODULE": "YOUR_PROJECT_NAME.settings"
            },
            "gevent": true
        },
        // Your other configurations go here
    ]
}

只需修改兩個地方:program 裏的 ${workspaceFolder} 是變量,如果你項目的 manage.py 在 VS Code 項目根目錄下,那這行就不用修改。 DJANGO_SETTINGS_MODULE 這裏需要把 YOUR_PROJECT_NAME 替換成你項目的名字。然後按 Debug 按鈕就可以調試 Django Q task 了。

對於這個問題,ChatGPT 4 前前後後給出了五次錯誤配置,不得不記錄一下。

分類
网站

使用 Django Q 方便地執行耗時任務

Django Q 是一個使用 Python 多進程製作的原生 Django 任务队列、调度器和 worker 应用。它具有很多優異特性但我只是粗淺使用了如下幾點:

  • 多進程 worker 池
  • 異步任務
  • 定時任務、cron 和重複的任務
  • 把失敗和成功結果保存到數據庫或緩存
  • 自動集成到 Django Admin,就可以在後臺添加和管理任務
  • 支持 Redis, Disque, IronMQ, SQS, MongoDB 或 ORM 這麼多種隊列代理方式,最方便的當然是 ORM
  • 注意:Django Q 的任務間隔粒度是 30 秒,如果你的任務頻率或精準度要求高於 30 秒,你需要嘗試修改源碼或使用其他任務隊列

安裝 Django Q

pip install django-q
#如果要使用 cron 規則,則也要安裝
pip install croniter
#在項目的 settings.py 文件 INSTALLED_APPS 裏加入 django_q
INSTALLED_APPS = (
    # other apps
    'django_q',
)
#設置代理方式,我選擇 ORM,
#只需把下面字段也加入 settings.py

#更多設置請參見 https://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
    'name': 'djangtasks',
    'workers': 2,
    'timeout': 180,
    'retry': 200,
    'queue_limit': 50,
    'bulk': 10,
    'orm': 'default'
}

#執行數據庫遷移來創建數據庫表
python manage.py migrate
#運行 Django Q 來處理任務隊列
python manage.py qcluster

我是使用 screen 來在後臺運行 qcluster,很方便的。

使用 Django Q 在 Django 後臺執行耗時任務

# file: views.py
import datetime
from django.http import HttpResponse
from django.utils import timezone
from django_q.tasks import async_task, schedule
from django_q.models import Schedule

def a_longtime_task(arg):
    time.sleep(30)
    return arg

def scheduled_task(arg):
    time.sleep(30)
    return arg

def a_longtime_task_request(request):
    #立即在後臺執行
    async_task(a_longtime_task,'args for the function')
    return HttpResponse('The longtime task has been started.')

def another_longtime_task_request(request):
    #三分鐘後執行
    schedule('YOURAPP.views.scheduled_task',
            'args for the function',
            schedule_type=Schedule.ONCE,
            next_run=timezone.now() + datetime.timedelta(minutes=3))
    return HttpResponse('The task has been scheduled.')

在 Django Admin 佈置定時任務

這個就比較直接,比如要每天 10 點都執行上面例子中的 scheduled_task,就點擊 Admin 頁面 Scheduled tasks 旁邊的「新增」。

Func(填入完整的函數路徑):YOURAPP.views.scheduled_task
Args:'args for the function'
Schedule Type:Daily
Next Run(設置要運行的時間如):2022-08-27 10:00

最後點擊「儲存」就可以了。

同樣的,如果選擇 Cron 類型,就需要在 Cron 輸入框填入 Cron 計劃。比如在 9 點到 23 點期間,每 5,15,25,35,45,55 各執行一次:
5,15,25,35,45,55 9-23 * * *
更多 Cron 用法和組合可以參考 crontab.guru 這個網站。

等待執行的任務、失敗的任務和成功的任務都可以方便的在Admin頁面查看和操作,非常方便。

管理隊列任務與已完成任務 Schedule Task

# file: tasks.py
from django_q.models import Schedule, Task
from django.db.models import Q
#your model to be checked
from app1.models import Race
from django.utils import timezone

import datetime
import operator
from functools import reduce

# Task to delete successful old tasks
def delete_old_tasks():
    len_task_20 = 0
    len_task_gen = 0
    now = datetime.datetime.now()
    days_passed_2 = timezone.utc.localize(now - datetime.timedelta(days=2))
    days_passed_7 = timezone.utc.localize(now - datetime.timedelta(days=7))
    task_q_list = []
    task_q_list.append(Q(group__startswith='20'))
    task_q_list.append(Q(started__lt=days_passed_7))
    task_q_list.append(Q(success__exact=True))
    task_20_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
    len_task_20 = len(task_20_queryset)
    for task in task_20_queryset:
        task.delete()

    task_q_list = []
    task_q_list.append(Q(group__exact='generate_0_task'))
    task_q_list.append(Q(started__lt=days_passed_2))
    task_q_list.append(Q(success__exact=True))
    task_gen_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
    len_task_gen = len(task_gen_queryset)
    for task in task_gen_queryset:
        task.delete()

    return f'{len_task_20} checked tasks and {len_task_gen} checking tasks have been removed.'

#generate 0 task every 5 minuts
def generate_0_task():
    res=[]
    now = datetime.datetime.now()
    dto_now = timezone.utc.localize(now)
    start_datetime_6 = now + datetime.timedelta(minutes=6)
    dto_plus6 = timezone.utc.localize(start_datetime_6)
    current_races = Race.objects.filter(post_time_live__lte=dto_plus6,
                                          post_time_live__gte=dto_now)
    if len(current_races) == 0:
        res.append('no current races')
        return res
    for race in current_races:
        if race.race_conditions.find('SIMULCAST')>-1:
            res.append(str(race)+': task passed SIMULCAST')
            continue
        tasks = Schedule.objects.filter(name=str(race))
        if len(tasks)==0:
            schedule('app1.views.zero_mtp_task',
                    race.id, race.track_id, race.race_number, race.race_date, race.post_time_live.isoformat(),
                    schedule_type=Schedule.ONCE,
                    next_run=race.post_time_live,
                    name=str(race))
            res.append(str(race)+': task added'+' post time:'+str(race.post_time_live))
        else:
            res.append(str(race)+': task already added'+' post time:'+str(race.post_time_live))
    return res

本文更新於 2023/01/27。