This commit is contained in:
sebthom
2024-03-04 10:07:47 +01:00
parent 284c6d2bb4
commit 9caa7a7124
15 changed files with 15 additions and 18 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,9 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import sys
import kleinanzeigen_bot
# Run the bot as soon as this module is executed, handing over the complete
# argument vector (including the program name at index 0).
kleinanzeigen_bot.main(sys.argv)

View File

@@ -0,0 +1,238 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import json
from decimal import DecimalException
from typing import Any
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
import selenium.webdriver.support.expected_conditions as EC
from .selenium_mixin import SeleniumMixin
from .utils import parse_decimal, pause
class AdExtractor(SeleniumMixin):
"""
Wrapper class for ad extraction that uses an active bot's web driver to extract specific elements from an ad page.
"""
def __init__(self, driver:WebDriver):
    """
    Binds the extractor to an existing browser session.

    :param driver: the web driver whose currently displayed pages are inspected by the extract_* methods
    """
    # the mixin initializer runs first, the passed-in driver is assigned afterwards
    super().__init__()
    self.webdriver = driver
def extract_category_from_ad_page(self) -> str:
    """
    Reads the numeric category path of an ad from the breadcrumb navigation.
    Assumes that the web driver currently shows an ad page.

    :return: a category string of form abc/def, where a-f are digits
    """
    breadcrumb = self.webdriver.find_element(By.XPATH, '//*[@id="vap-brdcrmb"]')
    category_ids = []
    for link_xpath in ('.//a[2]', './/a[3]'):  # second and third breadcrumb link
        href = breadcrumb.find_element(By.XPATH, link_xpath).get_attribute('href')
        # keep the last URL path segment without its first character
        # (presumably a letter prefix in front of the numeric ID - verify against live pages)
        category_ids.append(href.split('/')[-1][1:])
    return '/'.join(category_ids)
def extract_special_attributes_from_ad_page(self) -> dict[str, Any]:
    """
    Extracts the special attributes from an ad page.

    The attributes are read from the JSON string stored in the page's
    `window.BelenConf.universalAnalyticsOpts.dimensions.dimension108`.

    :return: a dictionary (possibly empty) where the keys are the attribute names, mapped to their values
    :raises ValueError: if the embedded JSON is not an object/dictionary
    """
    belen_conf = self.webdriver.execute_script("return window.BelenConf")
    special_attributes_str = belen_conf["universalAnalyticsOpts"]["dimensions"]["dimension108"]
    special_attributes = json.loads(special_attributes_str)
    if not isinstance(special_attributes, dict):
        # the two literals are concatenated, hence the explicit trailing space after the first sentence
        raise ValueError(
            "Failed to parse special attributes from ad page. "
            f"Expected a dictionary, but got a {type(special_attributes)}"
        )
    # attributes ending in '.versand_s' are dropped
    # (presumably because shipping is extracted separately - confirm)
    return {key: value for key, value in special_attributes.items() if not key.endswith('.versand_s')}
def extract_pricing_info_from_ad_page(self) -> tuple[float | None, str]:
"""
Extracts the pricing information (price and pricing type) from an ad page.
:return: the price of the offer (optional); and the pricing type
"""
try:
price_str:str = self.webdriver.find_element(By.CLASS_NAME, 'boxedarticle--price').text
price_type:str
price:float | None = -1
match price_str.split()[-1]:
case '':
price_type = 'FIXED'
price = float(parse_decimal(price_str.split()[0].replace('.', '')))
case 'VB': # can be either 'X € VB', or just 'VB'
price_type = 'NEGOTIABLE'
try:
price = float(parse_decimal(price_str.split()[0].replace('.', '')))
except DecimalException:
price = None
case 'verschenken':
price_type = 'GIVE_AWAY'
price = None
case _:
price_type = 'NOT_APPLICABLE'
return price, price_type
except NoSuchElementException: # no 'commercial' ad, has no pricing box etc.
return None, 'NOT_APPLICABLE'
def extract_shipping_info_from_ad_page(self) -> tuple[str, float | None, list[str] | None]:
    """
    Extracts shipping information from an ad page.

    :return: the shipping type, the shipping price (optional),
        and a list with the shipping option matching that price (optional)
    """
    ship_type, ship_costs, shipping_options = 'NOT_APPLICABLE', None, None
    try:
        shipping_text = self.webdriver.find_element(By.CSS_SELECTOR, '.boxedarticle--details--shipping') \
            .text.strip()
    except NoSuchElementException:  # no pricing box -> no shipping given
        return ship_type, ship_costs, shipping_options

    # e.g. '+ Versand ab 5,49 €' OR 'Nur Abholung'
    if shipping_text == 'Nur Abholung':
        ship_type = 'PICKUP'
    elif shipping_text == 'Versand möglich':
        ship_type = 'SHIPPING'
    elif '€' in shipping_text:
        shipping_price_parts = shipping_text.split(' ')
        shipping_price_text = shipping_price_parts[-2]  # the token right in front of the currency sign
        ship_type = 'SHIPPING'
        ship_costs = float(parse_decimal(shipping_price_text))

        # extract shipping options
        # It is only possible to extract the cheapest shipping option,
        # as the other options are not shown
        shipping_option_mapping = {
            "DHL_2": "5,49",
            "Hermes_Päckchen": "4,50",
            "Hermes_S": "4,95",
            "DHL_5": "6,99",
            "Hermes_M": "5,95",
            "DHL_10": "9,49",
            "DHL_31,5": "16,49",
            "Hermes_L": "10,95",
        }
        # compare against the price token itself; a substring search in the whole text
        # would e.g. wrongly match '5,49' inside '15,49'
        for shipping_option, shipping_price in shipping_option_mapping.items():
            if shipping_price == shipping_price_text:
                shipping_options = [shipping_option]
                break

    return ship_type, ship_costs, shipping_options
def extract_sell_directly_from_ad_page(self) -> bool | None:
    """
    Determines whether the "sell directly" option is active on an ad page.

    :return: True if the element with ID 'j-buy-now' is labelled "Direkt kaufen", False if it carries
        another label, None if the page has no such element
    """
    try:
        buy_now_element = self.webdriver.find_element(By.ID, 'j-buy-now')
    except NoSuchElementException:
        return None
    return buy_now_element.text == "Direkt kaufen"
