分類
程序 软件

將 flightradar24 航班位置記錄轉換為 gpx 文檔

前幾天坐飛機,一覺醒來看到壯麗的雪山。拍了不少照片,但是並不清楚雪山的大概位置。想要給飛機上拍攝的照片加上地理位置,需要 gpx 文檔,然後用 digiKam 就可以標記照片的位置。怎麼取得航班的位置呢?有兩個方法。

第一個是 FlightAware。搜索到航班的歷史記錄如這個 ZH8960。頁面上有個「+ Google Earth」的圖標,點擊後會得到一個 kml 檔案。然後到 kml2gpx.com 即可將文檔轉換為所需要的 gpx 文檔。但是這個方法的問題在於 FlightAware 的數據會有一部分是直線,看起來就是沒有 GPS 數據,所以我最終使用的方法二。

第二個略複雜,是用 flightradar24 的數據。搜索結果頁也有 kml 檔案下載按鈕,但是需要成為會員才能使用。所以點擊旁邊的回放按鈕,來到回放頁面。然後找到包含 GPS 數據的網絡請求:按 F12 按鈕打開瀏覽器開發者工具 > 切換到「網絡」標籤 > 刷新頁面 > 在過濾請求的輸入框里填寫 flight-playback.json 即可看到所需要的請求 > 在請求上右鍵 > 選擇「保存回應為(Save Response As)即可獲得包含 GPS 數據的 json 文檔。然後使用我的這個 Python 腳本即可將 json 文檔轉換為 gpx 文檔。

######################
#file json2gpx.py
# convert json file from www.flightradar24.com to gps file.
# get the json file by open the web page: https://www.flightradar24.com/data/flights/zh8960#3a6f819a
# then save the request https://api.flightradar24.com/common/v1/flight-playback.json?flightId=3a6f819a&timestamp=1747838700 as flight-playback.json
# run python json2gpx.py, and get output.gpx
######################

import json
import xml.etree.ElementTree as ET
from datetime import datetime, timezone

#read json from file
with open('flight-playback.json', 'r') as f:
    data_all = json.load(f)
data = data_all['result']['response']['data']['flight']['track']

# Create GPX root
gpx = ET.Element("gpx", version="1.1", creator="JSON-to-GPX Converter", xmlns="http://www.topografix.com/GPX/1/1")
trk = ET.SubElement(gpx, "trk")
trkseg = ET.SubElement(trk, "trkseg")

# Add track points
for point in data:
    trkpt = ET.SubElement(trkseg, "trkpt", lat=str(point["latitude"]), lon=str(point["longitude"]))
    ET.SubElement(trkpt, "ele").text = str(point["altitude"]["meters"])
    timestamp_iso = datetime.fromtimestamp(point["timestamp"], tz=timezone.utc).isoformat().replace("+00:00", "Z")
    ET.SubElement(trkpt, "time").text = timestamp_iso

# Write to GPX file
tree = ET.ElementTree(gpx)
with open("output.gpx", "wb") as f:
    tree.write(f, encoding="utf-8", xml_declaration=True)

print("GPX file created as 'output.gpx'")

分類
程序

使用 Python 或/和 Cloudflare Worker 代理 NextDNS 的 DoH 服務

Previously:使用 nginx 代理 NextDNS 的 DoH 服務

使用 Cloudflare Worker 代理 NextDNS 的 DoH 服務

使用 tina-hello 的 doh-cf-workers 項目可以方便的建立一個 DoH 代理。由於只需要一個 index.js 文件,所以我這裡轉載一下。默認的示例是 Cloudflare 的 DoH,切換成 NextDNS 的話只需要修改 doh 和 dohjson 的網址,其他都不用動。

// SPDX-License-Identifier: 0BSD

const doh = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const dohjson = 'https://dns.nextdns.io/YOUR_ID/YOUR_TAG'
const contype = 'application/dns-message'
const jstontype = 'application/dns-json'
const r404 = new Response(null, {status: 404});

