分類
程序

使用 Python 或/和 Cloudflare Worker 代理 NextDNS 的 DoH 服務

Previously:使用 nginx 代理 NextDNS 的 DoH 服務

使用 Cloudflare Worker 代理 NextDNS 的 DoH 服務

使用 tina-hello 的 doh-cf-workers 項目可以方便的建立一個 DoH 代理。由於只需要一個 index.js 文件,所以我這裡轉載一下。默認的示例是 Cloudflare 的 DoH,切換成 NextDNS 的話只需要修改 doh 和 dohjson 的網址,其他都不用動。

// SPDX-License-Identifier: 0BSD

const doh = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const dohjson = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const contype = 'application/dns-message'
const jstontype = 'application/dns-json'
const r404 = new Response(null, {status: 404});

// developers.cloudflare.com/workers/runtime-apis/fetch-event/#syntax-module-worker
export default {
    async fetch(r, env, ctx) {
        return handleRequest(r);
    },
};

async function handleRequest(request) {
    // when res is a Promise<Response>, it reduces billed wall-time
    // blog.cloudflare.com/workers-optimization-reduces-your-bill
    let res = r404;
    const { method, headers, url } = request
    const searchParams = new URL(url).searchParams
    if (method == 'GET' && searchParams.has('dns')) {
        res = fetch(doh + '?dns=' + searchParams.get('dns'), {
            method: 'GET',
            headers: {
                'Accept': contype,
            }
        });
    } else if (method === 'POST' && headers.get('content-type') === contype) {
        // streaming out the request body is optimal than awaiting on it
        const rostream = request.body;
        res = fetch(doh, {
            method: 'POST',
            headers: {
                'Accept': contype,
                'Content-Type': contype,
            },
            body: rostream,
        });
    } else if (method === 'GET' && headers.get('Accept') === jstontype) {
        const search = new URL(url).search
         res = fetch(dohjson + search, {
            method: 'GET',
            headers: {
                'Accept': jstontype,
            }
        });
    }
    return res;
}
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOURPROJECT.YOURWORKER.workers.dev/dns-query?name=ft.shaman.eu.org&type=A'
#POST請求由於需要構造 DNS 參數,所以就不測了

使用 Python 代理 NextDNS 的 DoH 服務

輕量級的 API 框架我不熟悉,所以這裡還是用 Django。

#file:app1/views.py
from django.http import HttpResponse
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import requests

@csrf_exempt
def forward_to_doh(request):
    cloudflare_doh_url = "https://YOURPROJECT.YOURWORKER.workers.dev/"
    contype = 'application/dns-message'
    jstontype = 'application/dns-json'

    try:
        if request.method == 'GET':
            if 'dns' in request.GET:
                params = {'dns': request.GET['dns']}
                headers = {'Accept': contype}
                response = requests.get(cloudflare_doh_url, params=params, headers=headers)
                return JsonResponse(response.json(), status=response.status_code)
            elif request.headers.get('Accept') == jstontype:
                response = requests.get(cloudflare_doh_url + '?' + request.GET.urlencode(), headers={'Accept': jstontype})
                return JsonResponse(response.json(), status=response.status_code)
        elif request.method == 'POST':
            if request.headers.get('content-type') == contype:
                headers = {'Accept': contype, 'Content-Type': contype}
                data = request.body
                response = requests.post(cloudflare_doh_url, data=data, headers=headers)
                return HttpResponse(response.content, content_type=response.headers['Content-Type'], status=response.status_code)
        else:
            return JsonResponse({'error': 'Unsupported request method'}, status=405)
        return JsonResponse({'error': 'Not Found'}, status=404)

    except requests.exceptions.RequestException as e:
        return JsonResponse({'error': str(e)}, status=500)
#file:mysite/urls.py
from django.urls import include,path
from app1 import views

urlpatterns = [
    path('YourSecNextDns/', views.forward_to_doh, name='forward_to_doh'),
    path('i/', include('i.urls')),
]
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOUR_DOMAIN.LTD/YourSecNextDns/?name=ft.shaman.eu.org&type=A'

上面的請求經過三次請求,速度(高達秒級)自然是比不了直連。但是為了體驗 DoH,還是可以用用看。反正網速本身也不快,DNS 慢一點完全可以怪到網速上。

分類
程序