def extract_contact_from_ad_page(self) -> dict[str, (str | None)]:
    """
    Collects the contact details shown on an ad page: street (optional), zip code,
    name of the contact person, and phone number (optional).

    :return: a dictionary with the keys 'street', 'zipcode', 'name' and 'phone';
        'street' and 'phone' are None when the page does not show them
    """
    contact:dict[str, (str | None)] = {}

    # format: e.g. (Beispiel Allee 42,) 12345 Bundesland - Stadt
    locality_text = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-locality').text.strip()

    try:
        street_text = self.webdriver.find_element(By.XPATH, '//*[@id="street-address"]').text
        contact['street'] = street_text[:-2]  # cut off trailing comma and whitespace
    except NoSuchElementException:
        print('No street given in the contact.')

    # the zip code is the first token left of the ' - ' separator
    left_half = locality_text.split(' - ')[0]
    contact['zipcode'] = left_half.split(' ')[0]

    name_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-contact') \
        .find_element(By.CLASS_NAME, 'iconlist-text')
    try:
        contact['name'] = name_element.find_element(By.TAG_NAME, 'a').text
    except NoSuchElementException:  # edge case: name without link
        contact['name'] = name_element.find_element(By.TAG_NAME, 'span').text

    # make sure the key exists even if no street was found above
    contact.setdefault('street', None)

    try:  # phone number is unusual for non-professional sellers today
        phone_link = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-contact-phone') \
            .find_element(By.TAG_NAME, 'a')
        phone_without_separators = ''.join(phone_link.text.replace('-', ' ').split(' '))
        contact['phone'] = phone_without_separators.replace('+49(0)', '0')
    except NoSuchElementException:
        # phone seems to be a deprecated feature (for non-professional users)
        # also see 'https://themen.kleinanzeigen.de/hilfe/deine-anzeigen/Telefon/
        contact['phone'] = None

    return contact
def extract_own_ads_references(self) -> list[str]:
"""
Extracts the references to all own ads.

Opens the "my ads" overview in the browser, scrolls it down and collects the link of every listed ad.
When more than one pagination button is found, the overview pages are clicked through one after another.
Status messages are written to stdout via print().

:return: the links to your ad pages; an empty list if no pagination area is found
"""
# navigate to your ads page
self.webdriver.get('https://www.kleinanzeigen.de/m-meine-anzeigen.html')
self.web_await(EC.url_contains('meine-anzeigen'), 15)
pause(2000, 3000)
# collect ad references:
# NOTE(review): the positional selectors below (section[4], div/div[2]/...) mirror the page layout
# at the time of writing and will break when the layout changes
pagination_section = self.webdriver.find_element(By.CSS_SELECTOR, '.l-splitpage')\
.find_element(By.XPATH, './/section[4]')
# scroll down to load dynamically
self.web_scroll_page_down()
pause(2000, 3000)
# detect multi-page
try:
pagination = pagination_section.find_element(By.XPATH, './/div/div[2]/div[2]/div') # Pagination
except NoSuchElementException: # 0 ads - no pagination area
print('There currently seem to be no ads on your profile!')
return []
# more than one button in the first pagination div is taken as a sign for several overview pages
n_buttons = len(pagination.find_element(By.XPATH, './/div[1]').find_elements(By.TAG_NAME, 'button'))
multi_page:bool
if n_buttons > 1:
multi_page = True
print('It seems like you have many ads!')
else:
multi_page = False
print('It seems like all your ads fit on one overview page.')
refs:list[str] = []
while True: # loop reference extraction until no more forward page
# extract references
list_section = self.webdriver.find_element(By.XPATH, '//*[@id="my-manageads-adlist"]')
list_items = list_section.find_elements(By.CLASS_NAME, 'cardbox')
refs += [li.find_element(By.XPATH, 'article/section/section[2]/h2/div/a').get_attribute('href') for li in list_items]
if not multi_page: # only one iteration for single-page overview
break
# check if last page
# the last matching button is treated as the "next page" button if its title is 'Nächste'
# NOTE(review): 'jsx-2828608826' looks like a generated class name - confirm it is stable
nav_button = self.webdriver.find_elements(By.CSS_SELECTOR, 'button.jsx-2828608826')[-1]
if nav_button.get_attribute('title') != 'Nächste':
print('Last ad overview page explored.')
break
# navigate to next overview page
nav_button.click()
pause(2000, 3000)
self.web_scroll_page_down()
return refs

View File

@@ -0,0 +1,22 @@
active: # one of: true, false
type: # one of: OFFER, WANTED
title:
description:
category:
special_attributes: {}
price:
price_type: # one of: FIXED, NEGOTIABLE, GIVE_AWAY, NOT_APPLICABLE
shipping_type: # one of: PICKUP, SHIPPING, NOT_APPLICABLE
shipping_costs:
shipping_options: [] # see README.md for more information
sell_directly: # requires shipping_options to take effect
images: []
contact:
  name:
  street:
  zipcode:
  phone:
republication_interval:
id:
created_on:
updated_on:

View File