// developers.cloudflare.com/workers/runtime-apis/fetch-event/#syntax-module-worker
export default {
    async fetch(r, env, ctx) {
        return handleRequest(r);
    },
};

async function handleRequest(request) {
    // when res is a Promise<Response>, it reduces billed wall-time
    // blog.cloudflare.com/workers-optimization-reduces-your-bill
    let res = r404;
    const { method, headers, url } = request
    const searchParams = new URL(url).searchParams
    if (method == 'GET' && searchParams.has('dns')) {
        res = fetch(doh + '?dns=' + searchParams.get('dns'), {
            method: 'GET',
            headers: {
                'Accept': contype,
            }
        });
    } else if (method === 'POST' && headers.get('content-type') === contype) {
        // streaming out the request body is optimal than awaiting on it
        const rostream = request.body;
        res = fetch(doh, {
            method: 'POST',
            headers: {
                'Accept': contype,
                'Content-Type': contype,
            },
            body: rostream,
        });
    } else if (method === 'GET' && headers.get('Accept') === jstontype) {
        const search = new URL(url).search
         res = fetch(dohjson + search, {
            method: 'GET',
            headers: {
                'Accept': jstontype,
            }
        });
    }
    return res;
}
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOURPROJECT.YOURWORKER.workers.dev/dns-query?name=ft.shaman.eu.org&type=A'
#POST請求由於需要構造 DNS 參數,所以就不測了

使用 Python 代理 NextDNS 的 DoH 服務

輕量級的 API 框架我不熟悉,所以這裡還是用 Django。

#file:app1/views.py
from django.http import HttpResponse
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import requests

@csrf_exempt
def forward_to_doh(request):
    cloudflare_doh_url = "https://YOURPROJECT.YOURWORKER.workers.dev/"
    contype = 'application/dns-message'
    jstontype = 'application/dns-json'

    try:
        if request.method == 'GET':
            if 'dns' in request.GET:
                params = {'dns': request.GET['dns']}
                headers = {'Accept': contype}
                response = requests.get(cloudflare_doh_url, params=params, headers=headers)
                return JsonResponse(response.json(), status=response.status_code)
            elif request.headers.get('Accept') == jstontype:
                response = requests.get(cloudflare_doh_url + '?' + request.GET.urlencode(), headers={'Accept': jstontype})
                return JsonResponse(response.json(), status=response.status_code)
        elif request.method == 'POST':
            if request.headers.get('content-type') == contype:
                headers = {'Accept': contype, 'Content-Type': contype}
                data = request.body
                response = requests.post(cloudflare_doh_url, data=data, headers=headers)
                return HttpResponse(response.content, content_type=response.headers['Content-Type'], status=response.status_code)
        else:
            return JsonResponse({'error': 'Unsupported request method'}, status=405)
        return JsonResponse({'error': 'Not Found'}, status=404)

    except requests.exceptions.RequestException as e:
        return JsonResponse({'error': str(e)}, status=500)
#file:mysite/urls.py
from django.urls import include,path
from app1 import views

urlpatterns = [
    path('YourSecNextDns/', views.forward_to_doh, name='forward_to_doh'),
    path('i/', include('i.urls')),
]
#測試下 GET 請求
curl -H 'accept: application/dns-json' 'https://YOUR_DOMAIN.LTD/YourSecNextDns/?name=ft.shaman.eu.org&type=A'

上面的請求經過三次請求,速度(高達秒級)自然是比不了直連。但是為了體驗 DoH,還是可以用用看。反正網速本身也不快,DNS 慢一點完全可以怪到網速上。

分類
程序

在 CentOS 8 上使用 Xvfb 和 PyAutoGUI

本文介紹在 CentOS 8 上使用 PyAutoGUI 操作運行與 Xvfb 桌面環境中的 Firefox 所需的準備。由於所操作網頁需要登錄並有 OTP,所以還需要使用 VNC 來手動輸入密碼以做準備工作。

