分類
程序

使用 Python 或/和 Cloudflare Worker 代理 NextDNS 的 DoH 服務

Previously:使用 nginx 代理 NextDNS 的 DoH 服務

使用 Cloudflare Worker 代理 NextDNS 的 DoH 服務

使用 tina-hello 的 doh-cf-workers 項目可以方便的建立一個 DoH 代理。由於只需要一個 index.js 文件,所以我這裡轉載一下。默認的示例是 Cloudflare 的 DoH,切換成 NextDNS 的話只需要修改 doh 和 dohjson 的網址,其他都不用動。

// SPDX-License-Identifier: 0BSD

const doh = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const dohjson = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const contype = 'application/dns-message'
const jstontype = 'application/dns-json'
const r404 = new Response(null, {status: 404});

// developers.cloudflare.com/workers/runtime-apis/fetch-event/#syntax-module-worker
export default {
    async fetch(r, env, ctx) {
        return handleRequest(r);
    },
};

async function handleRequest(request) {
    // when res is a Promise<Response>, it reduces billed wall-time
    // blog.cloudflare.com/workers-optimization-reduces-your-bill
    let res = r404;
    const { method, headers, url } = request
    const searchParams = new URL(url).searchParams
    if (method == 'GET' && searchParams.has('dns')) {
        res = fetch(doh + '?dns=' + searchParams.get('dns'), {
            method: 'GET',
            headers: {
                'Accept': contype,
            }
        });
    } else if (method === 'POST' && headers.get('content-type') === contype) {
        // streaming out the request body is optimal than awaiting on it
        const rostream = request.body;
        res = fetch(doh, {
            method: 'POST',
            headers: {
                'Accept': contype,
                'Content-Type': contype,
            },
            body: rostream,
        });
    } else if (method === 'GET' && headers.get('Accept') === jstontype) {
        const search = new URL(url).search
         res = fetch(dohjson + search, {
            method: 'GET',
            headers: {
                'Accept': jstontype,
            }
        });
    }
    return res;
}
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOURPROJECT.YOURWORKER.workers.dev/dns-query?name=ft.shaman.eu.org&type=A'
#POST請求由於需要構造 DNS 參數,所以就不測了

使用 Python 代理 NextDNS 的 DoH 服務

輕量級的 API 框架我不熟悉,所以這裡還是用 Django。

#file:app1/views.py
from django.http import HttpResponse
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import requests

@csrf_exempt
def forward_to_doh(request):
    cloudflare_doh_url = "https://YOURPROJECT.YOURWORKER.workers.dev/"
    contype = 'application/dns-message'
    jstontype = 'application/dns-json'

    try:
        if request.method == 'GET':
            if 'dns' in request.GET:
                params = {'dns': request.GET['dns']}
                headers = {'Accept': contype}
                response = requests.get(cloudflare_doh_url, params=params, headers=headers)
                return JsonResponse(response.json(), status=response.status_code)
            elif request.headers.get('Accept') == jstontype:
                response = requests.get(cloudflare_doh_url + '?' + request.GET.urlencode(), headers={'Accept': jstontype})
                return JsonResponse(response.json(), status=response.status_code)
        elif request.method == 'POST':
            if request.headers.get('content-type') == contype:
                headers = {'Accept': contype, 'Content-Type': contype}
                data = request.body
                response = requests.post(cloudflare_doh_url, data=data, headers=headers)
                return HttpResponse(response.content, content_type=response.headers['Content-Type'], status=response.status_code)
        else:
            return JsonResponse({'error': 'Unsupported request method'}, status=405)
        return JsonResponse({'error': 'Not Found'}, status=404)

    except requests.exceptions.RequestException as e:
        return JsonResponse({'error': str(e)}, status=500)
#file:mysite/urls.py
from django.urls import include,path
from app1 import views

urlpatterns = [
    path('YourSecNextDns/', views.forward_to_doh, name='forward_to_doh'),
    path('i/', include('i.urls')),
]
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOUR_DOMAIN.LTD/YourSecNextDns/?name=ft.shaman.eu.org&type=A'