@@ -0,0 +1,198 @@
# Elektronik
Elektronik: 161/168
## Audio & Hifi
Audio_und_Hifi: 161/172/sonstiges
CD_Player: 161/172/cd_player
Kopfhörer: 161/172/lautsprecher_kopfhoerer
Lautsprecher: 161/172/lautsprecher_kopfhoerer
MP3_Player: 161/172/mp3_player
Radio: 161/172/radio_receiver
Reciver: 161/172/radio_receiver
Stereoanlagen: 161/172/stereoanlagen
## Dienstleistungen Elektronik
Dienstleistungen_Elektronik: 161/226
## Foto
Foto: 161/245/other
Kameras: 161/245/camera
Objektive: 161/245/lens
Foto_Zubehör: 161/245/equipment
Kamera_Equipment: 161/245/camera_and_equipment
## Handy & Telefon
Handys: 161/173/sonstige
Handy_Apple: 161/173/apple
Handy_HTC: 161/173/htc_handy
Handy_LG: 161/173/lg_handy
Handy_Motorola: 161/173/motorola_handy
Handy_Nokia: 161/173/nokia_handy
Handy_Samsung: 161/173/samsung_handy
Handy_Siemens: 161/173/siemens_handy
Handy_Sony: 161/173/sony_handy
Faxgeräte: 161/173/faxgeraete
Telefone: 161/173/telefone
## Haushaltsgeräte
Haushaltsgeräte: 161/176/sonstige
Haushaltkleingeräte: 161/176/haushaltskleingeraete
Herde: 161/176/herde_backoefen
Backöfen: 161/176/herde_backoefen
Kaffemaschinen: 161/176/kaffee_espressomaschinen
Espressomaschinen: 161/176/kaffee_espressomaschinen
Kühlschränke: 161/176/kuehlschraenke_gefriergeraete
Gefriergeräte: 161/176/kuehlschraenke_gefriergeraete
Spülmaschinen: 161/176/spuelmaschinen
Staubsauger: 161/176/staubsauger
Waschmaschinen: 161/176/waschmaschinen_trockner
Trockner: 161/176/waschmaschinen_trockner
## Konsolen
Konsolen: 161/279/weitere
Pocket_Konsolen: 161/279/dsi_psp
Playstation: 161/279/playstation
XBox: 161/279/xbox
Wii: 161/279/wii
## Notebooks
Notebooks: 161/278
## PCs
PCs: 161/228
## PC-Zubehör & Software
PC-Zubehör: 161/225/sonstiges
Drucker: 161/225/drucker_scanner
Scanner: 161/225/drucker_scanner
Festplatten: 161/225/festplatten_laufwerke
Laufwerke: 161/225/festplatten_laufwerke
Gehäuse: 161/225/gehaeuse
Grafikkarten: 161/225/grafikkarten
Kabel: 161/225/kabel_adapter
Adapter: 161/225/kabel_adapter
Mainboards: 161/225/mainboards
Monitore: 161/225/monitore
Multimedia: 161/225/multimedia
Netzwerk: 161/225/netzwerk_modem
CPUs: 161/225/prozessor_cpu
Prozessoren: 161/225/prozessor_cpu
Speicher: 161/225/speicher
Software: 161/225/software
Mäuse: 161/225/tastatur_maus
Tastaturen: 161/225/tastatur_maus
## Tablets & Reader
Tablets_Reader: 161/285/weitere
iPad: 161/285/ipad
Kindle: 161/285/kindle
Tablets_Samsung: 161/285/samsung_tablets
## TV & Video
TV_Video: 161/175/weitere
DVD-Player: 161/175/dvdplayer_recorder
Recorder: 161/175/dvdplayer_recorder
Fernseher: 161/175/fernseher
Reciever: 161/175/tv_receiver
## Videospiele
Videospiele: 161/227/sonstige
Videospiele_DS: 161/227/dsi_psp
Videospiele_PSP: 161/227/dsi_psp
Videospiele_Nintendo: 161/227/nintendo
Videospiele_Playstation: 161/227/playstation
Videospiele_XBox: 161/227/xbox
Videospiele_Wii: 161/227/wii
Videospiele_PC: 161/227/pc_spiele
# Auto, Rad & Boot
Autoreifen: 210/223/reifen_felgen
# Freizeit, Hobby & Nachbarschaft
Sammeln: 185/234/sonstige
# Mode & Beauty
Beauty: 153/224/sonstiges
Gesundheit: 153/224/gesundheit
Mode: 153/155
# Mode & Beauty > Damenschuhe
Damenschuhe: 153/159/sonstiges
Damen_Ballerinas: 153/159/ballerinas
Damen_Halbschuhe: 153/159/halb_schnuerschuhe
Damen_Hausschuhe: 153/159/hausschuhe
Damen_High_Heels: 153/159/pumps
Damen_Pumps: 153/159/pumps
Damen_Sandalen: 153/159/sandalen
Damen_Schnürschuhe: 153/159/halb_schnuerschuhe
Damen_Sportschuche: 153/159/sneaker_sportschuhe
Damen_Sneaker: 153/159/sneaker_sportschuhe
Damen_Stiefel: 153/159/stiefel
Damen_Stiefeletten: 153/159/stiefel
Damen_Outdoorschuhe: 153/159/outdoor_wanderschuhe
Damen_Wanderschuhe: 153/159/outdoor_wanderschuhe
# Mode & Beauty > Herrenschuhe
Herrenschuhe: 153/158/sonstiges
Herren_Halbschuhe: 153/158/halb_schnuerschuhe
Herren_Hausschuhe: 153/158/hausschuhe
Herren_Sandalen: 153/158/sandalen
Herren_Schnürschuhe: 153/158/halb_schnuerschuhe
Herren_Sportschuche: 153/158/sneaker_sportschuhe
Herren_Sneaker: 153/158/sneaker_sportschuhe
Herren_Stiefel: 153/158/stiefel
Herren_Stiefeletten: 153/158/stiefel
Herren_Outdoorschuhe: 153/158/outdoor_wanderschuhe
Herren_Wanderschuhe: 153/158/outdoor_wanderschuhe
# Familie, Kind & Baby
Familie_Kind_Baby: 17/18
Altenpflege: 17/236
Babysitter: 17/237
Buggys: 17/25
Babyschalen: 17/21
Baby-Ausstattung: 17/258
Kinderbetreuung: 17/237
Kindersitze: 17/21
Kinderwagen: 17/25
# Familie, Kind & Baby > Spielzeug
Spielzeug: 17/23/sonstiges
Actionfiguren: 17/23/actionfiguren
Babyspielzeug: 17/23/babyspielzeug
Barbie: 17/23/barbie
Dreirad: 17/23/dreirad
Gesellschaftsspiele: 17/23/gesellschaftsspiele
Holzspielzeug: 17/23/holzspielzeug
Duplo: 17/23/lego_duplo
LEGO: 17/23/lego_duplo
Lernspielzeug: 17/23/lernspielzeug
Playmobil: 17/23/playmobil
Puppen: 17/23/puppen
Spielzeugautos: 17/23/spielzeug_autos
Spielzeug_draussen: 17/23/spielzeug_draussen
Stofftiere: 17/23/stofftiere
# Haus & Garten > Wohnzimmer
Wohnzimmer_Regale: 80/88/regale
Wohnzimmer_Schraenke: 80/88/schraenke
Wohnzimmer_Sitzmoebel: 80/88/sitzmoebel
Wohnzimmer_Sofas_Sitzgarnituren: 80/88/sofas_sitzgarnituren
Wohnzimmer_Tische: 80/88/tische
Wohnzimmer_TV_Moebel: 80/88/tv_moebel
Wohnzimmer_Sonstiges: 80/88/sonstiges
# Verschenken & Tauschen
Tauschen: 272/273
Verleihen: 272/274
Verschenken: 272/192

View File

@@ -0,0 +1,45 @@
ad_files:
  - "./**/ad_*.{json,yml,yaml}"
# default values for ads, can be overwritten in each ad configuration file
ad_defaults:
  active: true
  type: OFFER # one of: OFFER, WANTED
  description:
    prefix: ""
    suffix: ""
  price_type: NEGOTIABLE # one of: FIXED, NEGOTIABLE, GIVE_AWAY, NOT_APPLICABLE
  shipping_type: SHIPPING # one of: PICKUP, SHIPPING, NOT_APPLICABLE
  sell_directly: false # requires shipping_options to take effect
  contact:
    name: ""
    street: ""
    zipcode:
    phone: "" # IMPORTANT: surround phone number with quotes to prevent removal of leading zeros
  republication_interval: 7 # every X days ads should be re-published
# additional name to category ID mappings, see default list at
# https://github.com/Second-Hand-Friends/kleinanzeigen-bot/blob/main/kleinanzeigen_bot/resources/categories.yaml
# Notebooks: 161/278 # Elektronik > Notebooks
# Autoteile: 210/223/sonstige_autoteile # Auto, Rad & Boot > Autoteile & Reifen > Weitere Autoteile
categories: []
# browser configuration
browser:
  # https://peter.sh/experiments/chromium-command-line-switches/
  arguments:
    # https://stackoverflow.com/a/50725918/5116073
    - --disable-dev-shm-usage
    - --no-sandbox
    # --headless
    # --start-maximized
  binary_location: # path to custom browser executable, if not specified will be looked up on PATH
  extensions: [] # a list of .crx extension files to be loaded
  use_private_window: true
  user_data_dir: "" # see https://github.com/chromium/chromium/blob/main/docs/user_data_dir.md
  profile_name: ""
