WebScraper Utility Class Usage Notes: A Flexible, Easy-to-Use Scraping Framework
Preface:
Install the required packages and browser drivers first:
pip install selenium webdriver-manager
1. Class: WebScraper
This utility class wraps the core functionality of browser control, page interaction, and data extraction, aiming to provide a flexible and easy-to-use scraping framework.
2. Initialization
__init__(browser_type="chrome", headless=True, user_agent=None, proxy=None, timeout=30, debug=False)
- Purpose: initialize a scraper instance and configure the browser and dev tools
- Parameters:
  - browser_type: browser type, one of "chrome", "firefox", "edge"
  - headless: whether to run the browser in headless mode
  - user_agent: custom User-Agent string
  - proxy: proxy server configuration, in the format {"http": "http://proxy.example.com:8080", "https": "http://proxy.example.com:8080"}
  - timeout: operation timeout in seconds
  - debug: whether to enable debug mode
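As a minimal sketch, a configuration for `__init__` with the documented parameters might look like this (the proxy URL and User-Agent are placeholders, not real values):

```python
# Example keyword arguments for WebScraper.__init__ as documented above.
# The proxy host and User-Agent string are placeholders for illustration.
config = {
    "browser_type": "chrome",
    "headless": True,
    "user_agent": "Mozilla/5.0 (compatible; DemoBot/1.0)",
    "proxy": {
        "http": "http://proxy.example.com:8080",
        "https": "http://proxy.example.com:8080",
    },
    "timeout": 30,
    "debug": False,
}

# With the class available, an instance would be created as:
# scraper = WebScraper(**config)
print(sorted(config.keys()))
```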
3. Browser Control Methods
open_url(url)
- Purpose: open the given URL
- Parameters:
  - url: target URL
- Returns: page load completion status
close()
- Purpose: close the browser instance
- Parameters: none
refresh()
- Purpose: refresh the current page
- Parameters: none
go_back()
- Purpose: navigate back to the previous page
- Parameters: none
4. Element Location & Interaction Methods
find_element(selector, by="css", timeout=None)
- Purpose: find a single element
- Parameters:
  - selector: selector string
  - by: selector type, one of "css", "xpath", "id", "class", "name", "link_text", "partial_link_text", "tag_name"
  - timeout: how long to wait for the element to appear, in seconds
- Returns: the element object found, or None
find_elements(selector, by="css", timeout=None)
- Purpose: find multiple elements
- Parameters: same as find_element
- Returns: a list of matching elements
click(element=None, selector=None, by="css", timeout=None)
- Purpose: click an element
- Parameters:
  - element: element object (takes precedence)
  - selector: selector string (used when element is None)
  - by: selector type
  - timeout: how long to wait for the element to appear
- Returns: operation result
type_text(text, element=None, selector=None, by="css", timeout=None, clear_first=True)
- Purpose: type text into an input field
- Parameters:
  - text: the text to type
  - element: element object (takes precedence)
  - selector: selector string (used when element is None)
  - by: selector type
  - timeout: how long to wait for the element to appear
  - clear_first: whether to clear the input field first
- Returns: operation result
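The `by` values listed above map directly onto Selenium's `By` constants, which are plain strings under the hood. A small standalone sketch of that dispatch (using the literal strings so it runs without Selenium installed):

```python
# Map of documented "by" values to Selenium locator strategies.
# The literal string values match Selenium's By constants
# (By.CSS_SELECTOR == "css selector", etc.).
BY_MAP = {
    "css": "css selector",
    "xpath": "xpath",
    "id": "id",
    "class": "class name",  # works for a single class name only
    "name": "name",
    "link_text": "link text",
    "partial_link_text": "partial link text",
    "tag_name": "tag name",
}

def to_selenium_by(by_string: str) -> str:
    """Normalize a documented `by` value to the Selenium locator strategy."""
    key = by_string.lower()
    if key not in BY_MAP:
        raise ValueError(f"Invalid selector type: {by_string}")
    return BY_MAP[key]

print(to_selenium_by("CSS"))  # → css selector
```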
5. Scrolling Methods
scroll(direction="down", amount=None, element=None, smooth=True, duration=0.5)
- Purpose: scroll the page or an element
- Parameters:
  - direction: scroll direction, one of "up", "down", "left", "right"
  - amount: scroll distance in pixels; defaults to 50% of the page height/width
  - element: element to scroll; defaults to the whole page
  - smooth: whether to scroll smoothly
  - duration: scroll duration in seconds
- Returns: operation result
scroll_to_element(element=None, selector=None, by="css", timeout=None, align="center")
- Purpose: scroll to a specific element
- Parameters:
  - element: element object (takes precedence)
  - selector: selector string (used when element is None)
  - by: selector type
  - timeout: how long to wait for the element to appear
  - align: element alignment, one of "top", "center", "bottom"
- Returns: operation result
scroll_to_bottom(element=None, steps=10, delay=0.5)
- Purpose: scroll to the bottom of the page or an element
- Parameters:
  - element: element to scroll; defaults to the whole page
  - steps: number of scroll steps
  - delay: delay between steps, in seconds
- Returns: operation result
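Under the hood, a `scroll()` call like the one documented above boils down to building a `window.scrollBy(...)` JavaScript snippet. The helper below sketches that string construction on its own (the defaulting-to-half-the-viewport behavior is taken from the parameter description):

```python
def build_scroll_script(direction="down", amount=None, smooth=True):
    """Build the window.scrollBy(...) JavaScript a scroll() call could
    execute. amount is in pixels; when omitted, half the viewport is used,
    matching the documented default."""
    behavior = "smooth" if smooth else "auto"
    vertical = direction in ("up", "down")
    default = "window.innerHeight / 2" if vertical else "window.innerWidth / 2"
    value = str(amount) if amount is not None else default
    sign = "-" if direction in ("up", "left") else ""
    top, left = (f"{sign}{value}", "0") if vertical else ("0", f"{sign}{value}")
    return f"window.scrollBy({{ top: {top}, left: {left}, behavior: '{behavior}' }});"

print(build_scroll_script("down", 400))
# → window.scrollBy({ top: 400, left: 0, behavior: 'smooth' });
```

In the real class this string would be passed to `driver.execute_script(...)`.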
6. Pagination Methods
next_page(selector=None, method="click", url_template=None, page_param="page", next_page_func=None)
- Purpose: go to the next page
- Parameters:
  - selector: selector for the next-page button (used when method is "click")
  - method: pagination method, one of "click", "url", "function"
  - url_template: URL template (used when method is "url")
  - page_param: page-number parameter name (used when method is "url")
  - next_page_func: custom pagination function (used when method is "function")
- Returns: whether pagination succeeded
has_next_page(selector=None, check_func=None)
- Purpose: check whether a next page exists
- Parameters:
  - selector: selector for the next-page button
  - check_func: custom check function
- Returns: boolean indicating whether there is a next page
set_page(page_num, url_template=None, page_param="page")
- Purpose: jump to a specific page number
- Parameters:
  - page_num: target page number
  - url_template: URL template
  - page_param: page-number parameter name
- Returns: operation result
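The "url" pagination method resolves the template by substituting the page number into a literal `{page}`-style placeholder. A tiny sketch of that substitution (the example URL is a placeholder):

```python
def page_url(url_template, page_num, page_param="page"):
    """Resolve a URL template the way the 'url' pagination method does:
    the literal {page_param} placeholder is replaced by the page number."""
    return url_template.replace(f"{{{page_param}}}", str(page_num))

# Placeholder site used purely for illustration.
template = "https://example.com/search?q=python&page={page}"
print(page_url(template, 3))  # → https://example.com/search?q=python&page=3
```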
7. Data Extraction Methods
get_text(element=None, selector=None, by="css", timeout=None)
- Purpose: get an element's text content
- Parameters: same as find_element
- Returns: the text content, or None
get_attribute(attribute, element=None, selector=None, by="css", timeout=None)
- Purpose: get an element's attribute value
- Parameters:
  - attribute: attribute name
  - other parameters: same as find_element
- Returns: the attribute value, or None
extract_data(template)
- Purpose: extract page data according to a template
- Parameters:
  - template: extraction template, a dict mapping field names to selectors or extraction functions
- Returns: the extracted data
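The template accepts three rule forms: a selector string (extract text), a `(selector, attribute)` tuple, or a callable. The sketch below demonstrates that dispatch against a stub page object instead of a live browser; `StubPage` and `apply_rule` are illustrative names, not part of the class:

```python
def apply_rule(page, rule):
    """Dispatch one extract_data template rule, as described above."""
    if isinstance(rule, str):                        # selector → text
        return page.get_text(rule)
    if isinstance(rule, tuple) and len(rule) == 2:   # (selector, attribute)
        return page.get_attribute(rule[0], rule[1])
    if callable(rule):                               # custom function
        return rule(page)
    return None

class StubPage:  # stand-in for the scraper, for illustration only
    def get_text(self, selector):
        return f"text of {selector}"
    def get_attribute(self, selector, attribute):
        return f"{attribute} of {selector}"

template = {
    "title": "h1",                # rule form 1: selector string
    "link": ("a.main", "href"),   # rule form 2: (selector, attribute)
    "count": lambda p: 42,        # rule form 3: callable
}
page = StubPage()
data = {field: apply_rule(page, rule) for field, rule in template.items()}
print(data)  # → {'title': 'text of h1', 'link': 'href of a.main', 'count': 42}
```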
8. DevTools Methods
start_capturing_network()
- Purpose: start capturing network requests
- Parameters: none
stop_capturing_network()
- Purpose: stop capturing network requests
- Parameters: none
get_captured_requests(filter_type=None, url_pattern=None)
- Purpose: get the captured network requests
- Parameters:
  - filter_type: filter by request type, e.g. "xhr", "fetch", "script", "image", "stylesheet"
  - url_pattern: filter by URL pattern; regular expressions are supported
- Returns: the list of matching requests
add_request_interceptor(pattern, handler_func)
- Purpose: add a request interceptor
- Parameters:
  - pattern: URL match pattern
  - handler_func: handler function; receives the request object and may modify the request or return a custom response
- Returns: interceptor ID
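The request list comes from the browser's Resource Timing API (`performance.getEntriesByType('resource')`), where each entry carries an `initiatorType` such as `script`, `img`, or `xmlhttprequest`. The filtering described for `get_captured_requests` can be sketched on synthetic entries (the URLs and values below are made up):

```python
import re

# Entries shaped like performance.getEntriesByType('resource') output;
# values are fabricated for illustration.
entries = [
    {"name": "https://example.com/api/items?page=1", "initiatorType": "xmlhttprequest"},
    {"name": "https://example.com/static/app.js", "initiatorType": "script"},
    {"name": "https://example.com/logo.png", "initiatorType": "img"},
]

def filter_requests(entries, filter_type=None, url_pattern=None):
    """Filter resource-timing entries by initiatorType (with 'xhr' as an
    alias for 'xmlhttprequest') and by a URL regex, as documented above."""
    results = []
    for req in entries:
        initiator = req.get("initiatorType", "").lower()
        if filter_type:
            wanted = "xmlhttprequest" if filter_type.lower() == "xhr" else filter_type.lower()
            if wanted not in initiator:
                continue
        if url_pattern and not re.search(url_pattern, req.get("name", "")):
            continue
        results.append(req)
    return results

print(len(filter_requests(entries, filter_type="xhr", url_pattern=r"/api/")))  # → 1
```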
9. Auxiliary Methods
wait_for_element(selector, by="css", timeout=None, condition="visible")
- Purpose: wait until an element satisfies a given condition
- Parameters:
  - selector: selector string
  - by: selector type
  - timeout: timeout in seconds
  - condition: wait condition, one of "visible", "present", "clickable", "invisible", "not_present"
- Returns: the element object, or None
execute_script(script, *args)
- Purpose: execute JavaScript code
- Parameters:
  - script: JavaScript code
  - *args: arguments passed to the JavaScript
- Returns: the result of the JavaScript execution
set_delay(min_delay, max_delay=None)
- Purpose: set a random delay between operations
- Parameters:
  - min_delay: minimum delay in seconds
  - max_delay: maximum delay in seconds; if None, the delay is fixed at min_delay
- Returns: none
take_screenshot(path=None)
- Purpose: take a screenshot of the current page
- Parameters:
  - path: save path; if None, the image data is returned
- Returns: binary image data if path is None, otherwise the save result
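The delay behavior configured by `set_delay` is just a uniform draw between the two bounds, collapsing to a fixed delay when only a minimum is given. A standalone sketch:

```python
import random

def make_delay(min_delay, max_delay=None):
    """Pick an operation delay the way set_delay configures it: uniform in
    [min_delay, max_delay], fixed at min_delay when max_delay is None."""
    upper = max_delay if max_delay is not None else min_delay
    return random.uniform(min_delay, upper)

d = make_delay(0.5, 1.5)
print(0.5 <= d <= 1.5)  # → True
```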
10. Implementation

```python
import time
import random
import re

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.service import Service as FirefoxService
from selenium.webdriver.edge.service import Service as EdgeService

# Optional: for easier driver management
try:
    from webdriver_manager.chrome import ChromeDriverManager
    from webdriver_manager.firefox import GeckoDriverManager
    from webdriver_manager.microsoft import EdgeChromiumDriverManager
    WEBDRIVER_MANAGER_AVAILABLE = True
except ImportError:
    WEBDRIVER_MANAGER_AVAILABLE = False
    print("Consider installing webdriver-manager for easier driver setup: "
          "pip install webdriver-manager")


class WebScraper:
    _BY_MAP = {
        "css": By.CSS_SELECTOR,
        "xpath": By.XPATH,
        "id": By.ID,
        "class": By.CLASS_NAME,  # Note: finding by class name only works for a single class
        "name": By.NAME,
        "link_text": By.LINK_TEXT,
        "partial_link_text": By.PARTIAL_LINK_TEXT,
        "tag_name": By.TAG_NAME,
    }

    def __init__(self, browser_type="chrome", headless=True, user_agent=None,
                 proxy=None, timeout=30, debug=False):
        self.browser_type = browser_type.lower()
        self.headless = headless
        self.user_agent = user_agent
        self.proxy = proxy
        self.timeout = timeout
        self.debug = debug
        self.driver = None
        self.current_page_num = 1  # For URL-based pagination
        self._min_delay = 0.5
        self._max_delay = 1.5
        self._network_requests_raw = []  # Stores network entries collected via JS
        self._setup_driver()

    def _print_debug(self, message):
        if self.debug:
            print(f"[DEBUG] {message}")

    def _setup_driver(self):
        self._print_debug(f"Setting up {self.browser_type} browser...")
        if self.browser_type == "chrome":
            options = webdriver.ChromeOptions()
            if self.user_agent:
                options.add_argument(f"user-agent={self.user_agent}")
            if self.headless:
                options.add_argument("--headless")
                options.add_argument("--window-size=1920,1080")  # Often needed for headless
            if self.proxy:
                # Basic proxy; Selenium typically uses one proxy for all schemes.
                # For authenticated proxies, use selenium-wire.
                if "http" in self.proxy:
                    options.add_argument(f"--proxy-server={self.proxy['http']}")
                elif "https" in self.proxy:
                    options.add_argument(f"--proxy-server={self.proxy['https']}")
            options.add_argument("--disable-gpu")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")  # Overcome limited /dev/shm
            if WEBDRIVER_MANAGER_AVAILABLE:
                try:
                    service = ChromeService(ChromeDriverManager().install())
                    self.driver = webdriver.Chrome(service=service, options=options)
                except Exception as e:
                    self._print_debug(f"WebDriverManager for Chrome failed: {e}. "
                                      "Falling back to default PATH.")
                    self.driver = webdriver.Chrome(options=options)
            else:
                self.driver = webdriver.Chrome(options=options)
        elif self.browser_type == "firefox":
            options = webdriver.FirefoxOptions()
            if self.user_agent:
                options.set_preference("general.useragent.override", self.user_agent)
            if self.headless:
                options.add_argument("--headless")
            if self.proxy:
                # Firefox proxy setup goes through preferences
                options.set_preference("network.proxy.type", 1)
                if "http" in self.proxy:
                    host, port = self.proxy["http"].replace("http://", "").split(":")
                    options.set_preference("network.proxy.http", host)
                    options.set_preference("network.proxy.http_port", int(port))
                if "https" in self.proxy:  # Assuming the same style of proxy for https
                    host, port = (self.proxy["https"].replace("https://", "")
                                  .replace("http://", "").split(":"))
                    options.set_preference("network.proxy.ssl", host)
                    options.set_preference("network.proxy.ssl_port", int(port))
            if WEBDRIVER_MANAGER_AVAILABLE:
                try:
                    service = FirefoxService(GeckoDriverManager().install())
                    self.driver = webdriver.Firefox(service=service, options=options)
                except Exception as e:
                    self._print_debug(f"WebDriverManager for Firefox failed: {e}. "
                                      "Falling back to default PATH.")
                    self.driver = webdriver.Firefox(options=options)
            else:
                self.driver = webdriver.Firefox(options=options)
        elif self.browser_type == "edge":
            options = webdriver.EdgeOptions()
            if self.user_agent:
                options.add_argument(f"user-agent={self.user_agent}")
            if self.headless:
                options.add_argument("--headless")  # Edge uses the Chromium engine
                options.add_argument("--window-size=1920,1080")
            if self.proxy and "http" in self.proxy:  # Basic proxy
                options.add_argument(f"--proxy-server={self.proxy['http']}")
            options.add_argument("--disable-gpu")
            if WEBDRIVER_MANAGER_AVAILABLE:
                try:
                    service = EdgeService(EdgeChromiumDriverManager().install())
                    self.driver = webdriver.Edge(service=service, options=options)
                except Exception as e:
                    self._print_debug(f"WebDriverManager for Edge failed: {e}. "
                                      "Falling back to default PATH.")
                    self.driver = webdriver.Edge(options=options)
            else:
                self.driver = webdriver.Edge(options=options)
        else:
            raise ValueError(f"Unsupported browser: {self.browser_type}")
        self.driver.implicitly_wait(self.timeout / 2)  # Implicit wait for elements
        self.driver.set_page_load_timeout(self.timeout)
        self._print_debug(f"{self.browser_type} browser setup complete.")

    def _get_selenium_by(self, by_string):
        by_string = by_string.lower()
        if by_string not in self._BY_MAP:
            raise ValueError(f"Invalid selector type: {by_string}. "
                             f"Supported: {list(self._BY_MAP.keys())}")
        return self._BY_MAP[by_string]

    def _perform_delay(self):
        time.sleep(random.uniform(self._min_delay, self._max_delay))

    # --- Browser Control ---
    def open_url(self, url):
        self._print_debug(f"Opening URL: {url}")
        try:
            self.driver.get(url)
            self._perform_delay()
            # A simple check; for a true "loaded" status, wait for a specific element
            return self.driver.execute_script("return document.readyState") == "complete"
        except WebDriverException as e:
            self._print_debug(f"Error opening URL {url}: {e}")
            return False

    def close(self):
        if self.driver:
            self._print_debug("Closing browser.")
            self.driver.quit()
            self.driver = None

    def refresh(self):
        self._print_debug("Refreshing page.")
        self.driver.refresh()
        self._perform_delay()

    def go_back(self):
        self._print_debug("Going back to previous page.")
        self.driver.back()
        self._perform_delay()

    # --- Element Location & Interaction ---
    def find_element(self, selector, by="css", timeout=None):
        wait_timeout = timeout if timeout is not None else self.timeout
        self._print_debug(f"Finding element by {by}: '{selector}' with timeout {wait_timeout}s")
        try:
            wait = WebDriverWait(self.driver, wait_timeout)
            return wait.until(EC.presence_of_element_located(
                (self._get_selenium_by(by), selector)))
        except TimeoutException:
            self._print_debug(f"Element not found by {by}: '{selector}' within {wait_timeout}s.")
            return None
        except Exception as e:
            self._print_debug(f"Error finding element by {by}: '{selector}': {e}")
            return None

    def find_elements(self, selector, by="css", timeout=None):
        wait_timeout = timeout if timeout is not None else self.timeout
        self._print_debug(f"Finding elements by {by}: '{selector}' with timeout {wait_timeout}s")
        try:
            # Wait for at least one element to be present to ensure page readiness,
            # then collect all matches.
            WebDriverWait(self.driver, wait_timeout).until(
                EC.presence_of_all_elements_located((self._get_selenium_by(by), selector)))
            return self.driver.find_elements(self._get_selenium_by(by), selector)
        except TimeoutException:
            self._print_debug(f"No elements found by {by}: '{selector}' within {wait_timeout}s.")
            return []
        except Exception as e:
            self._print_debug(f"Error finding elements by {by}: '{selector}': {e}")
            return []

    def click(self, element=None, selector=None, by="css", timeout=None):
        if not element and selector:
            element = self.wait_for_element(selector, by, timeout, condition="clickable")
        if element:
            try:
                self._print_debug(f"Clicking element: {element.tag_name} (selector: {selector})")
                try:
                    element.click()
                except WebDriverException:  # e.g. ElementClickInterceptedException
                    self._print_debug("Standard click failed, trying JavaScript click.")
                    self.driver.execute_script("arguments[0].click();", element)
                self._perform_delay()
                return True
            except Exception as e:
                self._print_debug(f"Error clicking element: {e}")
                return False
        self._print_debug("Element not provided or not found for click.")
        return False

    def type_text(self, text, element=None, selector=None, by="css",
                  timeout=None, clear_first=True):
        if not element and selector:
            element = self.wait_for_element(selector, by, timeout, condition="visible")
        if element:
            try:
                self._print_debug(f"Typing text '{text}' into element: "
                                  f"{element.tag_name} (selector: {selector})")
                if clear_first:
                    element.clear()
                element.send_keys(text)
                self._perform_delay()
                return True
            except Exception as e:
                self._print_debug(f"Error typing text: {e}")
                return False
        self._print_debug("Element not provided or not found for typing.")
        return False

    # --- Scrolling Methods ---
    def scroll(self, direction="down", amount=None, element=None, smooth=True, duration=0.5):
        self._print_debug(f"Scrolling {direction}...")
        target = "arguments[0]" if element else "window"
        behavior = "smooth" if smooth else "auto"
        # Elements expose clientHeight/clientWidth; the window uses innerHeight/innerWidth
        if direction in ("down", "up"):
            default = "arguments[0].clientHeight / 2" if element else "window.innerHeight / 2"
            value = str(amount) if amount is not None else default
            sign = "" if direction == "down" else "-"
            script = f"{target}.scrollBy({{ top: {sign}{value}, left: 0, behavior: '{behavior}' }});"
        elif direction in ("right", "left"):
            default = "arguments[0].clientWidth / 2" if element else "window.innerWidth / 2"
            value = str(amount) if amount is not None else default
            sign = "" if direction == "right" else "-"
            script = f"{target}.scrollBy({{ top: 0, left: {sign}{value}, behavior: '{behavior}' }});"
        else:
            self._print_debug(f"Invalid scroll direction: {direction}")
            return False
        try:
            if element:
                self.driver.execute_script(script, element)
            else:
                self.driver.execute_script(script)
            time.sleep(duration)  # Allow time for the smooth scroll to complete
            return True
        except Exception as e:
            self._print_debug(f"Error during scroll: {e}")
            return False

    def scroll_to_element(self, element=None, selector=None, by="css",
                          timeout=None, align="center"):
        if not element and selector:
            element = self.find_element(selector, by, timeout)
        if element:
            self._print_debug(f"Scrolling to element (selector: {selector}) with align: {align}")
            try:
                # scrollIntoView 'block' accepts 'start', 'center', 'end', or 'nearest'
                block = {"top": "start", "bottom": "end"}.get(align, "center")
                self.driver.execute_script(
                    f"arguments[0].scrollIntoView("
                    f"{{ behavior: 'smooth', block: '{block}', inline: 'nearest' }});",
                    element)
                self._perform_delay()  # Give it a moment to scroll
                return True
            except Exception as e:
                self._print_debug(f"Error scrolling to element: {e}")
                return False
        self._print_debug("Element not provided or not found for scroll_to_element.")
        return False

    def scroll_to_bottom(self, element=None, steps=10, delay=0.5):
        self._print_debug("Scrolling to bottom...")
        try:
            if element:
                height_script = "return arguments[0].scrollHeight"
                scroll_script = "arguments[0].scrollTop = arguments[0].scrollHeight;"
                args = (element,)
            else:
                height_script = "return document.body.scrollHeight"
                scroll_script = "window.scrollTo(0, document.body.scrollHeight);"
                args = ()
            for _ in range(steps):
                last_height = self.driver.execute_script(height_script, *args)
                self.driver.execute_script(scroll_script, *args)
                time.sleep(delay)
                new_height = self.driver.execute_script(height_script, *args)
                if new_height == last_height:  # Reached bottom or no more content loaded
                    break
            self._print_debug("Scrolled to bottom (or no more content loaded).")
            return True
        except Exception as e:
            self._print_debug(f"Error scrolling to bottom: {e}")
            return False

    # --- Pagination Methods ---
    def next_page(self, selector=None, method="click", url_template=None,
                  page_param="page", next_page_func=None):
        self._print_debug(f"Attempting to go to next page using method: {method}")
        if method == "click":
            if not selector:
                self._print_debug("Selector for next page button is required for 'click' method.")
                return False
            next_button = self.wait_for_element(selector, condition="clickable")
            if next_button:
                return self.click(element=next_button)
            self._print_debug("Next page button not found or not clickable.")
            return False
        elif method == "url":
            if not url_template:
                self._print_debug("URL template is required for 'url' method.")
                return False
            self.current_page_num += 1
            next_url = url_template.replace(f"{{{page_param}}}", str(self.current_page_num))
            return self.open_url(next_url)
        elif method == "function":
            if not callable(next_page_func):
                self._print_debug("A callable function is required for 'function' method.")
                return False
            try:
                return next_page_func(self)  # Pass the scraper instance to the custom function
            except Exception as e:
                self._print_debug(f"Custom next_page_func failed: {e}")
                return False
        else:
            self._print_debug(f"Invalid pagination method: {method}")
            return False

    def has_next_page(self, selector=None, check_func=None):
        self._print_debug("Checking for next page...")
        if callable(check_func):
            try:
                return check_func(self)
            except Exception as e:
                self._print_debug(f"Custom check_func for has_next_page failed: {e}")
                return False
        elif selector:
            # Check that the element is present and does not look disabled
            element = self.find_element(selector)
            if element:
                is_disabled = element.get_attribute("disabled")
                class_attr = element.get_attribute("class")
                # Common markup patterns for disabled buttons
                if is_disabled or (class_attr and
                                   ("disabled" in class_attr or "inactive" in class_attr)):
                    self._print_debug("Next page element found but appears disabled.")
                    return False
                return True
            return False
        self._print_debug("No selector or check_func provided for has_next_page.")
        return False  # Default to no next page when there is not enough information

    def set_page(self, page_num, url_template=None, page_param="page"):
        if not url_template:
            self._print_debug("URL template is required for set_page.")
            return False
        self._print_debug(f"Setting page to: {page_num}")
        self.current_page_num = page_num
        target_url = url_template.replace(f"{{{page_param}}}", str(page_num))
        return self.open_url(target_url)

    # --- Data Extraction Methods ---
    def get_text(self, element=None, selector=None, by="css", timeout=None):
        if not element and selector:
            element = self.find_element(selector, by, timeout)
        if element:
            try:
                text = element.text
                self._print_debug(f"Extracted text: '{text[:50]}...' "
                                  f"from element (selector: {selector})")
                return text
            except Exception as e:
                self._print_debug(f"Error getting text: {e}")
                return None
        self._print_debug("Element not provided or not found for get_text.")
        return None

    def get_attribute(self, attribute, element=None, selector=None, by="css", timeout=None):
        if not element and selector:
            element = self.find_element(selector, by, timeout)
        if element:
            try:
                value = element.get_attribute(attribute)
                self._print_debug(f"Extracted attribute '{attribute}': '{value}' "
                                  f"from element (selector: {selector})")
                return value
            except Exception as e:
                self._print_debug(f"Error getting attribute '{attribute}': {e}")
                return None
        self._print_debug("Element not provided or not found for get_attribute.")
        return None

    def extract_data(self, template):
        """Extract data based on a template.

        Template format: {"field_name": "css_selector"
                          or ("css_selector", "attribute_name")
                          or callable}
        A callable receives the scraper instance. To extract multiple items
        (e.g. a list), use a callable that finds and processes the
        sub-elements itself.
        """
        self._print_debug(f"Extracting data with template: {template}")
        extracted_data = {}
        for field_name, rule in template.items():
            value = None
            try:
                if isinstance(rule, str):  # Simple CSS selector for text
                    value = self.get_text(selector=rule)
                elif isinstance(rule, tuple) and len(rule) == 2:  # (selector, attribute)
                    value = self.get_attribute(selector=rule[0], attribute=rule[1])
                elif callable(rule):  # Custom extraction function
                    value = rule(self)  # Pass the scraper instance
                else:
                    self._print_debug(f"Invalid rule for field '{field_name}': {rule}")
                extracted_data[field_name] = value
            except Exception as e:
                self._print_debug(f"Error extracting field '{field_name}' "
                                  f"with rule '{rule}': {e}")
                extracted_data[field_name] = None
        return extracted_data

    # --- DevTools Methods (limited by standard Selenium) ---
    def start_capturing_network(self):
        """Clear previously captured network requests (collected via JS).

        True continuous capture requires selenium-wire or the browser's
        DevTools Protocol; this only resets the internal list that
        get_captured_requests appends to.
        """
        self._print_debug("Starting network capture (clearing previous JS logs).")
        self._network_requests_raw = []

    def stop_capturing_network(self):
        """Conceptual stop: with the JS method nothing is explicitly stopped
        in the browser; later calls to get_captured_requests simply include
        data up to that point."""
        self._print_debug("Stopping network capture (conceptual for JS method).")

    def get_captured_requests(self, filter_type=None, url_pattern=None):
        """Get network requests via the JavaScript Performance API (a snapshot).

        filter_type: e.g. 'script', 'img', 'css', 'xmlhttprequest', 'fetch'
        url_pattern: regex string used to filter URLs
        """
        self._print_debug("Getting captured network requests via JavaScript Performance API.")
        try:
            # Get all resource timing entries
            current_entries = self.driver.execute_script(
                "return window.performance.getEntriesByType('resource');")
            if isinstance(current_entries, list):
                self._network_requests_raw.extend(current_entries)
            # Deduplicate on (URL, startTime) to keep the list manageable
            seen = set()
            deduplicated = []
            for entry in sorted(self._network_requests_raw,
                                key=lambda x: x.get('startTime', 0)):
                identifier = (entry.get('name'), entry.get('startTime'))
                if identifier not in seen:
                    deduplicated.append(entry)
                    seen.add(identifier)
            self._network_requests_raw = deduplicated

            filtered_requests = []
            for req in self._network_requests_raw:
                # req looks like: {'name': url, 'entryType': 'resource',
                # 'startTime': 123.45, 'duration': 67.89,
                # 'initiatorType': 'script'/'img'/'css'/'link'/'xmlhttprequest', ...}
                if filter_type:
                    # initiatorType is more useful for filtering than entryType
                    # (entryType is always 'resource')
                    initiator = req.get('initiatorType', '').lower()
                    if filter_type.lower() == "xhr":  # Common alias
                        if initiator != 'xmlhttprequest':
                            continue
                    elif filter_type.lower() not in initiator:
                        continue
                if url_pattern and not re.search(url_pattern, req.get('name', '')):
                    continue
                filtered_requests.append(req)
            self._print_debug(f"Found {len(filtered_requests)} filtered network requests.")
            return filtered_requests
        except WebDriverException as e:
            self._print_debug(f"Error getting network requests via JS: {e}")
            return []

    def add_request_interceptor(self, pattern, handler_func):
        """NOTE: true request interception is NOT reliably possible with
        standard Selenium; it requires selenium-wire or direct DevTools
        Protocol interaction. This method is a placeholder for the design spec."""
        self._print_debug(
            "WARNING: add_request_interceptor is not implemented with standard Selenium. "
            "Consider using selenium-wire for this functionality.")
        # Return a dummy ID so callers do not crash
        return f"dummy_interceptor_id_{pattern}"

    # --- Auxiliary Methods ---
    def wait_for_element(self, selector, by="css", timeout=None, condition="visible"):
        wait_timeout = timeout if timeout is not None else self.timeout
        self._print_debug(f"Waiting for element by {by}: '{selector}' to be "
                          f"{condition} (timeout: {wait_timeout}s)")
        try:
            wait = WebDriverWait(self.driver, wait_timeout)
            sel_by = self._get_selenium_by(by)
            if condition == "visible":
                element = wait.until(EC.visibility_of_element_located((sel_by, selector)))
            elif condition == "present":
                element = wait.until(EC.presence_of_element_located((sel_by, selector)))
            elif condition == "clickable":
                element = wait.until(EC.element_to_be_clickable((sel_by, selector)))
            elif condition == "invisible":
                # Wait for presence, then require that the element is not displayed
                present_element = wait.until(EC.presence_of_element_located((sel_by, selector)))
                if not present_element.is_displayed():
                    element = present_element
                else:  # Element is present AND visible, so "invisible" is not satisfied
                    raise TimeoutException(f"Element '{selector}' was visible, not invisible.")
            elif condition == "not_present":
                # Waits for staleness or non-presence; no element is returned,
                # so signal success with True
                if wait.until(EC.invisibility_of_element_located((sel_by, selector))):
                    self._print_debug(f"Element by {by}: '{selector}' confirmed "
                                      "not present or invisible.")
                    return True
                return None
            else:
                raise ValueError(f"Unsupported condition: {condition}")
            return element
        except TimeoutException:
            self._print_debug(f"Element by {by}: '{selector}' did not meet condition "
                              f"'{condition}' within {wait_timeout}s.")
            return None
        except Exception as e:
            self._print_debug(f"Error waiting for element '{selector}' "
                              f"condition '{condition}': {e}")
            return None

    def execute_script(self, script, *args):
        self._print_debug(f"Executing script: {script[:100]}...")
        try:
            return self.driver.execute_script(script, *args)
        except WebDriverException as e:
            self._print_debug(f"Error executing script: {e}")
            return None

    def set_delay(self, min_delay, max_delay=None):
        self._print_debug(f"Setting delay: min={min_delay}, max={max_delay}")
        self._min_delay = min_delay
        self._max_delay = max_delay if max_delay is not None else min_delay

    def take_screenshot(self, path=None):
        self._print_debug(f"Taking screenshot. Path: {path if path else 'Return as PNG data'}")
        try:
            if path:
                return self.driver.save_screenshot(path)  # Returns True on success
            return self.driver.get_screenshot_as_png()  # Returns binary data
        except WebDriverException as e:
            self._print_debug(f"Error taking screenshot: {e}")
            return None if path is None else False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
```
11. Example Application

Note: the original example passed a plain string and an empty-selector tuple for fields like the search keyword and current URL, which `extract_data` would misinterpret as CSS selectors; those rules are rewritten below as callables.

```python
from WebScraper import WebScraper
import time
import json


def main():
    # Initialize a WebScraper instance (headful, so the run is easy to watch)
    scraper = WebScraper(browser_type="chrome", headless=False, timeout=15, debug=True)
    try:
        # 1. Open the Baidu home page
        baidu_url = "https://www.baidu.com"
        print("Opening the Baidu home page...")
        if not scraper.open_url(baidu_url):
            print("Failed to open the Baidu home page!")
            return

        # 2. Enter a search keyword and run the search
        search_keyword = "人工智能發(fā)展趨勢"  # Change to any search keyword
        print(f"Searching for: {search_keyword}")
        # Locate the search box and type the keyword
        search_input = scraper.find_element(selector="#kw", by="css")
        if not search_input:
            print("Search box element not found!")
            return
        scraper.type_text(text=search_keyword, element=search_input)
        # Click the search button
        if not scraper.click(selector="#su", by="css"):
            print("Failed to click the search button!")
            return
        # Wait for the search results to load
        time.sleep(2)
        print("Search results loading...")

        # 3. Scroll to the bottom of the page
        print("Scrolling to the bottom of the page...")
        if scraper.scroll_to_bottom(steps=10, delay=0.1):
            print("Reached the bottom of the page")
        else:
            print("Failed to scroll to the bottom of the page")

        # 4. Extract the search result titles on the current page
        print("Extracting search results from the current page...")
        result_titles = scraper.find_elements(selector="h3.t a", by="css")
        if result_titles:
            print(f"Found {len(result_titles)} result titles:")
            for i, title in enumerate(result_titles, 1):
                print(f"{i}. {title.text}")
        else:
            print("No result titles found")

        # 5. Page through more results (pages 2-3 as a demo)
        for page in range(2, 4):
            print(f"\nMoving to page {page}...")
            # Use next_page to click the next-page button
            next_button_selector = ".n"  # CSS selector for Baidu's next-page button
            if scraper.next_page(selector=next_button_selector, method="click"):
                print(f"On page {page}, waiting for it to load...")
                time.sleep(2)
                # Scroll the new page to the bottom
                scraper.scroll_to_bottom(steps=10, delay=0.3)
                time.sleep(1)
                # Extract the new page's results
                result_titles = scraper.find_elements(selector="h3.t a", by="css")
                if result_titles:
                    print(f"Page {page}: found {len(result_titles)} result titles:")
                    for i, title in enumerate(result_titles, 1):
                        print(f"{i}. {title.text}")
                else:
                    print(f"Page {page}: no result titles found")
            else:
                print(f"Failed to move to page {page}; this may be the last page")
                break

        # 6. Extract structured data with extract_data
        print("\nExtracting structured data with extract_data:")
        data_template = {
            "search_keyword": lambda s: search_keyword,
            "timestamp": lambda s: time.strftime("%Y-%m-%d %H:%M:%S"),
            "page_title": lambda s: s.driver.title,
            "current_url": lambda s: s.driver.current_url,
            "result_count": lambda s: len(s.find_elements("h3.t a")),
        }
        extracted_data = scraper.extract_data(data_template)
        for key, value in extracted_data.items():
            print(f"{key}: {value}")

        # 7. Save the data to a JSON file
        with open("baidu_search_results.json", "w", encoding="utf-8") as f:
            json.dump(extracted_data, f, ensure_ascii=False, indent=2)
        print("\nData saved to baidu_search_results.json")
    except Exception as e:
        print(f"An error occurred during the run: {e}")
    finally:
        # 8. Close the browser
        scraper.close()
        print("Browser closed")


if __name__ == "__main__":
    main()
```