上面的請求經過三次請求,速度(高達秒級)自然是比不了直連。但是為了體驗 DoH,還是可以用用看。反正網速本身也不快,DNS 慢一點完全可以怪到網速上。

分類
程序

在 CentOS 8 上使用 Xvfb 和 PyAutoGUI

本文介紹在 CentOS 8 上使用 PyAutoGUI 操作運行與 Xvfb 桌面環境中的 Firefox 所需的準備。由於所操作網頁需要登錄並有 OTP,所以還需要使用 VNC 來手動輸入密碼以做準備工作。

安裝 Xvfb

sudo yum install xorg-x11-server-Xvfb
#啓動 Xvfb
Xvfb :0 -screen 0 1366x768x24+32 -br +bs -ac &
#修改環境變量 DISPLAY
export DISPLAY=:0
#查看環境變量 DISPLAY
$DISPLAY
#查看 Xvfb 進程
ps -ef | grep Xvfb

如果只是爲了使用 webdriver 操作瀏覽器那麼這樣安裝運行 Xvfb 後就可以了,但是我們要使用 PyAutoGUI,所以還有依賴要裝。

安裝火狐瀏覽器

我個人比較偏好 ESR 版火狐,下載後解壓就可以使用了。

tar -xf firefox-115.5.0esr.tar.bz2
cd Firefox
./firefox
#如果要指定窗口大小可以
./firefox -width 1350 -height 764

安裝 x11vnc 服務

其實只要下載執行檔,運行即可,非常簡單。不過 x11vnc 12 年未有更新,不知道還能用多久。如果那天不能用了可以嘗試 Tiger VNC。

mv x11vnc-0.9.13_amd64-Linux x11vnc
chmod +x x11vnc
#運行 vnc server,密碼設置長一點,端口是 9999
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999
#上面命令在一次連接後會停止,如果要服務一直可以用,就加上 forever 參數
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999 -shared -forever
#注意服務器的網絡防火牆要打開 9999 端口
#用 nmap 查看端口是否打開
nmap -p 9999 1.1.1.1

Fedora 本地可以安裝 Remmina 作爲 VNC 客戶端。

sudo dnf install remmina -y
#如果想要使用 socks5 代理
proxychains4 /usr/bin/remmina

打開 Remmina 後在地址欄的協議選項裏選擇 VNC,然後地址填 1.1.1.1:9999 回車就可以連上服務器的桌面並看到剛剛打開的火狐瀏覽器進行設置了。點左上角的 + 號可以添加 profile,這樣下次雙擊就能連上 VNC 了。

安裝與配置 PyAutoGUI

我自己從源碼編譯的 Python 一直提示沒有 _tkinter 模塊。搜了半天沒有解決,於是使用系統自帶的 Python 3.9 得以解決。雖然我是用 Python 3.11 開發的,但是 Python 3.9 跑起來也沒問題。

import _tkinter # If this fails your Python may not be configured for Tk
ImportError: No module named _tkinter
#安裝 Python 3.9 以及依賴
sudo yum install libnsl ImageMagick xclip
sudo yum install python39 python39-tkinter

#xdotool 並不支持 Xvfb,所以不用裝
#sudo yum install xdotool

#創建虛擬環境
cd your_project
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

示例代碼

import json
import random
import sys
import requests
import time
import logging
import os
import pyperclip
import subprocess
from urllib.parse import urlparse, parse_qs, urlunparse
from dotenv import load_dotenv
import psutil
import pyautogui
import pyscreeze

script_dir = os.path.dirname(os.path.realpath(__file__))

load_dotenv()
api_key = os.getenv("API_KEY")
proxy = os.getenv("PROXY")
de = os.getenv("DE")
batch = int(os.getenv("BATCH"))

if proxy is None:
    proxies = None
else:
    proxies = {"http": proxy, "https": proxy}

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
# Create a file handler to save logs to a file
log_file = os.path.join(script_dir, "logfile.log")
file_handler = logging.FileHandler(log_file)