在 CentOS 8 上使用 Xvfb 和 PyAutoGUI

本文介紹在 CentOS 8 上使用 PyAutoGUI 操作運行與 Xvfb 桌面環境中的 Firefox 所需的準備。由於所操作網頁需要登錄並有 OTP,所以還需要使用 VNC 來手動輸入密碼以做準備工作。

安裝 Xvfb

sudo yum install xorg-x11-server-Xvfb
#啓動 Xvfb
Xvfb :0 -screen 0 1366x768x24+32 -br +bs -ac &
#修改環境變量 DISPLAY
export DISPLAY=:0
#查看環境變量 DISPLAY
$DISPLAY
#查看 Xvfb 進程
ps -ef | grep Xvfb

如果只是爲了使用 webdriver 操作瀏覽器那麼這樣安裝運行 Xvfb 後就可以了,但是我們要使用 PyAutoGUI,所以還有依賴要裝。

安裝火狐瀏覽器

我個人比較偏好 ESR 版火狐,下載後解壓就可以使用了。

tar -xf firefox-115.5.0esr.tar.bz2
cd Firefox
./firefox
#如果要指定窗口大小可以
./firefox -width 1350 -height 764

安裝 x11vnc 服務

其實只要下載執行檔,運行即可,非常簡單。不過 x11vnc 12 年未有更新,不知道還能用多久。如果那天不能用了可以嘗試 Tiger VNC。

mv x11vnc-0.9.13_amd64-Linux x11vnc
chmod +x x11vnc
#運行 vnc server,密碼設置長一點,端口是 9999
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999
#上面命令在一次連接後會停止,如果要服務一直可以用,就加上 forever 參數
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999 -shared -forever
#注意服務器的網絡防火牆要打開 9999 端口
#用 nmap 查看端口是否打開
nmap -p 9999 1.1.1.1

Fedora 本地可以安裝 Remmina 作爲 VNC 客戶端。

sudo dnf install remmina -y
#如果想要使用 socks5 代理
proxychains4 /usr/bin/remmina

打開 Remmina 後在地址欄的協議選項裏選擇 VNC,然後地址填 1.1.1.1:9999 回車就可以連上服務器的桌面並看到剛剛打開的火狐瀏覽器進行設置了。點左上角的 + 號可以添加 profile,這樣下次雙擊就能連上 VNC 了。

安裝與配置 PyAutoGUI

我自己從源碼編譯的 Python 一直提示沒有 _tkinter 模塊。搜了半天沒有解決,於是使用系統自帶的 Python 3.9 得以解決。雖然我是用 Python 3.11 開發的,但是 Python 3.9 跑起來也沒問題。

import _tkinter # If this fails your Python may not be configured for Tk
ImportError: No module named _tkinter
#安裝 Python 3.9 以及依賴
sudo yum install libnsl ImageMagick xclip
sudo yum install python39 python39-tkinter

#xdotool 並不支持 Xvfb,所以不用裝
#sudo yum install xdotool

#創建虛擬環境
cd your_project
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

示例代碼

import json
import random
import sys
import requests
import time
import logging
import os
import pyperclip
import subprocess
from urllib.parse import urlparse, parse_qs, urlunparse
from dotenv import load_dotenv
import psutil
import pyautogui
import pyscreeze

script_dir = os.path.dirname(os.path.realpath(__file__))

load_dotenv()
api_key = os.getenv("API_KEY")
proxy = os.getenv("PROXY")
de = os.getenv("DE")
batch = int(os.getenv("BATCH"))

if proxy is None:
    proxies = None
else:
    proxies = {"http": proxy, "https": proxy}

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
# Create a file handler to save logs to a file
log_file = os.path.join(script_dir, "logfile.log")
file_handler = logging.FileHandler(log_file)

# Set the logging level for the file handler (if different from the root logger)
file_handler.setLevel(logging.INFO)  # Adjust the log level if needed

# Create a formatter and add it to the file handler
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)

# Add the file handler to the logger
logger.addHandler(file_handler)