安裝 Xvfb

sudo yum install xorg-x11-server-Xvfb
#啓動 Xvfb
Xvfb :0 -screen 0 1366x768x24+32 -br +bs -ac &
#修改環境變量 DISPLAY
export DISPLAY=:0
#查看環境變量 DISPLAY
$DISPLAY
#查看 Xvfb 進程
ps -ef | grep Xvfb

如果只是爲了使用 webdriver 操作瀏覽器那麼這樣安裝運行 Xvfb 後就可以了,但是我們要使用 PyAutoGUI,所以還有依賴要裝。

安裝火狐瀏覽器

我個人比較偏好 ESR 版火狐,下載後解壓就可以使用了。

tar -xf firefox-115.5.0esr.tar.bz2
cd Firefox
./firefox
#如果要指定窗口大小可以
./firefox -width 1350 -height 764

安裝 x11vnc 服務

其實只要下載執行檔,運行即可,非常簡單。不過 x11vnc 12 年未有更新,不知道還能用多久。如果那天不能用了可以嘗試 Tiger VNC。

mv x11vnc-0.9.13_amd64-Linux x11vnc
chmod +x x11vnc
#運行 vnc server,密碼設置長一點,端口是 9999
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999
#上面命令在一次連接後會停止,如果要服務一直可以用,就加上 forever 參數
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999 -shared -forever
#注意服務器的網絡防火牆要打開 9999 端口
#用 nmap 查看端口是否打開
nmap -p 9999 1.1.1.1

Fedora 本地可以安裝 Remmina 作爲 VNC 客戶端。

sudo dnf install remmina -y
#如果想要使用 socks5 代理
proxychains4 /usr/bin/remmina

打開 Remmina 後在地址欄的協議選項裏選擇 VNC,然後地址填 1.1.1.1:9999 回車就可以連上服務器的桌面並看到剛剛打開的火狐瀏覽器進行設置了。點左上角的 + 號可以添加 profile,這樣下次雙擊就能連上 VNC 了。

安裝與配置 PyAutoGUI

我自己從源碼編譯的 Python 一直提示沒有 _tkinter 模塊。搜了半天沒有解決,於是使用系統自帶的 Python 3.9 得以解決。雖然我是用 Python 3.11 開發的,但是 Python 3.9 跑起來也沒問題。

import _tkinter # If this fails your Python may not be configured for Tk
ImportError: No module named _tkinter
#安裝 Python 3.9 以及依賴
sudo yum install libnsl ImageMagick xclip
sudo yum install python39 python39-tkinter

#xdotool 並不支持 Xvfb,所以不用裝
#sudo yum install xdotool

#創建虛擬環境
cd your_project
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

示例代碼

import json
import random
import sys
import requests
import time
import logging
import os
import pyperclip
import subprocess
from urllib.parse import urlparse, parse_qs, urlunparse
from dotenv import load_dotenv
import psutil
import pyautogui
import pyscreeze

script_dir = os.path.dirname(os.path.realpath(__file__))

load_dotenv()
api_key = os.getenv("API_KEY")
proxy = os.getenv("PROXY")
de = os.getenv("DE")
batch = int(os.getenv("BATCH"))

if proxy is None:
    proxies = None
else:
    proxies = {"http": proxy, "https": proxy}

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
# Create a file handler to save logs to a file
log_file = os.path.join(script_dir, "logfile.log")
file_handler = logging.FileHandler(log_file)

# Set the logging level for the file handler (if different from the root logger)
file_handler.setLevel(logging.INFO)  # Adjust the log level if needed

# Create a formatter and add it to the file handler
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)

# Add the file handler to the logger
logger.addHandler(file_handler)