# Set the logging level for the file handler (if different from the root logger)
file_handler.setLevel(logging.INFO)  # Adjust the log level if needed

# Create a formatter and add it to the file handler
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)

# Add the file handler to the logger
logger.addHandler(file_handler)


def activate_window(window_title):
    if de == "Xvfb":
        firefox_exists = False
        for proc in psutil.process_iter():
            try:
                if "firefox" in proc.name().lower():
                    firefox_exists = True
                    break
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
        if firefox_exists is False:
            subprocess.Popen(
                [
                    "/home/centos/fred/programs/firefox/firefox",
                    "-width",
                    "1350",
                    "-height",
                    "764",
                ]
            )
            logger.info("firefox started")
            time.sleep(10)
    elif de == "Xfce":
        command = f"xdotool search --onlyvisible --name '{window_title}' windowactivate"
        subprocess.run(command, shell=True)


def wait_for_image(image_path, timeout=10):
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            location = pyautogui.locateOnScreen(
                image_path, region=(0, 0, 1360, 760), confidence=0.9, grayscale=True
            )
            if location:
                return location
        except pyautogui.ImageNotFoundException:
            pass
        time.sleep(1.5)
    return None


def wait_for_images(image_path, timeout=10):
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            location = pyautogui.locateAllOnScreen(
                image_path, region=(0, 0, 1366, 760), confidence=0.9
            )
            if location:
                return list(location)
        except pyautogui.ImageNotFoundException:
            pass
        except pyscreeze.ImageNotFoundException:
            pass
        time.sleep(1.5)
    return []

def right_click():
    pyautogui.mouseDown(button="right")  # Perform a right-click
    time.sleep(0.1)  # Adjust the delay if needed (time in seconds)
    pyautogui.mouseUp(button="right")  # Release the right-click


def triple_click(x, y):
    for _ in range(3):
        pyautogui.click(x, y)
        time.sleep(0.1)

def get_data(obj):
    start_time = time.time()
    time.sleep(0.5)
    activate_window("Mozilla Firefox")
    time.sleep(0.5)
    # load webpage
    url_profile = obj["contact_name_href"]
    if url_profile.find("?") != -1:
        url_profile = url_profile[: url_profile.find("?")]
    logger.info(url_profile)
    # open a new tab
    # pyautogui.hotkey('ctrl', 't')
    pyautogui.hotkey("ctrl", "l")
    time.sleep(0.5)
    pyautogui.typewrite(url_profile)
    time.sleep(0.5)
    pyautogui.press("enter")
    time.sleep(8)

    # check page loaded
    image_path = os.path.join(script_dir, "img/ContactDetails.png")
    loaded = wait_for_image(image_path, 40)

    # Box(left=535, top=541, width=74, height=19)
    if loaded:
        logger.info("Page loaded")

    # LinkedIn
    image_path = os.path.join(script_dir, "img/LinkedIn.png")
    l = wait_for_images(image_path, 3)
    l_index = 0
    retry = 0
    temp_l = None
    while l_index < len(l):
        logger.info(f"l_index:{l_index+1}/{len(l)}")
        if temp_l is not None:
            if temp_l[0] == l[l_index][0]:
                l_index = l_index + 1
                continue
        li = l[l_index]
        pyautogui.moveTo(li, duration=2, tween=pyautogui.easeInOutQuad)
        time.sleep(0.5)
        pyautogui.click(li)
        time.sleep(2)
        pyautogui.hotkey("ctrl", "l")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "w")
        time.sleep(0.5)
        copied_text = pyperclip.paste()
        url = copied_text.strip()
        url = get_link_from_redirect(url)

        if url is not None and url != "":
            # logger.info(len(url))
            if url.find("linkedin") == -1:
                logger.info("not found 'linkedin'")
                if retry < 3:
                    retry = retry + 1
                else:
                    l_index = l_index + 1
                continue

            # personal LindedIn
            if li[0] < loaded_x_left:
                obj["LinkedIn_Personal_URL"] = url
            else:
                obj["LinkedIn_URL"] = url
        temp_l = li
        l_index = l_index + 1

    # Supplemental_Email
    image_path = os.path.join(script_dir, "img/Supplemental.png")
    l = wait_for_image(image_path, 3)
    if l:
        pyautogui.moveTo(
            l[0] + l[2] + 50 + random.randint(-5, 5),
            l[1] + l[3] / 2,
            duration=1,
            tween=pyautogui.easeInOutQuad,
        )
        time.sleep(1)
        right_click()
        time.sleep(1)
        pyautogui.hotkey("l")
        time.sleep(1)
        pyautogui.hotkey("esc")
        # [email protected]
        copied_text = pyperclip.paste()
        obj["Supplemental_Email"] = copied_text.strip()

    # Local address
    image_path = os.path.join(script_dir, "img/Local.png")
    l = wait_for_image(image_path, 3)
    if l:
        x = l[0] + l[2] + 30 + random.randint(-5, 5)
        pyautogui.moveTo(x, l[1] + l[3] / 2, duration=1, tween=pyautogui.easeInOutQuad)
        time.sleep(1)
        triple_click(x, l[1] + l[3] / 2)
        time.sleep(1)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(1)
        # [email protected]
        copied_text = pyperclip.paste()
        obj["Local_Location_Address"] = copied_text.strip()