def activate_window(window_title):
    if de == "Xvfb":
        firefox_exists = False
        for proc in psutil.process_iter():
            try:
                if "firefox" in proc.name().lower():
                    firefox_exists = True
                    break
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
        if firefox_exists is False:
            subprocess.Popen(
                [
                    "/home/centos/fred/programs/firefox/firefox",
                    "-width",
                    "1350",
                    "-height",
                    "764",
                ]
            )
            logger.info("firefox started")
            time.sleep(10)
    elif de == "Xfce":
        command = f"xdotool search --onlyvisible --name '{window_title}' windowactivate"
        subprocess.run(command, shell=True)


def wait_for_image(image_path, timeout=10):
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            location = pyautogui.locateOnScreen(
                image_path, region=(0, 0, 1360, 760), confidence=0.9, grayscale=True
            )
            if location:
                return location
        except pyautogui.ImageNotFoundException:
            pass
        time.sleep(1.5)
    return None


def wait_for_images(image_path, timeout=10):
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            location = pyautogui.locateAllOnScreen(
                image_path, region=(0, 0, 1366, 760), confidence=0.9
            )
            if location:
                return list(location)
        except pyautogui.ImageNotFoundException:
            pass
        except pyscreeze.ImageNotFoundException:
            pass
        time.sleep(1.5)
    return []

def right_click():
    pyautogui.mouseDown(button="right")  # Perform a right-click
    time.sleep(0.1)  # Adjust the delay if needed (time in seconds)
    pyautogui.mouseUp(button="right")  # Release the right-click


def triple_click(x, y):
    for _ in range(3):
        pyautogui.click(x, y)
        time.sleep(0.1)

def get_data(obj):
    start_time = time.time()
    time.sleep(0.5)
    activate_window("Mozilla Firefox")
    time.sleep(0.5)
    # load webpage
    url_profile = obj["contact_name_href"]
    if url_profile.find("?") != -1:
        url_profile = url_profile[: url_profile.find("?")]
    logger.info(url_profile)
    # open a new tab
    # pyautogui.hotkey('ctrl', 't')
    pyautogui.hotkey("ctrl", "l")
    time.sleep(0.5)
    pyautogui.typewrite(url_profile)
    time.sleep(0.5)
    pyautogui.press("enter")
    time.sleep(8)

    # check page loaded
    image_path = os.path.join(script_dir, "img/ContactDetails.png")
    loaded = wait_for_image(image_path, 40)

    # Box(left=535, top=541, width=74, height=19)
    if loaded:
        logger.info("Page loaded")

    # LinkedIn
    image_path = os.path.join(script_dir, "img/LinkedIn.png")
    l = wait_for_images(image_path, 3)
    l_index = 0
    retry = 0
    temp_l = None
    while l_index < len(l):
        logger.info(f"l_index:{l_index+1}/{len(l)}")
        if temp_l is not None:
            if temp_l[0] == l[l_index][0]:
                l_index = l_index + 1
                continue
        li = l[l_index]
        pyautogui.moveTo(li, duration=2, tween=pyautogui.easeInOutQuad)
        time.sleep(0.5)
        pyautogui.click(li)
        time.sleep(2)
        pyautogui.hotkey("ctrl", "l")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "w")
        time.sleep(0.5)
        copied_text = pyperclip.paste()
        url = copied_text.strip()
        url = get_link_from_redirect(url)

        if url is not None and url != "":
            # logger.info(len(url))
            if url.find("linkedin") == -1:
                logger.info("not found 'linkedin'")
                if retry < 3:
                    retry = retry + 1
                else:
                    l_index = l_index + 1
                continue

            # personal LindedIn
            if li[0] < loaded_x_left:
                obj["LinkedIn_Personal_URL"] = url
            else:
                obj["LinkedIn_URL"] = url
        temp_l = li
        l_index = l_index + 1

    # Supplemental_Email
    image_path = os.path.join(script_dir, "img/Supplemental.png")
    l = wait_for_image(image_path, 3)
    if l:
        pyautogui.moveTo(
            l[0] + l[2] + 50 + random.randint(-5, 5),
            l[1] + l[3] / 2,
            duration=1,
            tween=pyautogui.easeInOutQuad,
        )
        time.sleep(1)
        right_click()
        time.sleep(1)
        pyautogui.hotkey("l")
        time.sleep(1)
        pyautogui.hotkey("esc")
        # [email protected]
        copied_text = pyperclip.paste()
        obj["Supplemental_Email"] = copied_text.strip()

    # Local address
    image_path = os.path.join(script_dir, "img/Local.png")
    l = wait_for_image(image_path, 3)
    if l:
        x = l[0] + l[2] + 30 + random.randint(-5, 5)
        pyautogui.moveTo(x, l[1] + l[3] / 2, duration=1, tween=pyautogui.easeInOutQuad)
        time.sleep(1)
        triple_click(x, l[1] + l[3] / 2)
        time.sleep(1)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(1)
        # [email protected]
        copied_text = pyperclip.paste()
        obj["Local_Location_Address"] = copied_text.strip()