def activate_window(window_title):
    if de == "Xvfb":
        firefox_exists = False
        for proc in psutil.process_iter():
            try:
                if "firefox" in proc.name().lower():
                    firefox_exists = True
                    break
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
        if firefox_exists is False:
            subprocess.Popen(
                [
                    "/home/centos/fred/programs/firefox/firefox",
                    "-width",
                    "1350",
                    "-height",
                    "764",
                ]
            )
            logger.info("firefox started")
            time.sleep(10)
    elif de == "Xfce":
        command = f"xdotool search --onlyvisible --name '{window_title}' windowactivate"
        subprocess.run(command, shell=True)


def wait_for_image(image_path, timeout=10):
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            location = pyautogui.locateOnScreen(
                image_path, region=(0, 0, 1360, 760), confidence=0.9, grayscale=True
            )
            if location:
                return location
        except pyautogui.ImageNotFoundException:
            pass
        time.sleep(1.5)
    return None


def wait_for_images(image_path, timeout=10):
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            location = pyautogui.locateAllOnScreen(
                image_path, region=(0, 0, 1366, 760), confidence=0.9
            )
            if location:
                return list(location)
        except pyautogui.ImageNotFoundException:
            pass
        except pyscreeze.ImageNotFoundException:
            pass
        time.sleep(1.5)
    return []

def right_click():
    pyautogui.mouseDown(button="right")  # Perform a right-click
    time.sleep(0.1)  # Adjust the delay if needed (time in seconds)
    pyautogui.mouseUp(button="right")  # Release the right-click


def triple_click(x, y):
    for _ in range(3):
        pyautogui.click(x, y)
        time.sleep(0.1)

def get_data(obj):
    start_time = time.time()
    time.sleep(0.5)
    activate_window("Mozilla Firefox")
    time.sleep(0.5)
    # load webpage
    url_profile = obj["contact_name_href"]
    if url_profile.find("?") != -1:
        url_profile = url_profile[: url_profile.find("?")]
    logger.info(url_profile)
    # open a new tab
    # pyautogui.hotkey('ctrl', 't')
    pyautogui.hotkey("ctrl", "l")
    time.sleep(0.5)
    pyautogui.typewrite(url_profile)
    time.sleep(0.5)
    pyautogui.press("enter")
    time.sleep(8)

    # check page loaded
    image_path = os.path.join(script_dir, "img/ContactDetails.png")
    loaded = wait_for_image(image_path, 40)

    # Box(left=535, top=541, width=74, height=19)
    if loaded:
        logger.info("Page loaded")

    # LinkedIn
    image_path = os.path.join(script_dir, "img/LinkedIn.png")
    l = wait_for_images(image_path, 3)
    l_index = 0
    retry = 0
    temp_l = None
    while l_index < len(l):
        logger.info(f"l_index:{l_index+1}/{len(l)}")
        if temp_l is not None:
            if temp_l[0] == l[l_index][0]:
                l_index = l_index + 1
                continue
        li = l[l_index]
        pyautogui.moveTo(li, duration=2, tween=pyautogui.easeInOutQuad)
        time.sleep(0.5)
        pyautogui.click(li)
        time.sleep(2)
        pyautogui.hotkey("ctrl", "l")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "w")
        time.sleep(0.5)
        copied_text = pyperclip.paste()
        url = copied_text.strip()
        url = get_link_from_redirect(url)

        if url is not None and url != "":
            # logger.info(len(url))
            if url.find("linkedin") == -1:
                logger.info("not found 'linkedin'")
                if retry < 3:
                    retry = retry + 1
                else:
                    l_index = l_index + 1
                continue

            # personal LindedIn
            if li[0] < loaded_x_left:
                obj["LinkedIn_Personal_URL"] = url
            else:
                obj["LinkedIn_URL"] = url
        temp_l = li
        l_index = l_index + 1

    # Supplemental_Email
    image_path = os.path.join(script_dir, "img/Supplemental.png")
    l = wait_for_image(image_path, 3)
    if l:
        pyautogui.moveTo(
            l[0] + l[2] + 50 + random.randint(-5, 5),
            l[1] + l[3] / 2,
            duration=1,
            tween=pyautogui.easeInOutQuad,
        )
        time.sleep(1)
        right_click()
        time.sleep(1)
        pyautogui.hotkey("l")
        time.sleep(1)
        pyautogui.hotkey("esc")
        # [email protected]
        copied_text = pyperclip.paste()
        obj["Supplemental_Email"] = copied_text.strip()

    # Local address
    image_path = os.path.join(script_dir, "img/Local.png")
    l = wait_for_image(image_path, 3)
    if l:
        x = l[0] + l[2] + 30 + random.randint(-5, 5)
        pyautogui.moveTo(x, l[1] + l[3] / 2, duration=1, tween=pyautogui.easeInOutQuad)
        time.sleep(1)
        triple_click(x, l[1] + l[3] / 2)
        time.sleep(1)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(1)
        # [email protected]
        copied_text = pyperclip.paste()
        obj["Local_Location_Address"] = copied_text.strip()