def main(max_id):
    failed_list = []
    for i in range(batch):
        logger.info(f"batch:{i+1}/{batch}")
        obj = get_job()
        if obj is None:
            continue
        if obj["contact_name_href"] == "" or obj["contact_name_href"] is None:
            res = post_job(obj)
            logger.info(res)
            continue

        if obj["id"] > max_id:
            logger.info("max_id reached")
            continue
        obj = get_data(obj)
        if obj["message"] != "success":
            if obj["id"] in failed_list:
                res = post_job(obj)
                logger.info(res)
            else:
                failed_list.append(obj["id"])
            continue

        res = post_job(obj)
        logger.info(res)

if __name__ == "__main__":
    # time.sleep(10)
    # countdown
    for i in range(6):
        logger.info(f"countdown:{6-i}")
        time.sleep(1)

    # get max id from command
    if len(sys.argv) > 1:
        max_id = int(sys.argv[1])
        logger.info(f"max_id:{max_id}")
    else:
        max_id = 9999999

    main(max_id)
分類
程序

下載指定版本 Chromium 與其 Webdriver

以下載 Linux 平臺 119 版本 Chromium 與其 Webdriver 爲例:

首先在 Chromium 版本發佈頁 找到想要下載版本的 Branch Base Position 比如 1204232。然後在 https://commondatastorage.googleapis.com/chromium-browser-snapshots/index.html?prefix=Linux_x64/ 中搜索 1204232。如果發現搜索結果爲空,不用怕,把最後一位去掉試試,這時就會搜到 1204234。也可以再刪掉一位,就會出來更多相近的結果。點進去後下載 chrome-linux.zip 和 chromedriver_linux64.zip 就可以了。

unzip chrome-linux.zip
cd chrome-linux
#查看 Chrome 版本
./chrome --version
#Chromium 119.0.6045.0

在 Selenium 中指定 Chrome、Webdriver 與 Profile 位置

import os
import 
import datetime
import undetected_chromedriver as uc

MAX_RUNTIME_SECONDS = 240
# Check for and kill any Chrome processes that have been running for too long
for proc in psutil.process_iter():
    try:
        if "chrome" in proc.name().lower():
            create_time = proc.create_time()
            elapsed_time = time.time() - create_time
            if elapsed_time > MAX_RUNTIME_SECONDS:
                proc.kill()
                print("killed" + proc.name())
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        pass

script_dir = os.path.dirname(os.path.realpath(__file__))
profile_dir = os.path.join(script_dir, "chrome_profile")
path_to_chrome_binary = os.path.abspath(
    script_dir + "/../../" + "chrome-linux64/chrome"
)
driver_executable_path = os.path.abspath(
    script_dir + "/../../" + "chromedriver-linux64/chromedriver"
)
proxy = "socks5://127.0.0.1:8080"