def main(max_id):
    failed_list = []
    for i in range(batch):
        logger.info(f"batch:{i+1}/{batch}")
        obj = get_job()
        if obj is None:
            continue
        if obj["contact_name_href"] == "" or obj["contact_name_href"] is None:
            res = post_job(obj)
            logger.info(res)
            continue

        if obj["id"] > max_id:
            logger.info("max_id reached")
            continue
        obj = get_data(obj)
        if obj["message"] != "success":
            if obj["id"] in failed_list:
                res = post_job(obj)
                logger.info(res)
            else:
                failed_list.append(obj["id"])
            continue

        res = post_job(obj)
        logger.info(res)

if __name__ == "__main__":
    # time.sleep(10)
    # countdown
    for i in range(6):
        logger.info(f"countdown:{6-i}")
        time.sleep(1)

    # get max id from command
    if len(sys.argv) > 1:
        max_id = int(sys.argv[1])
        logger.info(f"max_id:{max_id}")
    else:
        max_id = 9999999

    main(max_id)
分類
程序

在 VS Code 中調試 Django Q 任務

在 .vscode 文件夾中創建一個 lanch.json 文件, 內容如下:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Django Q",
            "type": "python",
            "request": "launch",
            "program": "${workspaceFolder}/manage.py",
            "args": [
                "qcluster"
            ],
            "django": true,
            "env": {
                "DJANGO_SETTINGS_MODULE": "YOUR_PROJECT_NAME.settings"
            },
            "gevent": true
        },
        // Your other configurations go here
    ]
}

只需修改兩個地方:program 裏的 ${workspaceFolder} 是變量,如果你項目的 manage.py 在 VS Code 項目根目錄下,那這行就不用修改。 DJANGO_SETTINGS_MODULE 這裏需要把 YOUR_PROJECT_NAME 替換成你項目的名字。然後按 Debug 按鈕就可以調試 Django Q task 了。

對於這個問題,ChatGPT 4 前前後後給出了五次錯誤配置,不得不記錄一下。

分類
网站

使用 Django Q 方便地執行耗時任務

Django Q 是一個使用 Python 多進程製作的原生 Django 任务队列、调度器和 worker 应用。它具有很多優異特性但我只是粗淺使用了如下幾點:

  • 多進程 worker 池
  • 異步任務
  • 定時任務、cron 和重複的任務
  • 把失敗和成功結果保存到數據庫或緩存
  • 自動集成到 Django Admin,就可以在後臺添加和管理任務
  • 支持 Redis, Disque, IronMQ, SQS, MongoDB 或 ORM 這麼多種隊列代理方式,最方便的當然是 ORM
  • 注意:Django Q 的任務間隔粒度是 30 秒,如果你的任務頻率或精準度要求高於 30 秒,你需要嘗試修改源碼或使用其他任務隊列

安裝 Django Q

pip install django-q
#如果要使用 cron 規則,則也要安裝
pip install croniter
#在項目的 settings.py 文件 INSTALLED_APPS 裏加入 django_q
INSTALLED_APPS = (
    # other apps
    'django_q',
)
#設置代理方式,我選擇 ORM,
#只需把下面字段也加入 settings.py

#更多設置請參見 https://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
    'name': 'djangtasks',
    'workers': 2,
    'timeout': 180,
    'retry': 200,
    'queue_limit': 50,
    'bulk': 10,
    'orm': 'default'
}

#執行數據庫遷移來創建數據庫表
python manage.py migrate
#運行 Django Q 來處理任務隊列
python manage.py qcluster

我是使用 screen 來在後臺運行 qcluster,很方便的。

使用 Django Q 在 Django 後臺執行耗時任務

# file: views.py
import datetime
from django.http import HttpResponse
from django.utils import timezone
from django_q.tasks import async_task, schedule
from django_q.models import Schedule