# login credentials
login:
  username: ""
  password: ""

View File

@@ -0,0 +1,399 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import logging, os, shutil, time
from collections.abc import Callable, Iterable
from typing import Any, Final, TypeVar
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chromium.options import ChromiumOptions
from selenium.webdriver.chromium.webdriver import ChromiumDriver
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select, WebDriverWait
import selenium_stealth
import webdriver_manager.core
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.core.driver_cache import DriverCacheManager
from webdriver_manager.core.manager import DriverManager
from webdriver_manager.core.os_manager import ChromeType, OSType, OperationSystemManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from .utils import ensure, pause, T
# logger used by the browser helpers of this module
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.selenium_mixin")
# executable names that are looked up on PATH (via shutil.which) to detect a pre-installed web driver
DEFAULT_CHROMEDRIVER_PATH = "chromedriver"
DEFAULT_EDGEDRIVER_PATH = "msedgedriver"
class BrowserConfig:
    """
    Plain settings holder describing how the browser shall be launched.
    All attributes start with the defaults assigned below and are meant to be overwritten by the caller.
    """

    def __init__(self) -> None:
        # additional command line switches passed to the browser
        self.arguments:Iterable[str] = []
        # path of the browser executable; None means "not configured"
        self.binary_location:str | None = None
        # paths of extension files to be loaded into the browser
        self.extensions:Iterable[str] = []
        # whether to start the browser in private/incognito mode
        self.use_private_window:bool = True
        # value for the --user-data-dir switch; empty string means "not set"
        self.user_data_dir:str = ""
        # value for the --profile-directory switch; empty string means "not set"
        self.profile_name:str = ""
# type variable bound to ChromiumOptions; lets _init_browser_options declare that it returns
# the same options type it receives
CHROMIUM_OPTIONS = TypeVar('CHROMIUM_OPTIONS', bound = ChromiumOptions) # pylint: disable=invalid-name
class SeleniumMixin:
def __init__(self) -> None:
    """
    Starts with a default browser configuration and without a web driver session;
    the session is assigned later, e.g. by create_webdriver_session().
    """
    self.browser_config:Final[BrowserConfig] = BrowserConfig()
    self.webdriver:WebDriver = None