options = uc.ChromeOptions()
options.add_argument("--disable-notifications")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-save-password-bubble")

# Limit CPU and memory usage
options.add_argument("--disable-software-rasterizer")
options.add_argument("--disable-extensions")
# options.add_argument("--disable-webgl")

options.add_argument(proxy)
options.binary_location = path_to_chrome_binary

# if os.getenv("DEBUG") != "True":
#     options.add_argument("headless=True")

prefs = {
    "credentials_enable_service": False,
    "profile.password_manager_enabled": False,
    "profile.privacy_sandbox_prompt_enabled": False,
    # "profile.managed_default_content_settings.images": 1,
}
options.add_experimental_option("prefs", prefs)

try:
    driver = uc.Chrome(
        executable_path=driver_executable_path,
        browser_executable_path=path_to_chrome_binary,
        options=options,
        version_main=119,
        user_data_dir=profile_dir,
        # use_subprocess=False,
    )
except Exception as e:
    return None


driver.set_window_size(1366, 768)
driver.set_page_load_timeout(30)

#fix timeout bug
try:
    driver.get(url)
except Exception as e:
    try:
        #send Esc key to stop loading
        driver.find_element(By.XPATH, '//body').send_keys(Keys.ESCAPE)
        time.sleep(1)
        # check page loaded
        WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located(
                    (By.XPATH, "//dl[@class='dfn']")
                )
            )
    except Exception as e:
        #take a screen shot
        png_name = datetime.datetime.now().isoformat()[:19]
        driver.save_screenshot(png_name + "_load" + ".png")
        return None
分類
程序

免費使用 AWS Lambda 的一個示例

這裏演示一下免費使用 AWS Lambda 創建一個查詢網頁大小的接口。之所以直接使用 Lambda 生成的鏈接而不是 API Gateway 的鏈接,是因爲 API Gateway 的免費額度是在註冊帳號的一年內有效。而且 API Gateway 最大超時只有 30 秒,而 Lambda 可以設置到 15 分鐘。

首先註冊一個帳號,我記得只要郵箱和手機號就行,不用綁定支付方式。然後在控制台搜索 Lambda 進入 Lambda 控制台。點擊 Funtction > Create function. 選擇 Author from scratch,輸入函數名字沒有特殊要求,Runtime 選 Python 3.10,Architecture 選 x86 和 arm64 都可以,各有免費額度。點開 Advanced settings,勾選 Enable function URL,勾選 None。CORS 看需要也可以勾上,然後點擊右下角 Create function 就可以了。

在函數頁面 Code 標籤頁貼上如下代碼:

import json
import os
import random
import urllib.request

UA = '''Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edge/19041.423
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 OPR/87.0.4390.99
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Vivaldi/6.1.3035.111
Mozilla/5.0 (Windows NT 10.0; WOW64; rv:85.0) Gecko/20100101 Firefox/85.0
Mozilla/5.0 (Windows NT 10.0; Win64; x64; Trident/7.0; rv:11.0) like Gecko
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.51 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36'''

US_LIST=UA.split()

def lambda_handler(event, context):
    event_body_str = event['body']
    event_body = json.loads(event_body_str)
    # print(str(event_body))
    if event_body['secret'] == os.environ.get('secret'):
        try:
            t_ua = random.choice(US_LIST)
            res = urllib.request.urlopen(urllib.request.Request(
                url=event_body['url'],
                headers={"User-Agent": t_ua,},
                method='GET'),
                timeout=180)
            content = res.read()
            return {
                'statusCode': 200,
                'body': json.dumps(str(len(content))+' '+event_body['url'])
            }
        except:
            return {
                'statusCode': 500,
                'body': json.dumps('request failed.')
            }
    else:
        return {
            'statusCode': 500,
            'body': json.dumps('unauthorized.')
        }