def main(max_id):
    failed_list = []
    for i in range(batch):
        logger.info(f"batch:{i+1}/{batch}")
        obj = get_job()
        if obj is None:
            continue
        if obj["contact_name_href"] == "" or obj["contact_name_href"] is None:
            res = post_job(obj)
            logger.info(res)
            continue

        if obj["id"] > max_id:
            logger.info("max_id reached")
            continue
        obj = get_data(obj)
        if obj["message"] != "success":
            if obj["id"] in failed_list:
                res = post_job(obj)
                logger.info(res)
            else:
                failed_list.append(obj["id"])
            continue

        res = post_job(obj)
        logger.info(res)

if __name__ == "__main__":
    # time.sleep(10)
    # countdown
    for i in range(6):
        logger.info(f"countdown:{6-i}")
        time.sleep(1)

    # get max id from command
    if len(sys.argv) > 1:
        max_id = int(sys.argv[1])
        logger.info(f"max_id:{max_id}")
    else:
        max_id = 9999999

    main(max_id)
分類
程序

下載指定版本 Chromium 與其 Webdriver

以下載 Linux 平臺 119 版本 Chromium 與其 Webdriver 爲例:

首先在 Chromium 版本發佈頁 找到想要下載版本的 Branch Base Position 比如 1204232。然後在 https://commondatastorage.googleapis.com/chromium-browser-snapshots/index.html?prefix=Linux_x64/ 中搜索 1204232。如果發現搜索結果爲空,不用怕,把最後一位去掉試試,這時就會搜到 1204234。也可以再刪掉一位,就會出來更多相近的結果。點進去後下載 chrome-linux.zip 和 chromedriver_linux64.zip 就可以了。

unzip chrome-linux.zip
cd chrome-linux
#查看 Chrome 版本
./chrome --version
#Chromium 119.0.6045.0

在 Selenium 中指定 Chrome、Webdriver 與 Profile 位置

import os
import 
import datetime
import undetected_chromedriver as uc

MAX_RUNTIME_SECONDS = 240
# Check for and kill any Chrome processes that have been running for too long
for proc in psutil.process_iter():
    try:
        if "chrome" in proc.name().lower():
            create_time = proc.create_time()
            elapsed_time = time.time() - create_time
            if elapsed_time > MAX_RUNTIME_SECONDS:
                proc.kill()
                print("killed" + proc.name())
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        pass

script_dir = os.path.dirname(os.path.realpath(__file__))
profile_dir = os.path.join(script_dir, "chrome_profile")
path_to_chrome_binary = os.path.abspath(
    script_dir + "/../../" + "chrome-linux64/chrome"
)
driver_executable_path = os.path.abspath(
    script_dir + "/../../" + "chromedriver-linux64/chromedriver"
)
proxy = "socks5://127.0.0.1:8080"

options = uc.ChromeOptions()
options.add_argument("--disable-notifications")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-save-password-bubble")

# Limit CPU and memory usage
options.add_argument("--disable-software-rasterizer")
options.add_argument("--disable-extensions")
# options.add_argument("--disable-webgl")

options.add_argument(proxy)
options.binary_location = path_to_chrome_binary

# if os.getenv("DEBUG") != "True":
#     options.add_argument("headless=True")

prefs = {
    "credentials_enable_service": False,
    "profile.password_manager_enabled": False,
    "profile.privacy_sandbox_prompt_enabled": False,
    # "profile.managed_default_content_settings.images": 1,
}
options.add_experimental_option("prefs", prefs)

try:
    driver = uc.Chrome(
        executable_path=driver_executable_path,
        browser_executable_path=path_to_chrome_binary,
        options=options,
        version_main=119,
        user_data_dir=profile_dir,
        # use_subprocess=False,
    )
except Exception as e:
    return None


driver.set_window_size(1366, 768)
driver.set_page_load_timeout(30)

#fix timeout bug
try:
    driver.get(url)
except Exception as e:
    try:
        #send Esc key to stop loading
        driver.find_element(By.XPATH, '//body').send_keys(Keys.ESCAPE)
        time.sleep(1)
        # check page loaded
        WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located(
                    (By.XPATH, "//dl[@class='dfn']")
                )
            )
    except Exception as e:
        #take a screen shot
        png_name = datetime.datetime.now().isoformat()[:19]
        driver.save_screenshot(png_name + "_load" + ".png")
        return None
分類
程序

免費使用 AWS Lambda 的一個示例

這裏演示一下免費使用 AWS Lambda 創建一個查詢網頁大小的接口。之所以直接使用 Lambda 生成的鏈接而不是 API Gateway 的鏈接,是因爲 API Gateway 的免費額度是在註冊帳號的一年內有效。而且 API Gateway 最大超時只有 30 秒,而 Lambda 可以設置到 15 分鐘。

首先註冊一個帳號,我記得只要郵箱和手機號就行,不用綁定支付方式。然後在控制台搜索 Lambda 進入 Lambda 控制台。點擊 Funtction > Create function. 選擇 Author from scratch,輸入函數名字沒有特殊要求,Runtime 選 Python 3.10,Architecture 選 x86 和 arm64 都可以,各有免費額度。點開 Advanced settings,勾選 Enable function URL,勾選 None。CORS 看需要也可以勾上,然後點擊右下角 Create function 就可以了。

在函數頁面 Code 標籤頁貼上如下代碼:

import json
import os
import random
import urllib.request

UA = '''Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edge/19041.423
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 OPR/87.0.4390.99
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Vivaldi/6.1.3035.111
Mozilla/5.0 (Windows NT 10.0; WOW64; rv:85.0) Gecko/20100101 Firefox/85.0
Mozilla/5.0 (Windows NT 10.0; Win64; x64; Trident/7.0; rv:11.0) like Gecko
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.51 Safari/537.36
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36'''

US_LIST=UA.split()

def lambda_handler(event, context):
    event_body_str = event['body']
    event_body = json.loads(event_body_str)
    # print(str(event_body))
    if event_body['secret'] == os.environ.get('secret'):
        try:
            t_ua = random.choice(US_LIST)
            res = urllib.request.urlopen(urllib.request.Request(
                url=event_body['url'],
                headers={"User-Agent": t_ua,},
                method='GET'),
                timeout=180)
            content = res.read()
            return {
                'statusCode': 200,
                'body': json.dumps(str(len(content))+' '+event_body['url'])
            }
        except:
            return {
                'statusCode': 500,
                'body': json.dumps('request failed.')
            }
    else:
        return {
            'statusCode': 500,
            'body': json.dumps('unauthorized.')
        }