def a_longtime_task(arg):
    time.sleep(30)
    return arg

def scheduled_task(arg):
    time.sleep(30)
    return arg

def a_longtime_task_request(request):
    #立即在後臺執行
    async_task(a_longtime_task,'args for the function')
    return HttpResponse('The longtime task has been started.')

def another_longtime_task_request(request):
    #三分鐘後執行
    schedule('YOURAPP.views.scheduled_task',
            'args for the function',
            schedule_type=Schedule.ONCE,
            next_run=timezone.now() + datetime.timedelta(minutes=3))
    return HttpResponse('The task has been scheduled.')

在 Django Admin 佈置定時任務

這個就比較直接,比如要每天 10 點都執行上面例子中的 scheduled_task,就點擊 Admin 頁面 Scheduled tasks 旁邊的「新增」。

Func(填入完整的函數路徑):YOURAPP.views.scheduled_task
Args:'args for the function'
Schedule Type:Daily
Next Run(設置要運行的時間如):2022-08-27 10:00

最後點擊「儲存」就可以了。

同樣的,如果選擇 Cron 類型,就需要在 Cron 輸入框填入 Cron 計劃。比如在 9 點到 23 點期間,每 5,15,25,35,45,55 各執行一次:
5,15,25,35,45,55 9-23 * * *
更多 Cron 用法和組合可以參考 crontab.guru 這個網站。

等待執行的任務、失敗的任務和成功的任務都可以方便的在Admin頁面查看和操作,非常方便。

管理隊列任務與已完成任務 Schedule Task

# file: tasks.py
from django_q.models import Schedule, Task
from django.db.models import Q
#your model to be checked
from app1.models import Race
from django.utils import timezone

import datetime
import operator
from functools import reduce

# Task to delete successful old tasks
def delete_old_tasks():
    len_task_20 = 0
    len_task_gen = 0
    now = datetime.datetime.now()
    days_passed_2 = timezone.utc.localize(now - datetime.timedelta(days=2))
    days_passed_7 = timezone.utc.localize(now - datetime.timedelta(days=7))
    task_q_list = []
    task_q_list.append(Q(group__startswith='20'))
    task_q_list.append(Q(started__lt=days_passed_7))
    task_q_list.append(Q(success__exact=True))
    task_20_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
    len_task_20 = len(task_20_queryset)
    for task in task_20_queryset:
        task.delete()

    task_q_list = []
    task_q_list.append(Q(group__exact='generate_0_task'))
    task_q_list.append(Q(started__lt=days_passed_2))
    task_q_list.append(Q(success__exact=True))
    task_gen_queryset = Task.objects.filter(reduce(operator.and_, task_q_list))
    len_task_gen = len(task_gen_queryset)
    for task in task_gen_queryset:
        task.delete()

    return f'{len_task_20} checked tasks and {len_task_gen} checking tasks have been removed.'

#generate 0 task every 5 minuts
def generate_0_task():
    res=[]
    now = datetime.datetime.now()
    dto_now = timezone.utc.localize(now)
    start_datetime_6 = now + datetime.timedelta(minutes=6)
    dto_plus6 = timezone.utc.localize(start_datetime_6)
    current_races = Race.objects.filter(post_time_live__lte=dto_plus6,
                                          post_time_live__gte=dto_now)
    if len(current_races) == 0:
        res.append('no current races')
        return res
    for race in current_races:
        if race.race_conditions.find('SIMULCAST')>-1:
            res.append(str(race)+': task passed SIMULCAST')
            continue
        tasks = Schedule.objects.filter(name=str(race))
        if len(tasks)==0:
            schedule('app1.views.zero_mtp_task',
                    race.id, race.track_id, race.race_number, race.race_date, race.post_time_live.isoformat(),
                    schedule_type=Schedule.ONCE,
                    next_run=race.post_time_live,
                    name=str(race))
            res.append(str(race)+': task added'+' post time:'+str(race.post_time_live))
        else:
            res.append(str(race)+': task already added'+' post time:'+str(race.post_time_live))
    return res

本文更新於 2023/01/27。

分類
网站

Django with PostgreSQL

參考 PostgreSQL:Linux downloads (Red Hat family) 來安裝PostgreSQL,比如 Fedora 35 可以這樣:

sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/F-35-x86_64/pgdg-fedora-repo-latest.noarch.rpm
sudo dnf install -y postgresql13-server libpq-devel
sudo /usr/pgsql-13/bin/postgresql-13-setup initdb
sudo systemctl enable postgresql-13
sudo systemctl start postgresql-13

安裝完成後來初始化一個數據庫和用戶。

#使用 root 登錄 PostgreSQL
sudo -u postgres psql
#創建數據庫
CREATE DATABASE project;
#創建新用戶
CREATE USER user42 WITH PASSWORD 'password42';
#下面這三行優化是 Django 推薦的
ALTER ROLE user42 SET client_encoding TO 'utf8';
ALTER ROLE user42 SET default_transaction_isolation TO 'read committed';
ALTER ROLE user42 SET timezone TO 'Asia/Taipei';
#把數據庫授權給新用戶
GRANT ALL PRIVILEGES ON DATABASE project TO user42;
#退出數據庫
\q

要在 Django 中使用 PostgreSQL 數據庫,需要還安裝 psycopg2:

pip install Django psycopg2
#如果像我一樣在 Fedora 安裝失敗了
#可以嘗試安裝編譯好的版本
pip install Django psycopg2-binary

之後在項目的設置中把默認的 SQLite 數據庫資料改成 PostgreSQL 就可以了。更像詳細的步驟與解釋,可以參考:How To Use PostgreSQL with your Django Application on Ubuntu 20.04

刪除 Django 數據庫中的表格並重建

這似乎不是正確的回滾數據庫的操作辦法,但是也可以一試。

#從 migrations 目錄找到要刪除的數據庫變更文件,刪除掉
#進入數據庫操作程序
python manage.py dbshell
#查看並找到要刪除的表名
SELECT * FROM pg_catalog.pg_tables;
#如果是MariaDB,這樣查看表名
show tables;
#比如表名爲 task_day,則這樣刪除
DROP TABLE task_day;
#退出數據庫操作程序
exit;
#刪掉數據庫裏的 migrations
python manage.py migrate --prune task

#現在可以重新建立表格了
python manage.py makemigrations
python manage.py migrate

本文更新於 2022/10/24。

分類
Linux 程序

Django簡易搭建上傳文件

這裡使用Django+Gunicorn+Nginx的方式簡單運行一個小型webserver,實現一個簡單的上傳文件到服務器的功能(並不生成下載鏈接)。

啟動虛擬環境,安裝django和gunicorn:

pip install Django==2.0
pip install gunicorn
#進入要放置代碼的目錄並新建項目
django-admin startproject mysite
##或者在當前目錄建立項目
#django-admin startproject mysite .
cd mysite
#新建app
python manage.py startapp polls

先建立一個表格:

#polls/forms.py
from django import forms

class UploadFileForm(forms.Form):
    title = forms.CharField(label='密碼',max_length=20,widget=forms.PasswordInput)
    file = forms.FileField(label='文件',)

修改view:

#polls/views.py
import os
import subprocess
from django.core.files.storage import FileSystemStorage
from django.conf import settings
from django.shortcuts import render
from django.http import HttpResponse
from .forms import UploadFileForm

#handle file example with file
def handle_uploaded_file(f):
    with open('/file/should/be/saved/here/target.odt', 'wb+') as destination:
        for chunk in f.chunks():
            destination.write(chunk)

#another handle file example with filename
def handle_uploaded_file2(filename):
    msg=''
    try:
        targetZipFilePath = os.path.join(settings.BASE_DIR, filename)
        cmd1=subprocess.check_call(["unzip", "-o", targetZipFilePath, "-d", "/home/fred/workspace/"])
        if cmd1==0 :
            cmd2=subprocess.check_call(["cp", "-Rf", "/home/fred/workspace/dist", "/home/fred/"])
            if cmd2==0 :
                msg="deployed successfully"
            else:
                msg="error 2"
        else:
            msg="error 1"
    except:
        msg = 'error 0'
    return msg

def upload_file(request):
    if request.method == 'POST':
        form = UploadFileForm(request.POST, request.FILES)
        if form.is_valid() and request.POST['title']=='Secret':