secret 的環境在 Configuration > Environment variables 中添加。在 General configuration 中可以設置函數的超時時長 Timeout,默認是 3 秒。在 Function URL 裏可以看到我們需要的 URL 類似:https://4xyqiuexi7l3fcnuyuevjgclei0jhpuy.lambda-url.us-east-1.on.aws/ 。使用如下 curl 命令測試下:

curl -X POST -H "Content-Type: application/json" -d '{"secret":"Dn2Xk7d7RDkK_8bILy7lVe2NJlE4y7T_", "url":"http://ip-api.com/json"}' https://4xyqiuexi7l3fcnuyuevjgclei0jhpuy.lambda-url.us-east-1.on.aws/
#應該會得到如下結果
"296 http://ip-api.com/json"

免費額度使用情況可以在 Billing > Free Tier 頁面查看。Lambda 服務每月有 400000 seconds(GB-Second,如果的函數和我的一樣用的是 128 MB,那就可以乘以 8)和 1000000 次請求。

分類
程序

在 VS Code 中調試 Django Q 任務

在 .vscode 文件夾中創建一個 lanch.json 文件, 內容如下:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Django Q",
            "type": "python",
            "request": "launch",
            "program": "${workspaceFolder}/manage.py",
            "args": [
                "qcluster"
            ],
            "django": true,
            "env": {
                "DJANGO_SETTINGS_MODULE": "YOUR_PROJECT_NAME.settings"
            },
            "gevent": true
        },
        // Your other configurations go here
    ]
}

只需修改兩個地方:program 裏的 ${workspaceFolder} 是變量,如果你項目的 manage.py 在 VS Code 項目根目錄下,那這行就不用修改。 DJANGO_SETTINGS_MODULE 這裏需要把 YOUR_PROJECT_NAME 替換成你項目的名字。然後按 Debug 按鈕就可以調試 Django Q task 了。

對於這個問題,ChatGPT 4 前前後後給出了五次錯誤配置,不得不記錄一下。

分類
程序

使用 Cloudflare Worker 中轉 HTTP 請求

Cloudflare Worker 可以方便的中轉 HTTP 請求,下面示例是我之前用過的,算是密碼保護的中轉特定請求。其中的 X_Custom_PSK 算是密碼,在 Settings > Variables 設置,這樣就只有我的程序可以請求。

addEventListener("fetch", event => {

  const psk = event.request.headers.get("X_Custom_PSK");
  if (psk === X_Custom_PSK) {
    event.respondWith(handleRequest(event.request));
  }else{
    const failed_response = new Response('Sorry, you have supplied an invalid key.', {
      status: 403,
    });
    event.respondWith(failed_response);
  }
})

async function handleRequest(request) {
  const resp = await fetch('https://domain.ltd/checkpoint/list', {
      method: 'POST',
      headers: {
          'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.3 Safari/537.36',
          'Accept': '*/*',
          'Accept-Language': 'zh-CN;q=0.8,zh;q=0.6,en-US;q=0.4,en;q=0.2',
          'Origin': 'https://domain.ltd',
          'DNT': '1',
          'Connection': 'keep-alive',
          'Referer': 'https://domain.ltd/',
          'Sec-Fetch-Dest': 'empty',
          'Sec-Fetch-Mode': 'cors',
          'Sec-Fetch-Site': 'cross-site'
      },
      body: new URLSearchParams({
          'page': '1',
          'pageSize': '10',
          'latitude': '22.5',
          'longitude': '114.0',
          'queryType': '0'
      })
  });
  return resp;
}

下面這個則是用一個 worker 代理多個網站。

addEventListener("fetch", event => {
  let url=new URL(event.request.url);
  if (event.request.url.indexOf('shaman')>-1){
      url.hostname="ft.shaman.eu.org";
  }else{
      url.hostname="www.rfi.fr";
  }  
  let request=new Request(url,event.request);
  event.respondWith(fetch(request));
});
分類
Linux 程序

Django簡易搭建上傳文件

這裡使用Django+Gunicorn+Nginx的方式簡單運行一個小型webserver,實現一個簡單的上傳文件到服務器的功能(並不生成下載鏈接)。