secret 的環境在 Configuration > Environment variables 中添加。在 General configuration 中可以設置函數的超時時長 Timeout,默認是 3 秒。在 Function URL 裏可以看到我們需要的 URL 類似:https://4xyqiuexi7l3fcnuyuevjgclei0jhpuy.lambda-url.us-east-1.on.aws/ 。使用如下 curl 命令測試下:

curl -X POST -H "Content-Type: application/json" -d '{"secret":"Dn2Xk7d7RDkK_8bILy7lVe2NJlE4y7T_", "url":"http://ip-api.com/json"}' https://4xyqiuexi7l3fcnuyuevjgclei0jhpuy.lambda-url.us-east-1.on.aws/
#應該會得到如下結果
"296 http://ip-api.com/json"

免費額度使用情況可以在 Billing > Free Tier 頁面查看。Lambda 服務每月有 400000 seconds(GB-Second,如果的函數和我的一樣用的是 128 MB,那就可以乘以 8)和 1000000 次請求。

分類
程序

在 VS Code 中調試 Django Q 任務

在 .vscode 文件夾中創建一個 lanch.json 文件, 內容如下:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Django Q",
            "type": "python",
            "request": "launch",
            "program": "${workspaceFolder}/manage.py",
            "args": [
                "qcluster"
            ],
            "django": true,
            "env": {
                "DJANGO_SETTINGS_MODULE": "YOUR_PROJECT_NAME.settings"
            },
            "gevent": true
        },
        // Your other configurations go here
    ]
}

只需修改兩個地方:program 裏的 ${workspaceFolder} 是變量,如果你項目的 manage.py 在 VS Code 項目根目錄下,那這行就不用修改。 DJANGO_SETTINGS_MODULE 這裏需要把 YOUR_PROJECT_NAME 替換成你項目的名字。然後按 Debug 按鈕就可以調試 Django Q task 了。

對於這個問題,ChatGPT 4 前前後後給出了五次錯誤配置,不得不記錄一下。

分類
程序

使用 Cloudflare Worker 中轉 HTTP 請求

Cloudflare Worker 可以方便的中轉 HTTP 請求,下面示例是我之前用過的,算是密碼保護的中轉特定請求。其中的 X_Custom_PSK 算是密碼,在 Settings > Variables 設置,這樣就只有我的程序可以請求。

addEventListener("fetch", event => {

  const psk = event.request.headers.get("X_Custom_PSK");
  if (psk === X_Custom_PSK) {
    event.respondWith(handleRequest(event.request));
  }else{
    const failed_response = new Response('Sorry, you have supplied an invalid key.', {
      status: 403,
    });
    event.respondWith(failed_response);
  }
})

async function handleRequest(request) {
  const resp = await fetch('https://domain.ltd/checkpoint/list', {
      method: 'POST',
      headers: {
          'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.3 Safari/537.36',
          'Accept': '*/*',
          'Accept-Language': 'zh-CN;q=0.8,zh;q=0.6,en-US;q=0.4,en;q=0.2',
          'Origin': 'https://domain.ltd',
          'DNT': '1',
          'Connection': 'keep-alive',
          'Referer': 'https://domain.ltd/',
          'Sec-Fetch-Dest': 'empty',
          'Sec-Fetch-Mode': 'cors',
          'Sec-Fetch-Site': 'cross-site'
      },
      body: new URLSearchParams({
          'page': '1',
          'pageSize': '10',
          'latitude': '22.5',
          'longitude': '114.0',
          'queryType': '0'
      })
  });
  return resp;
}

下面這個則是用一個 worker 代理多個網站。

addEventListener("fetch", event => {
  let url=new URL(event.request.url);
  if (event.request.url.indexOf('shaman')>-1){
      url.hostname="ft.shaman.eu.org";
  }else{
      url.hostname="www.rfi.fr";
  }  
  let request=new Request(url,event.request);
  event.respondWith(fetch(request));
});