#            handle_uploaded_file(request.FILES['file'])
#            return HttpResponse("上傳成功")
            myfile = request.FILES['file']
            fs = FileSystemStorage()
            filename = fs.save(myfile.name, myfile)
            uploaded_file_url = fs.url(filename)
            print(uploaded_file_url)
            res = handle_uploaded_file2(uploaded_file_url)
            return HttpResponse(res)
    else:
        form = UploadFileForm()
    return render(request, 'upload.html', {'form': form})

新建一個表格的模板:

#polls/templates/upload.html
<form enctype="multipart/form-data" action="/polls/upload/" method="post">
    {% csrf_token %}
    {{ form }}
    <input type="submit" value="上傳" />
</form>

新建一個url路由表:

#polls/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path('upload/', views.upload_file, name='upload_file'),
]

修改項目路由:

#mysite/urls.py
from django.contrib import admin
from django.urls import include,path

urlpatterns = [
    path('polls/', include('polls.urls')),
    path('admin/', admin.site.urls),
]

修改項目設置:

#mysite/settings.py
INSTALLED_APPS = [    
    'polls.apps.PollsConfig',
#    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
#    'django.contrib.messages',
#    'django.contrib.staticfiles',
]
LANGUAGE_CODE = 'zh-Hant'
TIME_ZONE = 'Asia/Taipei'

然後在項目目錄(最上層)運行gunicorn就可以訪問了:

gunicorn mysite.wsgi --bind 127.0.0.1:3040

nginx中增加如下server即可在外網訪問了(鏈接應該是http://YourPublicIP:8081/polls/upload/):

server {
        listen 8081;
        server_name 127.0.0.1;
        charset utf-8;
        keepalive_timeout 60s;
        #access_log logs/django2a.access.log combined if=$loggable;
        
        location / {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_redirect off;
    
            if (!-f $request_filename) {
                proxy_pass http://127.0.0.1:3040;
                break;
            }
    }
}

CentOS6+Django2+MySql

yum install MySQL-python
pip install mysqlclient
#如果import MySQLdb提示無libmysqlclient.so.18
#則建立軟連接如下
ln -s /usr/local/mysql/lib/libmysqlclient.so.18 /usr/lib64/libmysqlclient.so.18
分類
方法

通過 ICS 文件給日曆增加農曆

很長一段時間以來,我一直受沒有農曆可用的困擾。安卓原生的日曆不支持農曆,谷歌日曆也是到了安卓 8 才開始有農曆,等我用到安卓 8 的時候我已經開始去谷歌化了。轉用開源日曆應用(Etar)後也沒有發現很好的辦法來解決農曆的問題,主要是曆法這東西真的很複雜。終於在最近找到了一個方案,通過 ICS 文件把計算好的農曆導入日曆就可以了。我這裏提供了從 2017 年 01 月 01 日到 2049 年 12 月 31 日的農曆和節氣與公曆對應關係的 ICS 文件,只要選擇需要的年份導入日曆即可。 這些 ICS 文件是基於 infinet 開發的 lunar-calendar 項目生成的,其原始日期數據來自於香港天文台的公曆與農曆日期對照表。我在使用的時候將節氣和生肖做了繁體中文的轉換,並且刪減了一些不展示在日曆中的文本以減小文件大小。我修改後的腳本也可以在上面的分享文件夾中找到。

順便說下我這邊具體是怎麼導入的:我一直在用 Disroot 提供的 Nextcloud 實例,其中有日曆的功能,而我之前就已經在安卓上通過 DAVx⁵ 來同步日曆和聯繫人了。所以我是在電腦上通過瀏覽器來操作的,理論上在手機上也是類似的。日曆網頁的左側點「新日曆」選擇「新日曆」輸入名稱就創建好了新日曆了,如「農曆」。然後點擊左下角的「設定及匯入」-「匯入日曆」,選擇 ICS 文件然後選擇要匯入的日曆即「農曆」。最後耐心等待一番,因爲我通過網絡請求看到農曆竟然是一條一天這樣傳的,我的文件都是三年三年的,所以要耐心等它傳完,傳完後點擊一下「農曆」左側的開關就可以看到農曆了。手機上同步前先從菜單中「刷新日曆列表」就會出現「農曆」,再來同步就可以在日曆裏看到農曆和節氣了。

當然你也可以通過 Calendar Import-Export 將 ICS 文件導入本地日曆。

本文更新於 2023/03/06。