啟動虛擬環境,安裝django和gunicorn:

pip install Django==2.0
pip install gunicorn
#進入要放置代碼的目錄並新建項目
django-admin startproject mysite
##或者在當前目錄建立項目
#django-admin startproject mysite .
cd mysite
#新建app
python manage.py startapp polls

先建立一個表格:

#polls/forms.py
from django import forms

class UploadFileForm(forms.Form):
    title = forms.CharField(label='密碼',max_length=20,widget=forms.PasswordInput)
    file = forms.FileField(label='文件',)

修改view:

#polls/views.py
import os
import subprocess
from django.core.files.storage import FileSystemStorage
from django.conf import settings
from django.shortcuts import render
from django.http import HttpResponse
from .forms import UploadFileForm

#handle file example with file
def handle_uploaded_file(f):
    with open('/file/should/be/saved/here/target.odt', 'wb+') as destination:
        for chunk in f.chunks():
            destination.write(chunk)

#another handle file example with filename
def handle_uploaded_file2(filename):
    msg=''
    try:
        targetZipFilePath = os.path.join(settings.BASE_DIR, filename)
        cmd1=subprocess.check_call(["unzip", "-o", targetZipFilePath, "-d", "/home/fred/workspace/"])
        if cmd1==0 :
            cmd2=subprocess.check_call(["cp", "-Rf", "/home/fred/workspace/dist", "/home/fred/"])
            if cmd2==0 :
                msg="deployed successfully"
            else:
                msg="error 2"
        else:
            msg="error 1"
    except:
        msg = 'error 0'
    return msg

def upload_file(request):
    if request.method == 'POST':
        form = UploadFileForm(request.POST, request.FILES)
        if form.is_valid() and request.POST['title']=='Secret':
#            handle_uploaded_file(request.FILES['file'])
#            return HttpResponse("上傳成功")
            myfile = request.FILES['file']
            fs = FileSystemStorage()
            filename = fs.save(myfile.name, myfile)
            uploaded_file_url = fs.url(filename)
            print(uploaded_file_url)
            res = handle_uploaded_file2(uploaded_file_url)
            return HttpResponse(res)
    else:
        form = UploadFileForm()
    return render(request, 'upload.html', {'form': form})

新建一個表格的模板:

#polls/templates/upload.html
<form enctype="multipart/form-data" action="/polls/upload/" method="post">
    {% csrf_token %}
    {{ form }}
    <input type="submit" value="上傳" />
</form>

新建一個url路由表:

#polls/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path('upload/', views.upload_file, name='upload_file'),
]

修改項目路由:

#mysite/urls.py
from django.contrib import admin
from django.urls import include,path

urlpatterns = [
    path('polls/', include('polls.urls')),
    path('admin/', admin.site.urls),
]

修改項目設置:

#mysite/settings.py
INSTALLED_APPS = [    
    'polls.apps.PollsConfig',
#    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
#    'django.contrib.messages',
#    'django.contrib.staticfiles',
]
LANGUAGE_CODE = 'zh-Hant'
TIME_ZONE = 'Asia/Taipei'

然後在項目目錄(最上層)運行gunicorn就可以訪問了:

gunicorn mysite.wsgi --bind 127.0.0.1:3040

nginx中增加如下server即可在外網訪問了(鏈接應該是http://YourPublicIP:8081/polls/upload/):

server {
        listen 8081;
        server_name 127.0.0.1;
        charset utf-8;
        keepalive_timeout 60s;
        #access_log logs/django2a.access.log combined if=$loggable;
        
        location / {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_redirect off;
    
            if (!-f $request_filename) {
                proxy_pass http://127.0.0.1:3040;
                break;
            }
    }
}

CentOS6+Django2+MySql

yum install MySQL-python
pip install mysqlclient
#如果import MySQLdb提示無libmysqlclient.so.18
#則建立軟連接如下
ln -s /usr/local/mysql/lib/libmysqlclient.so.18 /usr/lib64/libmysqlclient.so.18