Files
kleinanzeigen-bot/kleinanzeigen_bot/selenium_mixin.py
2022-01-30 07:19:52 +01:00

276 lines
13 KiB
Python

"""
Copyright (C) 2022 Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
"""
import logging, os, shutil, sys, tempfile
from typing import Any, Callable, Dict, Final, Iterable, Tuple
from importlib.resources import read_text as get_resource_as_string
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService, DEFAULT_EXECUTEABLE_PATH as DEFAULT_CHROMEDRIVER_PATH
from selenium.webdriver.chromium.webdriver import ChromiumDriver
from selenium.webdriver.edge.service import Service as EdgeService, DEFAULT_EXECUTEABLE_PATH as DEFAULT_EDGEDRIVER_PATH
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select, WebDriverWait
import selenium_stealth
import webdriver_manager.utils as ChromeDriverManagerUtils
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from webdriver_manager.utils import ChromeType
from .utils import ensure, is_frozen, pause
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.selenium_mixin")
class SeleniumMixin:
def __init__(self):
self.browser_arguments:Iterable[str] = []
self.browser_binary_location:str = None
self.webdriver:WebDriver = None
def __del__(self):
if getattr(self, 'cacertfile', None):
os.remove(self.cacertfile)
def create_webdriver_session(self) -> None:
LOG.info("Creating WebDriver session...")
def init_browser_options(browser_options):
if isinstance(browser_options, webdriver.EdgeOptions):
browser_options.add_argument("-inprivate")
else:
browser_options.add_argument("--incognito")
browser_options.add_argument("--disable-crash-reporter")
browser_options.add_argument("--no-first-run")
browser_options.add_argument("--no-service-autorun")
for chrome_option in self.browser_arguments:
LOG.info(" -> Custom chrome argument: %s", chrome_option)
browser_options.add_argument(chrome_option)
browser_options.add_experimental_option('excludeSwitches', ['enable-automation'])
browser_options.add_experimental_option('useAutomationExtension', False)
browser_options.add_experimental_option("prefs", {
"credentials_enable_service": False,
"profile.password_manager_enabled": False,
"devtools.preferences.currentDockState": "\"bottom\""
})
if self.browser_binary_location:
browser_options.binary_location = self.browser_binary_location
LOG.info(" -> Chrome binary location: %s", self.browser_binary_location)
return browser_options
# if run via py2exe fix resource lookup
if is_frozen():
import pathlib # pylint: disable=import-outside-toplevel
if not os.getenv("REQUESTS_CA_BUNDLE", None) or not os.path.exists(os.getenv("REQUESTS_CA_BUNDLE", None)):
with tempfile.NamedTemporaryFile(delete = False) as tmp:
LOG.debug("Writing cacert file to [%s]...", tmp.name)
tmp.write(get_resource_as_string("certifi", "cacert.pem").encode('utf-8'))
self.cacertfile = tmp.name
os.environ['REQUESTS_CA_BUNDLE'] = self.cacertfile
read_text_orig = pathlib.Path.read_text
def read_text_new(self, encoding = None, errors = None):
path = str(self)
if "selenium_stealth" in path:
return get_resource_as_string("selenium_stealth", self.name)
return read_text_orig(self, encoding, errors)
pathlib.Path.read_text = read_text_new
# check if a chrome driver is present already
if shutil.which(DEFAULT_CHROMEDRIVER_PATH):
self.webdriver = webdriver.Chrome(options = init_browser_options(webdriver.ChromeOptions()))
elif shutil.which(DEFAULT_EDGEDRIVER_PATH):
self.webdriver = webdriver.ChromiumEdge(options = init_browser_options(webdriver.EdgeOptions()))
else:
# determine browser major version
if self.browser_binary_location:
chrome_type, chrome_version = self.get_browser_version(self.browser_binary_location)
else:
chrome_type, chrome_version = self.get_browser_version_from_os()
chrome_major_version = chrome_version.split(".", 1)[0]
# download and install matching chrome driver
if chrome_type == ChromeType.MSEDGE:
webdriver_mgr = EdgeChromiumDriverManager(cache_valid_range = 14)
webdriver_mgr.driver.browser_version = chrome_major_version
webdriver_path = webdriver_mgr.install()
env = os.environ.copy()
env["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver
self.webdriver = webdriver.ChromiumEdge(
service = EdgeService(webdriver_path, env = env),
options = init_browser_options(webdriver.EdgeOptions())
)
else:
webdriver_mgr = ChromeDriverManager(chrome_type = chrome_type, cache_valid_range = 14)
webdriver_mgr.driver.browser_version = chrome_major_version
webdriver_path = webdriver_mgr.install()
self.webdriver = webdriver.Chrome(service = ChromeService(webdriver_path), options = init_browser_options(webdriver.ChromeOptions()))
# workaround to support Edge, see https://github.com/diprajpatra/selenium-stealth/pull/25
selenium_stealth.Driver = ChromiumDriver
selenium_stealth.stealth(self.webdriver, # https://github.com/diprajpatra/selenium-stealth#args
languages = ("de-DE", "de", "en-US", "en"),
vendor = "Google Inc.",
platform = "Win32",
webgl_vendor = "Intel Inc.",
renderer = "Intel Iris OpenGL Engine",
fix_hairline = True,
)
LOG.info("New WebDriver session is: %s %s", self.webdriver.session_id, self.webdriver.command_executor._url) # pylint: disable=protected-access
def get_browser_version(self, executable_path: str) -> Tuple[ChromeType, str]:
if sys.platform == "win32":
import win32api # pylint: disable=import-outside-toplevel,import-error
# pylint: disable=no-member
lang, codepage = win32api.GetFileVersionInfo(executable_path, "\\VarFileInfo\\Translation")[0]
product_name = win32api.GetFileVersionInfo(executable_path, f"\\StringFileInfo\\{lang:04X}{codepage:04X}\\ProductName")
product_version = win32api.GetFileVersionInfo(executable_path, f"\\StringFileInfo\\{lang:04X}{codepage:04X}\\ProductVersion")
# pylint: enable=no-member
match product_name:
case "Chromium":
return (ChromeType.CHROMIUM, product_version)
case "Microsoft Edge":
return (ChromeType.MSEDGE, product_version)
case _: # "Google Chrome"
return (ChromeType.GOOGLE, product_version)
if sys.platform.startswith("linux"):
cmd = ChromeDriverManagerUtils.linux_browser_apps_to_cmd(executable_path)
else:
cmd = executable_path + " --version"
version = ChromeDriverManagerUtils.read_version_from_cmd(cmd, r'\d+\.\d+\.\d+')
filename = os.path.basename(executable_path).lower()
if "chromium" in filename:
return (ChromeType.CHROMIUM, version)
if "edge" in filename:
return (ChromeType.MSEDGE, version)
return (ChromeType.GOOGLE, version)
def get_browser_version_from_os(self) -> Tuple[ChromeType, str]:
version = ChromeDriverManagerUtils.get_browser_version_from_os(ChromeType.CHROMIUM)
if version != "UNKNOWN":
return (ChromeType.CHROMIUM, version)
LOG.debug("Chromium not found")
version = ChromeDriverManagerUtils.get_browser_version_from_os(ChromeType.GOOGLE)
if version != "UNKNOWN":
return (ChromeType.GOOGLE, version)
LOG.debug("Google Chrome not found")
version = ChromeDriverManagerUtils.get_browser_version_from_os(ChromeType.MSEDGE)
if version != "UNKNOWN":
return (ChromeType.MSEDGE, version)
LOG.debug("Microsoft Edge not found")
return (None, None)
def web_await(self, condition: Callable[[WebDriver], WebElement], timeout:int = 5) -> WebElement:
"""
:param timeout: timeout in seconds
:raises NoSuchElementException: if element could not be found within time
"""
try:
return WebDriverWait(self.webdriver, timeout).until(condition)
except TimeoutException as ex:
raise NoSuchElementException from ex
def web_click(self, selector_type:By, selector_value:str, timeout:int = 5) -> WebElement:
"""
:param timeout: timeout in seconds
:raises NoSuchElementException: if element could not be found within time
"""
elem = self.web_await(EC.element_to_be_clickable((selector_type, selector_value)), timeout)
elem.click()
pause()
return elem
def web_execute(self, javascript:str) -> Any:
"""
:return: The command's JSON response
"""
return self.webdriver.execute_script(javascript)
def web_find(self, selector_type:By, selector_value:str, timeout:int = 5) -> WebElement:
"""
:param timeout: timeout in seconds
:raises NoSuchElementException: if element could not be found within time
"""
return self.web_await(EC.presence_of_element_located((selector_type, selector_value)), timeout)
def web_input(self, selector_type:By, selector_value:str, text:str, timeout:int = 5) -> WebElement:
"""
:param timeout: timeout in seconds
:raises NoSuchElementException: if element could not be found within time
"""
input_field = self.web_find(selector_type, selector_value, timeout)
input_field.clear()
input_field.send_keys(text)
pause()
def web_open(self, url, timeout = 10, reload_if_already_open = False) -> None:
"""
:param url: url to open in browser
:param timeout: timeout in seconds
:param reload_if_already_open: if False does nothing if the url is already open in the browser
"""
LOG.debug(" -> Opening [%s]...", url)
if not reload_if_already_open and url == self.webdriver.current_url:
LOG.debug(" => skipping, [%s] is already open", url)
return
self.webdriver.get(url)
WebDriverWait(self.webdriver, timeout).until(lambda _: self.web_execute("return document.readyState") == "complete")
# pylint: disable=dangerous-default-value
def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable[int] = [200], headers:Dict[str, str] = None) -> Dict[str, Any]:
method = method.upper()
LOG.debug(" -> HTTP %s [%s]...", method, url)
response = self.webdriver.execute_async_script(f"""
var callback = arguments[arguments.length - 1];
fetch("{url}", {{
method: "{method}",
redirect: "follow",
headers: {headers or {}}
}})
.then(response => response.text().then(responseText => {{
headers = {{}};
response.headers.forEach((v, k) => headers[k] = v);
callback({{
"statusCode": response.status,
"statusMessage": response.statusText,
"headers": headers,
"content": responseText
}})
}}))
""")
ensure(
response["statusCode"] in valid_response_codes,
f'Invalid response "{response["statusCode"]} response["statusMessage"]" received for HTTP {method} to {url}'
)
return response
# pylint: enable=dangerous-default-value
def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:int = 5) -> WebElement:
"""
:param timeout: timeout in seconds
:raises NoSuchElementException: if element could not be found within time
"""
elem = self.web_await(EC.element_to_be_clickable((selector_type, selector_value)), timeout)
Select(elem).select_by_value(selected_value)
pause()
return elem