def _init_browser_options(self, browser_options:CHROMIUM_OPTIONS) -> CHROMIUM_OPTIONS:
"""
Applies the settings of self.browser_config and a set of fixed defaults to the given options object.

:param browser_options: the Chrome/Edge options object to configure (it is modified in place)
:return: the options object that was passed in
"""
# private browsing is requested with a different switch on Edge than on the other browsers
if self.browser_config.use_private_window:
if isinstance(browser_options, webdriver.EdgeOptions):
browser_options.add_argument("-inprivate")
else:
browser_options.add_argument("--incognito")
if self.browser_config.user_data_dir:
LOG.info(" -> Browser User Data Dir: %s", self.browser_config.user_data_dir)
browser_options.add_argument(f"--user-data-dir={self.browser_config.user_data_dir}")
if self.browser_config.profile_name:
LOG.info(" -> Browser Profile Name: %s", self.browser_config.profile_name)
browser_options.add_argument(f"--profile-directory={self.browser_config.profile_name}")
# fixed switches that are always added
browser_options.add_argument("--disable-crash-reporter")
browser_options.add_argument("--no-first-run")
browser_options.add_argument("--no-service-autorun")
# user-configured switches are added after the fixed ones
for chrome_option in self.browser_config.arguments:
LOG.info(" -> Custom chrome argument: %s", chrome_option)
browser_options.add_argument(chrome_option)
LOG.debug("Effective browser arguments: %s", browser_options.arguments)
# every configured extension file must exist, otherwise ensure() reports the given message
for crx_extension in self.browser_config.extensions:
ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
browser_options.add_extension(crx_extension)
LOG.debug("Effective browser extensions: %s", browser_options.extensions)
# NOTE(review): the two options below presumably hide the "controlled by automated software" hints - confirm
browser_options.add_experimental_option("excludeSwitches", ["enable-automation"])
browser_options.add_experimental_option("useAutomationExtension", False)
browser_options.add_experimental_option("prefs", {
"credentials_enable_service": False,
"profile.password_manager_enabled": False,
"profile.default_content_setting_values.notifications": 2, # 1 = allow, 2 = block browser notifications
"devtools.preferences.currentDockState": "\"bottom\""
})
# reduce the browser's own log output unless this module logs at DEBUG level
if not LOG.isEnabledFor(logging.DEBUG):
browser_options.add_argument("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3
LOG.debug("Effective experimental options: %s", browser_options.experimental_options)
if self.browser_config.binary_location:
browser_options.binary_location = self.browser_config.binary_location
LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location)
return browser_options
def create_webdriver_session(self, *, use_preinstalled_webdriver:bool = True) -> None:
"""
Creates a new browser session and stores it in self.webdriver.

If use_preinstalled_webdriver is set and a "chromedriver" or "msedgedriver" executable is found on PATH,
that driver is used. Otherwise the browser is taken from browser_config.binary_location or detected via
find_compatible_browser(), and a web driver for its major version is installed through webdriver_manager.

:param use_preinstalled_webdriver: whether a web driver found on PATH may be used
:raises AssertionError: if no browser binary is configured and no supported browser is found
"""
LOG.info("Creating WebDriver session...")
if not LOG.isEnabledFor(logging.DEBUG):
os.environ['WDM_LOG_LEVEL'] = '0' # silence the web driver manager
# check if a chrome driver is present already
if use_preinstalled_webdriver and shutil.which(DEFAULT_CHROMEDRIVER_PATH):
LOG.info("Using pre-installed Chrome Driver [%s]", shutil.which(DEFAULT_CHROMEDRIVER_PATH))
self.webdriver = webdriver.Chrome(options = self._init_browser_options(webdriver.ChromeOptions()))
elif use_preinstalled_webdriver and shutil.which(DEFAULT_EDGEDRIVER_PATH):
LOG.info("Using pre-installed Edge Driver [%s]", shutil.which(DEFAULT_EDGEDRIVER_PATH))
self.webdriver = webdriver.ChromiumEdge(options = self._init_browser_options(webdriver.EdgeOptions()))
else:
# determine browser major version
if self.browser_config.binary_location:
ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
chrome_type, chrome_version = self.get_browser_version(self.browser_config.binary_location)
else:
browser_info = self.find_compatible_browser()
if browser_info is None:
raise AssertionError("No supported browser found!")
chrome_path, chrome_type, chrome_version = browser_info
# remember the detected binary so that _init_browser_options() passes it on to the driver
self.browser_config.binary_location = chrome_path
LOG.info("Using Browser: %s %s [%s]", chrome_type.upper(), chrome_version, self.browser_config.binary_location)
chrome_major_version = chrome_version.split(".", 1)[0]
# hack to specify the concrete browser version for which the driver shall be downloaded
# (replaces a function of the webdriver_manager package for the whole process)
webdriver_manager.core.driver.get_browser_version_from_os = lambda _: chrome_major_version
# download and install matching chrome driver
webdriver_mgr: DriverManager
if chrome_type == ChromeType.MSEDGE:
webdriver_mgr = EdgeChromiumDriverManager(cache_manager = DriverCacheManager(valid_range = 14))
webdriver_path = webdriver_mgr.install()
env = os.environ.copy()
env["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver
self.webdriver = webdriver.ChromiumEdge(
service = EdgeService(webdriver_path, env = env),
options = self._init_browser_options(webdriver.EdgeOptions())
)
else:
webdriver_mgr = ChromeDriverManager(chrome_type = chrome_type, cache_manager = DriverCacheManager(valid_range = 14))
webdriver_path = webdriver_mgr.install()
self.webdriver = webdriver.Chrome(service = ChromeService(webdriver_path), options = self._init_browser_options(webdriver.ChromeOptions()))
# NOTE(review): the nesting of the following statements is not visible here; presumably they run for
# every branch above, i.e. for each newly created driver - confirm
# workaround to support Edge, see https://github.com/diprajpatra/selenium-stealth/pull/25
selenium_stealth.Driver = ChromiumDriver
selenium_stealth.stealth(self.webdriver, # https://github.com/diprajpatra/selenium-stealth#args
languages = ("de-DE", "de", "en-US", "en"),
platform = "Win32",
fix_hairline = True,
)
LOG.info("New WebDriver session is: %s %s", self.webdriver.session_id, self.webdriver.command_executor._url) # pylint: disable=protected-access
def get_browser_version(self, executable_path: str) -> tuple[ChromeType, str]: # -> [ chrome_type, chrome_version ]
"""
Determines the browser flavor and the version of the given browser executable.

On Windows both values are read from the file's version information via win32api.
On other systems the version is read from the output of a version command and the flavor is
derived from the executable's file name ("chromium", "edge", otherwise Google Chrome).

:param executable_path: path of the browser executable
:return: the browser type and the version string
"""
match OperationSystemManager.get_os_name():
case OSType.WIN:
# imported lazily, the module is only needed (and presumably only available) on Windows
import win32api # pylint: disable=import-outside-toplevel,import-error
# pylint: disable=no-member
lang, codepage = win32api.GetFileVersionInfo(executable_path, "\\VarFileInfo\\Translation")[0]
product_name = win32api.GetFileVersionInfo(executable_path, f"\\StringFileInfo\\{lang:04X}{codepage:04X}\\ProductName")
product_version = win32api.GetFileVersionInfo(executable_path, f"\\StringFileInfo\\{lang:04X}{codepage:04X}\\ProductVersion")
# pylint: enable=no-member
match product_name:
case "Chromium":
return (ChromeType.CHROMIUM, product_version)
case "Microsoft Edge":
return (ChromeType.MSEDGE, product_version)
case _: # "Google Chrome"
return (ChromeType.GOOGLE, product_version)
case OSType.LINUX:
version_cmd = webdriver_manager.core.utils.linux_browser_apps_to_cmd(f'"{executable_path}"')
case _:
version_cmd = f'"{executable_path}" --version'
# only reached for non-Windows systems: classify the browser by its file name
filename = os.path.basename(executable_path).lower()
if "chromium" in filename:
return (
ChromeType.CHROMIUM,
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.os_manager.PATTERN[ChromeType.CHROMIUM])
)
if "edge" in filename:
return (
ChromeType.MSEDGE,
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.os_manager.PATTERN[ChromeType.MSEDGE])
)
# everything else is treated as Google Chrome
return (
ChromeType.GOOGLE,
webdriver_manager.core.utils.read_version_from_cmd(version_cmd, webdriver_manager.core.os_manager.PATTERN[ChromeType.GOOGLE])
)
def find_compatible_browser(self) -> tuple[str, ChromeType, str] | None: # -> [ browser_path, chrome_type, chrome_version ]
match OperationSystemManager.get_os_name():
case OSType.LINUX:
browser_paths = [
shutil.which("chromium"),
shutil.which("chromium-browser"),
shutil.which("google-chrome"),
shutil.which("microsoft-edge")
]
case OSType.MAC:
browser_paths = [
"/Applications/Chromium.app/Contents/MacOS/Chromium",
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
]
case OSType.WIN:
browser_paths = [
os.environ.get("ProgramFiles", "C:\\Program Files") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ["ProgramFiles"] + r'\Chromium\Application\chrome.exe',
os.environ["ProgramFiles(x86)"] + r'\Chromium\Application\chrome.exe',
os.environ["LOCALAPPDATA"] + r'\Chromium\Application\chrome.exe',
os.environ["ProgramFiles"] + r'\Chrome\Application\chrome.exe',
os.environ["ProgramFiles(x86)"] + r'\Chrome\Application\chrome.exe',
os.environ["LOCALAPPDATA"] + r'\Chrome\Application\chrome.exe',
shutil.which("msedge.exe"),
shutil.which("chromium.exe"),
shutil.which("chrome.exe")
]
case _ as os_name:
LOG.warning("Installed browser for OS [%s] could not be detected", os_name)
return None
for browser_path in browser_paths:
if browser_path and os.path.isfile(browser_path):
return (browser_path, *self.get_browser_version(browser_path))
LOG.warning("Installed browser could not be detected")
return None
def web_await(self, condition: Callable[[WebDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] | None = None) -> T:
    """
    Blocks/waits until the given condition is met.

    :param condition: callable that receives the web driver and returns a truthy result once fulfilled
    :param timeout: timeout in seconds
    :param exception_on_timeout: optional factory for the exception to raise instead of the TimeoutException
    :return: the result of the fulfilled condition
    :raises TimeoutException: if the condition was not met within time and no exception factory was given
    """
    max_attempts = 2
    for attempt in range(1, max_attempts + 1):
        try:
            return WebDriverWait(self.webdriver, timeout).until(condition)  # type: ignore[no-any-return]
        except TimeoutException as ex:
            if not exception_on_timeout:
                raise
            raise exception_on_timeout() from ex
        except WebDriverException as ex:
            # temporary workaround for:
            # - https://groups.google.com/g/chromedriver-users/c/Z_CaHJTJnLw
            # - https://bugs.chromium.org/p/chromedriver/issues/detail?id=4048
            may_retry = ex.msg == "target frame detached" and attempt < max_attempts
            if not may_retry:
                raise
            LOG.warning(ex)

    raise AssertionError("Should never be reached.")
def web_click(self, selector_type:By, selector_value:str, timeout:float = 5) -> WebElement:
    """
    Waits until the addressed element is clickable, clicks it and pauses briefly.

    :param timeout: timeout in seconds
    :return: the clicked element
    :raises NoSuchElementException: if element could not be found within time
    """
    def not_clickable() -> Exception:
        return NoSuchElementException(f"Element {selector_type}:{selector_value} not found or not clickable")

    is_clickable = EC.element_to_be_clickable((selector_type, selector_value))
    elem = self.web_await(is_clickable, timeout, not_clickable)
    elem.click()
    pause()
    return elem
def web_execute(self, javascript:str) -> Any:
    """
    Runs the given JavaScript code in the context of the current page.

    :param javascript: the script source to execute
    :return: whatever the web driver's execute_script() returns for the script
    """
    script_result = self.webdriver.execute_script(javascript)
    return script_result
def web_find(self, selector_type:By, selector_value:str, timeout:float = 5) -> WebElement:
    """
    Waits until the addressed HTML element is present and returns it.

    :param timeout: timeout in seconds
    :raises NoSuchElementException: if element could not be found within time
    """
    def not_found() -> Exception:
        return NoSuchElementException(f"Element {selector_type}='{selector_value}' not found")

    is_present = EC.presence_of_element_located((selector_type, selector_value))
    return self.web_await(is_present, timeout, not_found)
def web_input(self, selector_type:By, selector_value:str, text:str, timeout:float = 5) -> WebElement:
    """
    Replaces the content of an HTML input field with the given text and pauses briefly.

    :param text: the text to type into the field after it has been cleared
    :param timeout: timeout in seconds
    :return: the input field
    :raises NoSuchElementException: if element could not be found within time
    """
    field = self.web_find(selector_type, selector_value, timeout)
    field.clear()  # remove any existing content first
    field.send_keys(text)
    pause()
    return field
def web_open(self, url:str, timeout:float = 15, reload_if_already_open:bool = False) -> None:
    """
    Opens the given URL and waits until the document has finished loading.

    :param url: url to open in browser
    :param timeout: timespan in seconds within the page needs to be loaded
    :param reload_if_already_open: if False does nothing if the URL is already open in the browser
    :raises TimeoutException: if page did not open within given timespan
    """
    LOG.debug(" -> Opening [%s]...", url)
    # the current URL is only queried when a reload has not been requested
    if not (reload_if_already_open or url != self.webdriver.current_url):
        LOG.debug(" => skipping, [%s] is already open", url)
        return

    def page_loaded(_:Any) -> bool:
        return self.web_execute("return document.readyState") == "complete"

    self.webdriver.get(url)
    WebDriverWait(self.webdriver, timeout).until(page_loaded)
def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable[int] = (200,), headers:dict[str, str] | None = None) -> dict[str, Any]:
    """
    Performs an HTTP request from within the current browser page using the JavaScript fetch() API.

    :param url: the URL to request
    :param method: the HTTP method; it is upper-cased before use
    :param valid_response_codes: the HTTP status codes that are accepted
    :param headers: optional request headers
    :return: a dictionary with the keys "statusCode", "statusMessage", "headers" and "content"
    :raises: whatever ensure() raises if the status code is not one of valid_response_codes
    """
    method = method.upper()
    LOG.debug(" -> HTTP %s [%s]...", method, url)
    # url, method and headers are handed over as script arguments instead of being spliced into the
    # script source, so quotes or other special characters in them cannot break the JavaScript code;
    # the web driver appends the completion callback as the last argument
    response:dict[str, Any] = self.webdriver.execute_async_script("""
        var callback = arguments[arguments.length - 1];
        fetch(arguments[0], {
            method: arguments[1],
            redirect: "follow",
            headers: arguments[2]
        })
        .then(response => response.text().then(responseText => {
            var headers = {};
            response.headers.forEach((v, k) => headers[k] = v);
            callback({
                "statusCode": response.status,
                "statusMessage": response.statusText,
                "headers": headers,
                "content": responseText
            })
        }))
    """, url, method, headers or {})

    ensure(
        response["statusCode"] in valid_response_codes,
        f'Invalid response "{response["statusCode"]} {response["statusMessage"]}" received for HTTP {method} to {url}'
    )
    return response
def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False) -> None:
    """
    Smoothly scrolls the current web page down.

    :param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
    :param scroll_speed: the speed of scrolling, higher is faster
    :param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
    :raises ValueError: if scroll_length or scroll_speed is not positive
    """
    # a non-positive step would never reach the bottom (endless loop),
    # a zero speed would divide by zero when computing the delay
    if scroll_length <= 0:
        raise ValueError(f"scroll_length must be positive, but is {scroll_length}")
    if scroll_speed <= 0:
        raise ValueError(f"scroll_speed must be positive, but is {scroll_speed}")

    step_delay = scroll_length / scroll_speed
    current_y_pos = 0
    bottom_y_pos: int = self.webdriver.execute_script('return document.body.scrollHeight;')  # get bottom position by JS
    while current_y_pos < bottom_y_pos:  # scroll in steps until bottom reached
        current_y_pos += scroll_length
        self.webdriver.execute_script(f'window.scrollTo(0, {current_y_pos});')  # scroll one step
        time.sleep(step_delay)

    if scroll_back_top:  # scroll back to top in same style
        while current_y_pos > 0:
            current_y_pos -= scroll_length
            self.webdriver.execute_script(f'window.scrollTo(0, {current_y_pos});')
            time.sleep(step_delay / 2)  # double speed
def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:float = 5) -> WebElement:
    """
    Chooses the <option/> with the given value inside a <select/> HTML element.

    :param selector_type: locator strategy used to find the <select/> element
    :param selector_value: locator expression used to find the <select/> element
    :param selected_value: the value of the <option/> to select
    :param timeout: timeout in seconds
    :return: the located <select/> element
    :raises NoSuchElementException: if element could not be found within time
    :raises UnexpectedTagNameException: if element is not a <select> element
    """
    locator = (selector_type, selector_value)

    def not_found() -> NoSuchElementException:
        return NoSuchElementException(f"Element {selector_type}='{selector_value}' not found or not clickable")

    select_elem = self.web_await(EC.element_to_be_clickable(locator), timeout, not_found)
    Select(select_elem).select_by_value(selected_value)
    pause()  # pause() is called without arguments after every selection
    return select_elem

View File

@@ -0,0 +1,291 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import copy, decimal, json, logging, os, re, secrets, sys, traceback, time
from importlib.resources import read_text as get_resource_as_string
from collections.abc import Callable, Sized
from datetime import datetime
from types import FrameType, ModuleType, TracebackType
from typing import Any, Final, TypeVar
import coloredlogs
from ruamel.yaml import YAML
# the root logger; console handlers are attached to it and flushed on exit
LOG_ROOT:Final[logging.Logger] = logging.getLogger()
# logger used by the helper functions of this module
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.utils")
# generic type variable, see
# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions
T = TypeVar('T')
def abspath(relative_path:str, relative_to:str | None = None) -> str:
"""
Makes a given relative path absolute based on another file/folder
"""
if os.path.isabs(relative_path):
return relative_path
if not relative_to:
return os.path.abspath(relative_path)
if os.path.isfile(relative_to):
relative_to = os.path.dirname(relative_to)
return os.path.normpath(os.path.join(relative_to, relative_path))
def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None:
"""
:param timeout: timespan in seconds until when the condition must become `True`, default is 5 seconds
:param poll_requency: sleep interval between calls in seconds, default is 0.5 seconds
:raises AssertionError: if condition did not come `True` within given timespan
"""
if not isinstance(condition, Callable): # type: ignore[arg-type] # https://github.com/python/mypy/issues/6864
if condition:
return
raise AssertionError(error_message)
if timeout < 0:
raise AssertionError("[timeout] must be >= 0")
if poll_requency < 0:
raise AssertionError("[poll_requency] must be >= 0")
start_at = time.time()
while not condition(): # type: ignore[operator]
elapsed = time.time() - start_at
if elapsed >= timeout:
raise AssertionError(error_message)
time.sleep(poll_requency)
def is_frozen() -> bool:
    """
    Returns the value of the `frozen` attribute of the `sys` module, or `False` if `sys` has no such attribute.

    >>> is_frozen()
    False
    """
    return vars(sys).get("frozen", False)  # type: ignore[no-any-return]
def apply_defaults(
    target:dict[Any, Any],
    defaults:dict[Any, Any],
    ignore:Callable[[Any, Any], bool] = lambda _k, _v: False,
    override:Callable[[Any, Any], bool] = lambda _k, _v: False
) -> dict[Any, Any]:
    """
    Recursively copies entries from `defaults` into `target` (in place) where `target` has no value yet.

    :param target: the dict to complete, it is modified in place
    :param defaults: the dict providing the default values, values are deep-copied into `target`
    :param ignore: called with (key, default_value) for keys missing in `target`,
        returning `True` prevents the default from being applied
    :param override: called with (key, current_value) for keys present in `target`,
        returning `True` replaces the current value with the default
    :return: the `target` dict

    >>> apply_defaults({}, {"foo": "bar"})
    {'foo': 'bar'}
    >>> apply_defaults({"foo": "foo"}, {"foo": "bar"})
    {'foo': 'foo'}
    >>> apply_defaults({"foo": ""}, {"foo": "bar"})
    {'foo': ''}
    >>> apply_defaults({}, {"foo": "bar"}, ignore = lambda k, _: k == "foo")
    {}
    >>> apply_defaults({"foo": ""}, {"foo": "bar"}, override = lambda _, v: v == "")
    {'foo': 'bar'}
    >>> apply_defaults({"foo": None}, {"foo": "bar"}, override = lambda _, v: v == "")
    {'foo': None}
    >>> apply_defaults({"a": {"foo": ""}}, {"a": {"foo": "bar"}}, override = lambda _, v: v == "")
    {'a': {'foo': 'bar'}}
    """
    for key, default_value in defaults.items():
        if key in target:
            if isinstance(target[key], dict) and isinstance(default_value, dict):
                # both predicates must be handed down, otherwise they only apply to the top level
                apply_defaults(target[key], default_value, ignore = ignore, override = override)
            elif override(key, target[key]):
                target[key] = copy.deepcopy(default_value)
        elif not ignore(key, default_value):
            target[key] = copy.deepcopy(default_value)
    return target
def safe_get(a_map:dict[Any, Any], *keys:str) -> Any:
    """
    Looks up a value in nested dicts by following the given keys one after another.

    :param a_map: the (possibly nested) dict to read from
    :param keys: the keys to follow, from the outermost to the innermost dict
    :return: the value found, `None` if any step of the lookup fails, or `a_map` itself if no keys are given

    >>> safe_get({"foo": {}}, "foo", "bar") is None
    True
    >>> safe_get({"foo": {"bar": "some_value"}}, "foo", "bar")
    'some_value'
    >>> safe_get({}, "foo") is None
    True
    >>> safe_get(None, "foo") is None
    True
    """
    current:Any = a_map
    for key in keys:
        try:
            current = current[key]
        except (KeyError, TypeError, IndexError):
            # missing key, or the current value cannot be indexed with this key (e.g. None)
            return None
    return current
def configure_console_logging() -> None:
    """
    Attaches two colored console handlers to the root logger: one emitting records from DEBUG up to
    and including INFO, and one emitting records from WARNING upwards.
    """
    log_format = "[%(levelname)s] %(message)s"

    def console_handler(level:int) -> logging.Handler:
        handler = logging.StreamHandler(sys.stderr)
        handler.setLevel(level)
        handler.setFormatter(coloredlogs.ColoredFormatter(log_format))
        return handler

    def up_to_info(rec:logging.LogRecord) -> bool:
        return rec.levelno <= logging.INFO

    # NOTE(review): this handler covers the "stdout" level range (DEBUG..INFO) but is bound to
    # sys.stderr like the second one - confirm whether that is intentional
    stdout_log = console_handler(logging.DEBUG)
    stdout_log.addFilter(up_to_info)
    LOG_ROOT.addHandler(stdout_log)

    LOG_ROOT.addHandler(console_handler(logging.WARNING))
def on_exception(ex_type:type[BaseException], ex_value:Any, ex_traceback:TracebackType | None) -> None:
    """
    Exception hook that reports an exception through the module logger.

    - KeyboardInterrupt is delegated to the default `sys.__excepthook__`
    - a full traceback is logged if DEBUG logging is enabled or the exception is an
      AttributeError, ImportError, NameError or TypeError
    - for an AssertionError only the exception itself is logged
    - for everything else the exception type name and value are logged

    :param ex_type: the class of the exception
    :param ex_value: the exception instance
    :param ex_traceback: the traceback of the exception
    """
    if issubclass(ex_type, KeyboardInterrupt):
        sys.__excepthook__(ex_type, ex_value, ex_traceback)
        return

    always_traced = (AttributeError, ImportError, NameError, TypeError)
    if LOG.isEnabledFor(logging.DEBUG) or isinstance(ex_value, always_traced):
        LOG.error("".join(traceback.format_exception(ex_type, ex_value, ex_traceback)))
        return

    if isinstance(ex_value, AssertionError):
        LOG.error(ex_value)
        return

    LOG.error("%s: %s", ex_type.__name__, ex_value)
def on_exit() -> None:
    """
    Flushes every handler currently attached to the root logger.
    """
    for log_handler in LOG_ROOT.handlers:
        log_handler.flush()
def on_sigint(_sig:int, _frame:FrameType | None) -> None:
    """
    Signal handler that logs a warning and terminates the process with exit code 0.

    :param _sig: the signal number (unused)
    :param _frame: the current stack frame (unused)
    """
    LOG.warning("Aborted on user request.")
    raise SystemExit(0)
