分類
程序

在 CentOS 8 上使用 Xvfb 和 PyAutoGUI

本文介紹在 CentOS 8 上使用 PyAutoGUI 操作運行於 Xvfb 桌面環境中的 Firefox 所需的準備。由於所操作網頁需要登錄並有 OTP,所以還需要使用 VNC 來手動輸入密碼以做準備工作。

安裝 Xvfb

1
2
3
4
5
6
7
8
9
sudo yum install xorg-x11-server-Xvfb
#啓動 Xvfb
Xvfb :0 -screen 0 1366x768x24+32 -br +bs -ac &
#修改環境變量 DISPLAY
export DISPLAY=:0
#查看環境變量 DISPLAY
echo $DISPLAY
#查看 Xvfb 進程
ps -ef | grep Xvfb

如果只是爲了使用 webdriver 操作瀏覽器那麼這樣安裝運行 Xvfb 後就可以了,但是我們要使用 PyAutoGUI,所以還有依賴要裝。

安裝火狐瀏覽器

我個人比較偏好 ESR 版火狐,下載後解壓就可以使用了。

1
2
3
4
5
tar -xf firefox-115.5.0esr.tar.bz2
cd firefox
./firefox
#如果要指定窗口大小可以
./firefox -width 1350 -height 764

安裝 x11vnc 服務

其實只要下載執行檔,運行即可,非常簡單。不過 x11vnc 已有 12 年未更新,不知道還能用多久。如果哪天不能用了,可以嘗試 TigerVNC。

1
2
3
4
5
6
7
8
9
mv x11vnc-0.9.13_amd64-Linux x11vnc
chmod +x x11vnc
#運行 vnc server,密碼設置長一點,端口是 9999
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999
#上面命令在一次連接後會停止,如果要服務一直可以用,就加上 forever 參數
./x11vnc -display :0 -ncache 0 -passwd piHrcHxmJauvIOftenUseA64digitPasswordpokiHJHQWdsgcGFTG -rfbport 9999 -shared -forever
#注意服務器的網絡防火牆要打開 9999 端口
#用 nmap 查看端口是否打開
nmap -p 9999 1.1.1.1

Fedora 本地可以安裝 Remmina 作爲 VNC 客戶端。

1
2
3
sudo dnf install remmina -y
#如果想要使用 socks5 代理
proxychains4 /usr/bin/remmina

打開 Remmina 後在地址欄的協議選項裏選擇 VNC,然後地址填 1.1.1.1:9999 回車就可以連上服務器的桌面並看到剛剛打開的火狐瀏覽器進行設置了。點左上角的 + 號可以添加 profile,這樣下次雙擊就能連上 VNC 了。

安裝與配置 PyAutoGUI

我自己從源碼編譯的 Python 一直提示沒有 _tkinter 模塊。搜了半天沒有解決,於是使用系統自帶的 Python 3.9 得以解決。雖然我是用 Python 3.11 開發的,但是 Python 3.9 跑起來也沒問題。

1
2
import _tkinter # If this fails your Python may not be configured for Tk
ImportError: No module named _tkinter
1
2
3
4
5
6
7
8
9
10
11
12
#安裝 Python 3.9 以及依賴
sudo yum install libnsl ImageMagick xclip
sudo yum install python39 python39-tkinter
 
#xdotool 並不支持 Xvfb,所以不用裝
#sudo yum install xdotool
 
#創建虛擬環境
cd your_project
python3.9 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

示例代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import json
import random
import sys
import requests
import time
import logging
import os
import pyperclip
import subprocess
from urllib.parse import urlparse, parse_qs, urlunparse
from dotenv import load_dotenv
import psutil
import pyautogui
import pyscreeze
 
# Directory containing this script; used to locate the log file and the
# reference images under img/.
script_dir = os.path.dirname(os.path.realpath(__file__))

# Load settings from a .env file (if present) into the process environment.
load_dotenv()
api_key = os.getenv("API_KEY")
proxy = os.getenv("PROXY")
# Desktop environment selector; activate_window() checks for "Xvfb" or "Xfce".
de = os.getenv("DE")
# Number of jobs main() attempts per run. Fall back to a single job when BATCH
# is unset or empty: int(None) / int("") would otherwise abort at import time.
batch = int(os.getenv("BATCH") or 1)

# Proxy mapping keyed by URL scheme, or None when no proxy is configured.
# NOTE(review): presumably passed to requests calls that are not shown here.
proxies = None if proxy is None else {"http": proxy, "https": proxy}

# One format string shared by the console and file handlers.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger()

# Also write every record to logfile.log next to this script.
log_file = os.path.join(script_dir, "logfile.log")
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)

formatter = logging.Formatter(LOG_FORMAT)
file_handler.setFormatter(formatter)

logger.addHandler(file_handler)
 
 
def activate_window(window_title):
    """Make sure a Firefox window is available for the automation to drive.

    Behaviour depends on the module-level ``de`` setting:

    * ``"Xvfb"``: look for a running process whose name contains "firefox";
      if none is found, launch Firefox with ``-width 1350 -height 764`` and
      sleep 10 seconds while it starts. ``window_title`` is not used here.
    * ``"Xfce"``: ask xdotool to activate the visible window whose name
      matches ``window_title``.

    Any other value of ``de`` makes this a no-op.

    Args:
        window_title: Window name pattern handed to ``xdotool search --name``.
    """
    if de == "Xvfb":
        firefox_running = False
        for proc in psutil.process_iter():
            try:
                if "firefox" in proc.name().lower():
                    firefox_running = True
                    break
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # The process vanished or cannot be inspected; skip it.
                continue
        if not firefox_running:
            subprocess.Popen(
                [
                    "/home/centos/fred/programs/firefox/firefox",
                    "-width",
                    "1350",
                    "-height",
                    "764",
                ]
            )
            logger.info("firefox started")
            time.sleep(10)
    elif de == "Xfce":
        # Pass an argument list instead of a shell string so that quotes or
        # shell metacharacters in window_title cannot break or inject into
        # the command.
        subprocess.run(
            [
                "xdotool",
                "search",
                "--onlyvisible",
                "--name",
                window_title,
                "windowactivate",
            ]
        )
 
 
def wait_for_image(image_path, timeout=10):
    """Poll the screen until ``image_path`` is found or ``timeout`` elapses.

    The search is grayscale with confidence 0.9 and is limited to the region
    (0, 0, 1360, 760).
    NOTE(review): wait_for_images() uses a width of 1366 for its region;
    confirm whether 1360 here is intentional.

    Args:
        image_path: Path of the reference image to look for.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The location returned by ``pyautogui.locateOnScreen`` for the first
        successful poll, or ``None`` if nothing was found before the timeout.
    """
    # A monotonic clock keeps the timeout immune to system clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            location = pyautogui.locateOnScreen(
                image_path, region=(0, 0, 1360, 760), confidence=0.9, grayscale=True
            )
        except (pyautogui.ImageNotFoundException, pyscreeze.ImageNotFoundException):
            # Handle both exception types, matching wait_for_images().
            location = None
        if location:
            return location
        time.sleep(1.5)
    return None
 
 
def wait_for_images(image_path, timeout=10):
    """Poll the screen until at least one match of ``image_path`` appears.

    The search uses confidence 0.9 and is limited to the region
    (0, 0, 1366, 760).

    Args:
        image_path: Path of the reference image to look for.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        A list of all match locations from the first poll that found any,
        or an empty list if nothing was found before the timeout.
    """
    # A monotonic clock keeps the timeout immune to system clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # locateAllOnScreen returns a generator, which is always truthy;
            # materialise it so an empty result keeps polling instead of
            # returning [] on the first attempt.
            matches = list(
                pyautogui.locateAllOnScreen(
                    image_path, region=(0, 0, 1366, 760), confidence=0.9
                )
            )
        except (pyautogui.ImageNotFoundException, pyscreeze.ImageNotFoundException):
            matches = []
        if matches:
            return matches
        time.sleep(1.5)
    return []
 