def pause(min_ms:int = 200, max_ms:int = 2000) -> None:
    """
    Sleeps for a random duration between `min_ms` (inclusive) and `max_ms` (exclusive).
    If `max_ms` is not greater than `min_ms`, sleeps for exactly `min_ms`.

    :param min_ms: minimum duration in milliseconds
    :param max_ms: upper bound of the duration in milliseconds
    """
    # a conditional expression instead of `cond and a or b`: the latter falls through to the
    # random branch when min_ms is 0 and then fails in secrets.randbelow() for a non-positive range
    duration = min_ms if max_ms <= min_ms else secrets.randbelow(max_ms - min_ms) + min_ms
    LOG.log(logging.INFO if duration > 1500 else logging.DEBUG, " ... pausing for %d ms ...", duration)
    time.sleep(duration / 1000)
def pluralize(noun:str, count:int | Sized, prefix_with_count:bool = True) -> str:
"""
>>> pluralize("field", 1)
'1 field'
>>> pluralize("field", 2)
'2 fields'
>>> pluralize("field", 2, prefix_with_count = False)
'fields'
"""
if isinstance(count, Sized):
count = len(count)
prefix = f"{count} " if prefix_with_count else ""
if count == 1:
return f"{prefix}{noun}"
if noun.endswith('s') or noun.endswith('sh') or noun.endswith('ch') or noun.endswith('x') or noun.endswith('z'):
return f"{prefix}{noun}es"
if noun.endswith('y'):
return f"{prefix}{noun[:-1]}ies"
return f"{prefix}{noun}s"
def load_dict(filepath:str, content_label:str = "") -> dict[str, Any]:
    """
    Loads a dict from the given file via `load_dict_if_exists` and fails if no data was returned.

    :param filepath: path of the file to load
    :param content_label: label of the content, forwarded to `load_dict_if_exists`
    :raises FileNotFoundError: if `load_dict_if_exists` returned `None`
    """
    loaded = load_dict_if_exists(filepath, content_label)
    if loaded is not None:
        return loaded
    raise FileNotFoundError(filepath)
def load_dict_if_exists(filepath:str, content_label:str = "") -> dict[str, Any] | None:
    """
    Loads the content of a JSON or YAML file, if the file exists.

    :param filepath: path of the file to load, must end with .json, .yaml or .yml
    :param content_label: optional label of the content, only used in the log message
    :return: the parsed content, or `None` if the file does not exist
    :raises ValueError: if the file extension is not supported
    """
    filepath = os.path.abspath(filepath)
    label_prefix = content_label + " from " if content_label else ""
    LOG.info("Loading %s[%s]...", label_prefix, filepath)

    file_ext = os.path.splitext(filepath)[1]
    if file_ext not in [".json", ".yaml", ".yml"]:
        raise ValueError(f'Unsupported file type. The file name "{filepath}" must end with *.json, *.yaml, or *.yml')

    if not os.path.exists(filepath):
        return None

    with open(filepath, encoding = "utf-8") as file:
        if file_ext == ".json":
            return json.load(file)  # type: ignore[no-any-return] # mypy
        return YAML().load(file)  # type: ignore[no-any-return] # mypy
def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "") -> dict[str, Any]:
    """
    Loads the content of a JSON or YAML resource file that is located in the given module.

    :param module: the module containing the resource
    :param filename: name of the resource file, must end with .json, .yaml or .yml
    :param content_label: optional label of the content, only used in the log message
    :return: the parsed content
    :raises ValueError: if the file extension is not supported
    :raises FileNotFoundError
    """
    label_prefix = content_label + " from " if content_label else ""
    LOG.debug("Loading %s[%s.%s]...", label_prefix, module.__name__, filename)

    file_ext = os.path.splitext(filename)[1]
    if file_ext not in (".json", ".yaml", ".yml"):
        raise ValueError(f'Unsupported file type. The file name "{filename}" must end with *.json, *.yaml, or *.yml')

    resource_text = get_resource_as_string(module, filename)  # pylint: disable=deprecated-method
    if file_ext == ".json":
        return json.loads(resource_text)  # type: ignore[no-any-return] # mypy
    return YAML().load(resource_text)  # type: ignore[no-any-return] # mypy
def save_dict(filepath:str, content:dict[str, Any]) -> None:
    """
    Writes a dict to a file, as JSON if the file name ends with .json, otherwise as YAML.

    :param filepath: path of the file to write, an existing file is overwritten
    :param content: the data to write
    """
    filepath = os.path.abspath(filepath)
    LOG.info("Saving [%s]...", filepath)

    with open(filepath, "w", encoding = "utf-8") as file:
        if not filepath.endswith(".json"):
            yaml_writer = YAML()
            yaml_writer.indent(mapping = 2, sequence = 4, offset = 2)
            yaml_writer.allow_duplicate_keys = False
            yaml_writer.explicit_start = False
            yaml_writer.dump(content, file)
            return

        # ensure_ascii = False writes non-ASCII characters as-is instead of \u escapes
        json_text = json.dumps(content, indent = 2, ensure_ascii = False)
        file.write(json_text)
def parse_decimal(number:float | int | str) -> decimal.Decimal:
"""
>>> parse_decimal(5)
Decimal('5')
>>> parse_decimal(5.5)
Decimal('5.5')
>>> parse_decimal("5.5")
Decimal('5.5')
>>> parse_decimal("5,5")
Decimal('5.5')
>>> parse_decimal("1.005,5")
Decimal('1005.5')
>>> parse_decimal("1,005.5")
Decimal('1005.5')
"""
try:
return decimal.Decimal(number)
except decimal.InvalidOperation as ex:
parts = re.split("[.,]", str(number))
try:
return decimal.Decimal("".join(parts[:-1]) + "." + parts[-1])
except decimal.InvalidOperation:
raise decimal.DecimalException(f"Invalid number format: {number}") from ex
def parse_datetime(date:datetime | str | None) -> datetime | None:
"""
>>> parse_datetime(datetime(2020, 1, 1, 0, 0))
datetime.datetime(2020, 1, 1, 0, 0)
>>> parse_datetime("2020-01-01T00:00:00")
datetime.datetime(2020, 1, 1, 0, 0)
>>> parse_datetime(None)
"""
if date is None:
return None
if isinstance(date, datetime):
return date
return datetime.fromisoformat(date)
def extract_ad_id_from_ad_link(url: str) -> int:
    """
    Determines the ID of an ad from the link to the ad page. The ID is read from the last path segment
    of the URL, up to the first hyphen.

    :param url: the URL to the ad page
    :return: the ad ID, a (ten-digit) integer number, or -1 if no ID could be extracted
    """
    last_segment = url.rpartition('/')[2]  # everything after the final slash
    id_candidate = last_segment.partition('-')[0]  # everything before the first hyphen
    try:
        ad_id = int(id_candidate)
    except ValueError:
        # NOTE(review): reported via print() and not via the module logger - confirm whether intended
        print('The ad ID could not be extracted from the given ad reference!')
        return -1
    return ad_id