def right_click():
    """Right-click at the current pointer position with a short hold.

    The button is pressed, held for 0.1 seconds, then released, instead of
    being sent as a single instantaneous click.
    """
    pyautogui.mouseDown(button="right")  # Press the right button
    time.sleep(0.1)  # Hold time in seconds; adjust if needed
    pyautogui.mouseUp(button="right")  # Release the right button
 
 
def triple_click(x, y):
    """Click three times at screen position (x, y), pausing 0.1 s after each."""
    clicks_left = 3
    while clicks_left > 0:
        pyautogui.click(x, y)
        time.sleep(0.1)
        clicks_left -= 1
 
def get_data(obj):
    """Open a contact's profile page in Firefox and scrape fields into ``obj``.

    Steps, all performed through GUI automation:

    1. Read ``obj["contact_name_href"]``, drop any query string, type the URL
       into the address bar and press Enter.
    2. Wait up to 40 s for img/ContactDetails.png; log "Page loaded" if it is
       found. Processing continues either way.
    3. For every img/LinkedIn.png match: click it, copy the address bar
       contents, close the tab, resolve the URL with get_link_from_redirect()
       and store it as ``LinkedIn_Personal_URL`` or ``LinkedIn_URL``.
    4. If img/Supplemental.png is found, copy the value to its right via the
       context menu into ``Supplemental_Email``.
    5. If img/Local.png is found, triple-click the text to its right and copy
       it into ``Local_Location_Address``.

    NOTE(review): ``get_link_from_redirect`` and ``loaded_x_left`` are not
    defined anywhere in the visible source; confirm where they come from.
    NOTE(review): there is no return statement here, yet main() assigns the
    result and reads ``obj["message"]`` from it; the posted code looks
    abridged.

    Args:
        obj: Job dict; must contain ``contact_name_href``. Mutated in place.
    """
    start_time = time.time()  # NOTE(review): never read in the visible code
    time.sleep(0.5)
    activate_window("Mozilla Firefox")
    time.sleep(0.5)
    # Load the profile page, with any query string removed from the URL.
    url_profile = obj["contact_name_href"]
    if url_profile.find("?") != -1:
        url_profile = url_profile[: url_profile.find("?")]
    logger.info(url_profile)
    # open a new tab
    # pyautogui.hotkey('ctrl', 't')
    # Ctrl+L focuses the address bar so the URL can be typed into it.
    pyautogui.hotkey("ctrl", "l")
    time.sleep(0.5)
    pyautogui.typewrite(url_profile)
    time.sleep(0.5)
    pyautogui.press("enter")
    time.sleep(8)

    # Check that the page loaded by waiting for the "Contact Details" image.
    image_path = os.path.join(script_dir, "img/ContactDetails.png")
    loaded = wait_for_image(image_path, 40)

    # Example of a match: Box(left=535, top=541, width=74, height=19)
    if loaded:
        logger.info("Page loaded")

    # LinkedIn: visit every LinkedIn icon found on the page.
    image_path = os.path.join(script_dir, "img/LinkedIn.png")
    l = wait_for_images(image_path, 3)
    l_index = 0
    retry = 0  # shared across all icons; it is never reset per icon
    temp_l = None  # last icon that was processed to completion
    while l_index < len(l):
        logger.info(f"l_index:{l_index+1}/{len(l)}")
        # Skip a match whose left coordinate equals the previous one's.
        if temp_l is not None:
            if temp_l[0] == l[l_index][0]:
                l_index = l_index + 1
                continue
        li = l[l_index]
        pyautogui.moveTo(li, duration=2, tween=pyautogui.easeInOutQuad)
        time.sleep(0.5)
        pyautogui.click(li)
        time.sleep(2)
        # Copy the address bar contents, then close the current tab.
        # NOTE(review): presumably the click opened the link in a new tab.
        pyautogui.hotkey("ctrl", "l")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(0.5)
        pyautogui.hotkey("ctrl", "w")
        time.sleep(0.5)
        copied_text = pyperclip.paste()
        url = copied_text.strip()
        url = get_link_from_redirect(url)

        if url is not None and url != "":
            # logger.info(len(url))
            if url.find("linkedin") == -1:
                logger.info("not found 'linkedin'")
                # Retry the same icon until the shared retry budget (3) is
                # used up, then move on to the next icon.
                if retry < 3:
                    retry = retry + 1
                else:
                    l_index = l_index + 1
                continue

            # Icons left of loaded_x_left are stored as the personal LinkedIn
            # URL; the others as the general LinkedIn URL.
            if li[0] < loaded_x_left:
                obj["LinkedIn_Personal_URL"] = url
            else:
                obj["LinkedIn_URL"] = url
        temp_l = li
        l_index = l_index + 1

    # Supplemental_Email
    image_path = os.path.join(script_dir, "img/Supplemental.png")
    l = wait_for_image(image_path, 3)
    if l:
        # Aim about 50 px right of the label, vertically centred on it, with
        # a small random horizontal jitter.
        pyautogui.moveTo(
            l[0] + l[2] + 50 + random.randint(-5, 5),
            l[1] + l[3] / 2,
            duration=1,
            tween=pyautogui.easeInOutQuad,
        )
        time.sleep(1)
        right_click()
        time.sleep(1)
        # NOTE(review): "l" is presumably the context-menu access key that
        # copies the email address; confirm against the Firefox menu.
        pyautogui.hotkey("l")
        time.sleep(1)
        pyautogui.hotkey("esc")
        # The clipboard is expected to hold an email address at this point.
        copied_text = pyperclip.paste()
        obj["Supplemental_Email"] = copied_text.strip()

    # Local address
    image_path = os.path.join(script_dir, "img/Local.png")
    l = wait_for_image(image_path, 3)
    if l:
        # Aim about 30 px right of the label, vertically centred on it.
        x = l[0] + l[2] + 30 + random.randint(-5, 5)
        pyautogui.moveTo(x, l[1] + l[3] / 2, duration=1, tween=pyautogui.easeInOutQuad)
        time.sleep(1)
        # Triple-click at that point, then copy the resulting selection.
        triple_click(x, l[1] + l[3] / 2)
        time.sleep(1)
        pyautogui.hotkey("ctrl", "c")
        time.sleep(1)
        # The clipboard is expected to hold the copied address text.
        copied_text = pyperclip.paste()
        obj["Local_Location_Address"] = copied_text.strip()
 
def main(max_id):
    """Fetch and process up to ``batch`` jobs.

    For each iteration a job is fetched with get_job(). A job without a
    profile link is posted back unchanged; a job whose id exceeds ``max_id``
    is skipped. Otherwise the job goes through get_data() and is posted.
    The first failure for a given id is only remembered; if the same id fails
    again, the job is posted anyway.

    Args:
        max_id: Highest job id that will be processed.
    """
    seen_failures = []
    for i in range(batch):
        logger.info(f"batch:{i+1}/{batch}")
        job = get_job()
        if job is None:
            continue

        href = job["contact_name_href"]
        if href == "" or href is None:
            # Nothing to visit: hand the job straight back.
            logger.info(post_job(job))
            continue

        if job["id"] > max_id:
            logger.info("max_id reached")
            continue

        job = get_data(job)
        first_failure = (
            job["message"] != "success" and job["id"] not in seen_failures
        )
        if first_failure:
            # Remember the id and leave the job unposted this time.
            seen_failures.append(job["id"])
            continue

        # Either success or a repeated failure: report the job.
        logger.info(post_job(job))
 
if __name__ == "__main__":
    # Log a six-step countdown, one second per step, before starting.
    for i in range(6):
        logger.info(f"countdown:{6-i}")
        time.sleep(1)

    # The optional first command-line argument caps the job id; without it a
    # large default is used.
    cli_args = sys.argv[1:]
    if cli_args:
        max_id = int(cli_args[0])
        logger.info(f"max_id:{max_id}")
    else:
        max_id = 9999999

    main(max_id)

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *