mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-12 10:31:50 +01:00
replace selenium with nodriver
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -3,51 +3,291 @@ SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import json
|
||||
from decimal import DecimalException
|
||||
from typing import Any
|
||||
import json, logging, os, shutil
|
||||
import urllib.request as urllib_request
|
||||
from datetime import datetime
|
||||
from typing import Any, Final
|
||||
|
||||
from selenium.common.exceptions import NoSuchElementException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
import selenium.webdriver.support.expected_conditions as EC
|
||||
from .utils import is_integer, parse_decimal, save_dict
|
||||
from .web_scraping_mixin import Browser, By, Element, Is, WebScrapingMixin
|
||||
|
||||
from .selenium_mixin import SeleniumMixin
|
||||
from .utils import parse_decimal, pause
|
||||
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.AdExtractor")
|
||||
|
||||
|
||||
class AdExtractor(SeleniumMixin):
|
||||
class AdExtractor(WebScrapingMixin):
|
||||
"""
|
||||
Wrapper class for ad extraction that uses an active bot´s web driver to extract specific elements from an ad page.
|
||||
Wrapper class for ad extraction that uses an active bot´s browser session to extract specific elements from an ad page.
|
||||
"""
|
||||
|
||||
def __init__(self, driver:WebDriver):
|
||||
def __init__(self, browser:Browser):
|
||||
super().__init__()
|
||||
self.webdriver = driver
|
||||
self.browser = browser
|
||||
|
||||
def extract_category_from_ad_page(self) -> str:
|
||||
async def download_ad(self, ad_id:int) -> None:
|
||||
"""
|
||||
Downloads an ad to a specific location, specified by config and ad ID.
|
||||
NOTE: Requires that the driver session currently is on the ad page.
|
||||
|
||||
:param ad_id: the ad ID
|
||||
"""
|
||||
|
||||
# create sub-directory for ad(s) to download (if necessary):
|
||||
relative_directory = 'downloaded-ads'
|
||||
# make sure configured base directory exists
|
||||
if not os.path.exists(relative_directory) or not os.path.isdir(relative_directory):
|
||||
os.mkdir(relative_directory)
|
||||
LOG.info('Created ads directory at ./%s.', relative_directory)
|
||||
|
||||
new_base_dir = os.path.join(relative_directory, f'ad_{ad_id}')
|
||||
if os.path.exists(new_base_dir):
|
||||
LOG.info('Deleting current folder of ad...')
|
||||
shutil.rmtree(new_base_dir)
|
||||
os.mkdir(new_base_dir)
|
||||
LOG.info('New directory for ad created at %s.', new_base_dir)
|
||||
|
||||
# call extraction function
|
||||
info = await self._extract_ad_page_info(new_base_dir, ad_id)
|
||||
ad_file_path = new_base_dir + '/' + f'ad_{ad_id}.yaml'
|
||||
save_dict(ad_file_path, info)
|
||||
|
||||
async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[str]:
|
||||
"""
|
||||
Downloads all images of an ad.
|
||||
|
||||
:param directory: the path of the directory created for this ad
|
||||
:param ad_id: the ID of the ad to download the images from
|
||||
:return: the relative paths for all downloaded images
|
||||
"""
|
||||
|
||||
n_images:int
|
||||
img_paths = []
|
||||
try:
|
||||
# download all images from box
|
||||
image_box = await self.web_find(By.CLASS_NAME, 'galleryimage-large')
|
||||
|
||||
n_images = len(await self.web_find_all(By.CSS_SELECTOR, '.galleryimage-element[data-ix]', parent = image_box))
|
||||
LOG.info('Found %d images.', n_images)
|
||||
|
||||
img_element:Element = await self.web_find(By.CSS_SELECTOR, 'div:nth-child(1) > img', parent = image_box)
|
||||
img_fn_prefix = 'ad_' + str(ad_id) + '__img'
|
||||
|
||||
img_nr = 1
|
||||
dl_counter = 0
|
||||
while img_nr <= n_images: # scrolling + downloading
|
||||
current_img_url = img_element.attrs['src'] # URL of the image
|
||||
if current_img_url is None:
|
||||
continue
|
||||
file_ending = current_img_url.split('.')[-1].lower()
|
||||
img_path = directory + '/' + img_fn_prefix + str(img_nr) + '.' + file_ending
|
||||
if current_img_url.startswith('https'): # verify https (for Bandit linter)
|
||||
urllib_request.urlretrieve(current_img_url, img_path) # nosec B310
|
||||
dl_counter += 1
|
||||
img_paths.append(img_path.split('/')[-1])
|
||||
|
||||
# navigate to next image (if exists)
|
||||
if img_nr < n_images:
|
||||
try:
|
||||
# click next button, wait, and re-establish reference
|
||||
await (await self.web_find(By.CLASS_NAME, 'galleryimage--navigation--next')).click()
|
||||
new_div = await self.web_find(By.CSS_SELECTOR, f'div.galleryimage-element:nth-child({img_nr + 1})')
|
||||
img_element = await self.web_find(By.TAG_NAME, 'img', parent = new_div)
|
||||
except TimeoutError:
|
||||
LOG.error('NEXT button in image gallery somehow missing, abort image fetching.')
|
||||
break
|
||||
img_nr += 1
|
||||
LOG.info('Downloaded %d image(s).', dl_counter)
|
||||
|
||||
except TimeoutError: # some ads do not require images
|
||||
LOG.warning('No image area found. Continue without downloading images.')
|
||||
|
||||
return img_paths
|
||||
|
||||
def extract_ad_id_from_ad_url(self, url: str) -> int:
|
||||
"""
|
||||
Extracts the ID of an ad, given by its reference link.
|
||||
|
||||
:param url: the URL to the ad page
|
||||
:return: the ad ID, a (ten-digit) integer number
|
||||
"""
|
||||
num_part = url.split('/')[-1] # suffix
|
||||
id_part = num_part.split('-')[0]
|
||||
|
||||
try:
|
||||
return int(id_part)
|
||||
except ValueError:
|
||||
LOG.warning('The ad ID could not be extracted from the given URL %s', url)
|
||||
return -1
|
||||
|
||||
async def extract_own_ads_urls(self) -> list[str]:
|
||||
"""
|
||||
Extracts the references to all own ads.
|
||||
|
||||
:return: the links to your ad pages
|
||||
"""
|
||||
# navigate to "your ads" page
|
||||
await self.web_open('https://www.kleinanzeigen.de/m-meine-anzeigen.html')
|
||||
await self.web_sleep(2000, 3000)
|
||||
|
||||
# collect ad references:
|
||||
pagination_section = await self.web_find(By.CSS_SELECTOR, 'section:nth-of-type(4)',
|
||||
parent = await self.web_find(By.CSS_SELECTOR, '.l-splitpage'))
|
||||
|
||||
# scroll down to load dynamically
|
||||
await self.web_scroll_page_down()
|
||||
await self.web_sleep(2000, 3000)
|
||||
|
||||
# detect multi-page
|
||||
try:
|
||||
pagination = await self.web_find(By.CSS_SELECTOR, 'div > div:nth-of-type(2) > div:nth-of-type(2) > div',
|
||||
parent = pagination_section)
|
||||
except TimeoutError: # 0 ads - no pagination area
|
||||
LOG.warning('There are currently no ads on your profile!')
|
||||
return []
|
||||
|
||||
n_buttons = len(await self.web_find_all(By.CSS_SELECTOR, 'button',
|
||||
parent = await self.web_find(By.CSS_SELECTOR, 'div:nth-of-type(1)', parent = pagination)))
|
||||
if n_buttons > 1:
|
||||
multi_page = True
|
||||
LOG.info('It seems like you have many ads!')
|
||||
else:
|
||||
multi_page = False
|
||||
LOG.info('It seems like all your ads fit on one overview page.')
|
||||
|
||||
refs:list[str] = []
|
||||
while True: # loop reference extraction until no more forward page
|
||||
# extract references
|
||||
list_items = await self.web_find_all(By.CLASS_NAME, 'cardbox',
|
||||
parent = await self.web_find(By.ID, 'my-manageads-adlist'))
|
||||
refs += [
|
||||
(await self.web_find(By.CSS_SELECTOR, 'article > section > section:nth-of-type(2) > h2 > div > a', parent = li)).attrs['href']
|
||||
for li in list_items
|
||||
]
|
||||
|
||||
if not multi_page: # only one iteration for single-page overview
|
||||
break
|
||||
# check if last page
|
||||
nav_button:Element = (await self.web_find_all(By.CSS_SELECTOR, 'button.jsx-2828608826'))[-1]
|
||||
if nav_button.attrs['title'] != 'Nächste':
|
||||
LOG.info('Last ad overview page explored.')
|
||||
break
|
||||
# navigate to next overview page
|
||||
await nav_button.click()
|
||||
await self.web_sleep(2000, 3000)
|
||||
await self.web_scroll_page_down()
|
||||
|
||||
return refs
|
||||
|
||||
async def naviagte_to_ad_page(self, id_or_url:int | str) -> bool:
|
||||
"""
|
||||
Navigates to an ad page specified with an ad ID; or alternatively by a given URL.
|
||||
:return: whether the navigation to the ad page was successful
|
||||
"""
|
||||
if is_integer(id_or_url):
|
||||
# enter the ad ID into the search bar
|
||||
await self.web_input(By.ID, "site-search-query", id_or_url)
|
||||
# navigate to ad page and wait
|
||||
await self.web_check(By.ID, 'site-search-submit', Is.CLICKABLE)
|
||||
submit_button = await self.web_find(By.ID, 'site-search-submit')
|
||||
await submit_button.click()
|
||||
else:
|
||||
await self.web_open(str(id_or_url)) # navigate to URL directly given
|
||||
await self.web_sleep()
|
||||
|
||||
# handle the case that invalid ad ID given
|
||||
if self.page.url.endswith('k0'):
|
||||
LOG.error('There is no ad under the given ID.')
|
||||
return False
|
||||
|
||||
# close (warning) popup, if given
|
||||
try:
|
||||
await self.web_find(By.ID, 'vap-ovrly-secure')
|
||||
LOG.warning('A popup appeared.')
|
||||
await self.web_click(By.CLASS_NAME, 'mfp-close')
|
||||
await self.web_sleep()
|
||||
except TimeoutError:
|
||||
pass
|
||||
return True
|
||||
|
||||
async def _extract_ad_page_info(self, directory:str, ad_id:int) -> dict[str, Any]:
|
||||
"""
|
||||
Extracts all necessary information from an ad´s page.
|
||||
|
||||
:param directory: the path of the ad´s previously created directory
|
||||
:param ad_id: the ad ID, already extracted by a calling function
|
||||
:return: a dictionary with the keys as given in an ad YAML, and their respective values
|
||||
"""
|
||||
info:dict[str, Any] = {'active': True}
|
||||
|
||||
# extract basic info
|
||||
info['type'] = 'OFFER' if 's-anzeige' in self.page.url else 'WANTED'
|
||||
title:str = await self.web_text(By.ID, 'viewad-title')
|
||||
LOG.info('Extracting information from ad with title \"%s\"', title)
|
||||
info['title'] = title
|
||||
|
||||
descr:str = await self.web_text(By.ID, 'viewad-description-text')
|
||||
info['description'] = descr
|
||||
|
||||
# extract category
|
||||
info['category'] = await self._extract_category_from_ad_page()
|
||||
|
||||
# get special attributes
|
||||
info['special_attributes'] = await self._extract_special_attributes_from_ad_page()
|
||||
|
||||
# process pricing
|
||||
info['price'], info['price_type'] = await self._extract_pricing_info_from_ad_page()
|
||||
|
||||
# process shipping
|
||||
info['shipping_type'], info['shipping_costs'], info['shipping_options'] = await self._extract_shipping_info_from_ad_page()
|
||||
info['sell_directly'] = await self._extract_sell_directly_from_ad_page()
|
||||
|
||||
# fetch images
|
||||
info['images'] = await self._download_images_from_ad_page(directory, ad_id)
|
||||
|
||||
# process address
|
||||
info['contact'] = await self._extract_contact_from_ad_page()
|
||||
|
||||
# process meta info
|
||||
info['republication_interval'] = 7 # a default value for downloaded ads
|
||||
info['id'] = ad_id
|
||||
|
||||
try: # try different locations known for creation date element
|
||||
creation_date = await self.web_text(By.XPATH,
|
||||
'/html/body/div[1]/div[2]/div/section[2]/section/section/article/div[3]/div[2]/div[2]/div[1]/span')
|
||||
except TimeoutError:
|
||||
creation_date = await self.web_text(By.CSS_SELECTOR, '#viewad-extra-info > div:nth-child(1) > span:nth-child(2)')
|
||||
|
||||
# convert creation date to ISO format
|
||||
created_parts = creation_date.split('.')
|
||||
creation_date = created_parts[2] + '-' + created_parts[1] + '-' + created_parts[0] + ' 00:00:00'
|
||||
creation_date = datetime.fromisoformat(creation_date).isoformat()
|
||||
info['created_on'] = creation_date
|
||||
info['updated_on'] = None # will be set later on
|
||||
|
||||
return info
|
||||
|
||||
async def _extract_category_from_ad_page(self) -> str:
|
||||
"""
|
||||
Extracts a category of an ad in numerical form.
|
||||
Assumes that the web driver currently shows an ad page.
|
||||
|
||||
:return: a category string of form abc/def, where a-f are digits
|
||||
"""
|
||||
category_line = self.webdriver.find_element(By.XPATH, '//*[@id="vap-brdcrmb"]')
|
||||
category_first_part = category_line.find_element(By.XPATH, './/a[2]')
|
||||
category_second_part = category_line.find_element(By.XPATH, './/a[3]')
|
||||
cat_num_first = category_first_part.get_attribute('href').split('/')[-1][1:]
|
||||
cat_num_second = category_second_part.get_attribute('href').split('/')[-1][1:]
|
||||
category_line = await self.web_find(By.ID, 'vap-brdcrmb')
|
||||
category_first_part = await self.web_find(By.CSS_SELECTOR, 'a:nth-of-type(2)', parent = category_line)
|
||||
category_second_part = await self.web_find(By.CSS_SELECTOR, 'a:nth-of-type(3)', parent = category_line)
|
||||
cat_num_first = category_first_part.attrs['href'].split('/')[-1][1:]
|
||||
cat_num_second = category_second_part.attrs['href'].split('/')[-1][1:]
|
||||
category:str = cat_num_first + '/' + cat_num_second
|
||||
|
||||
return category
|
||||
|
||||
def extract_special_attributes_from_ad_page(self) -> dict[str, Any]:
|
||||
async def _extract_special_attributes_from_ad_page(self) -> dict[str, Any]:
|
||||
"""
|
||||
Extracts the special attributes from an ad page.
|
||||
|
||||
:return: a dictionary (possibly empty) where the keys are the attribute names, mapped to their values
|
||||
"""
|
||||
belen_conf = self.webdriver.execute_script("return window.BelenConf")
|
||||
belen_conf = await self.web_execute("window.BelenConf")
|
||||
special_attributes_str = belen_conf["universalAnalyticsOpts"]["dimensions"]["dimension108"]
|
||||
special_attributes = json.loads(special_attributes_str)
|
||||
if not isinstance(special_attributes, dict):
|
||||
@@ -58,36 +298,32 @@ class AdExtractor(SeleniumMixin):
|
||||
special_attributes = {k: v for k, v in special_attributes.items() if not k.endswith('.versand_s')}
|
||||
return special_attributes
|
||||
|
||||
def extract_pricing_info_from_ad_page(self) -> tuple[float | None, str]:
|
||||
async def _extract_pricing_info_from_ad_page(self) -> tuple[float | None, str]:
|
||||
"""
|
||||
Extracts the pricing information (price and pricing type) from an ad page.
|
||||
|
||||
:return: the price of the offer (optional); and the pricing type
|
||||
"""
|
||||
try:
|
||||
price_str:str = self.webdriver.find_element(By.CLASS_NAME, 'boxedarticle--price').text
|
||||
price_type:str
|
||||
price:float | None = -1
|
||||
price_str:str = await self.web_text(By.ID, 'viewad-price')
|
||||
price:int | None = None
|
||||
match price_str.split()[-1]:
|
||||
case '€':
|
||||
price_type = 'FIXED'
|
||||
price = float(parse_decimal(price_str.split()[0].replace('.', '')))
|
||||
case 'VB': # can be either 'X € VB', or just 'VB'
|
||||
price = int(price_str.split()[0])
|
||||
case 'VB':
|
||||
price_type = 'NEGOTIABLE'
|
||||
try:
|
||||
price = float(parse_decimal(price_str.split()[0].replace('.', '')))
|
||||
except DecimalException:
|
||||
price = None
|
||||
if not price_str == "VB": # can be either 'X € VB', or just 'VB'
|
||||
price = int(price_str.split()[0])
|
||||
case 'verschenken':
|
||||
price_type = 'GIVE_AWAY'
|
||||
price = None
|
||||
case _:
|
||||
price_type = 'NOT_APPLICABLE'
|
||||
return price, price_type
|
||||
except NoSuchElementException: # no 'commercial' ad, has no pricing box etc.
|
||||
except TimeoutError: # no 'commercial' ad, has no pricing box etc.
|
||||
return None, 'NOT_APPLICABLE'
|
||||
|
||||
def extract_shipping_info_from_ad_page(self) -> tuple[str, float | None, list[str] | None]:
|
||||
async def _extract_shipping_info_from_ad_page(self) -> tuple[str, float | None, list[str] | None]:
|
||||
"""
|
||||
Extracts shipping information from an ad page.
|
||||
|
||||
@@ -95,8 +331,7 @@ class AdExtractor(SeleniumMixin):
|
||||
"""
|
||||
ship_type, ship_costs, shipping_options = 'NOT_APPLICABLE', None, None
|
||||
try:
|
||||
shipping_text = self.webdriver.find_element(By.CSS_SELECTOR, '.boxedarticle--details--shipping') \
|
||||
.text.strip()
|
||||
shipping_text = await self.web_text(By.ID, 'boxedarticle--details--shipping')
|
||||
# e.g. '+ Versand ab 5,49 €' OR 'Nur Abholung'
|
||||
if shipping_text == 'Nur Abholung':
|
||||
ship_type = 'PICKUP'
|
||||
@@ -124,115 +359,58 @@ class AdExtractor(SeleniumMixin):
|
||||
if shipping_price in shipping_text:
|
||||
shipping_options = [shipping_option]
|
||||
break
|
||||
except NoSuchElementException: # no pricing box -> no shipping given
|
||||
except TimeoutError: # no pricing box -> no shipping given
|
||||
ship_type = 'NOT_APPLICABLE'
|
||||
|
||||
return ship_type, ship_costs, shipping_options
|
||||
|
||||
def extract_sell_directly_from_ad_page(self) -> bool | None:
|
||||
async def _extract_sell_directly_from_ad_page(self) -> bool | None:
|
||||
"""
|
||||
Extracts the sell directly option from an ad page.
|
||||
|
||||
:return: a boolean indicating whether the sell directly option is active (optional)
|
||||
"""
|
||||
try:
|
||||
buy_now_is_active = self.webdriver.find_element(By.ID, 'j-buy-now').text == "Direkt kaufen"
|
||||
buy_now_is_active:bool = (await self.web_text(By.ID, 'j-buy-now')) == "Direkt kaufen"
|
||||
return buy_now_is_active
|
||||
except NoSuchElementException:
|
||||
except TimeoutError:
|
||||
return None
|
||||
|
||||
def extract_contact_from_ad_page(self) -> dict[str, (str | None)]:
|
||||
async def _extract_contact_from_ad_page(self) -> dict[str, (str | None)]:
|
||||
"""
|
||||
Processes the address part involving street (optional), zip code + city, and phone number (optional).
|
||||
|
||||
:return: a dictionary containing the address parts with their corresponding values
|
||||
"""
|
||||
contact:dict[str, (str | None)] = {}
|
||||
address_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-locality')
|
||||
address_text = address_element.text.strip()
|
||||
address_text = await self.web_text(By.ID, 'viewad-locality')
|
||||
# format: e.g. (Beispiel Allee 42,) 12345 Bundesland - Stadt
|
||||
try:
|
||||
street_element = self.webdriver.find_element(By.XPATH, '//*[@id="street-address"]')
|
||||
street = street_element.text[:-2] # trailing comma and whitespace
|
||||
street = (await self.web_text(By.ID, 'street-address'))[:-1] # trailing comma
|
||||
contact['street'] = street
|
||||
except NoSuchElementException:
|
||||
print('No street given in the contact.')
|
||||
except TimeoutError:
|
||||
LOG.info('No street given in the contact.')
|
||||
# construct remaining address
|
||||
address_halves = address_text.split(' - ')
|
||||
address_left_parts = address_halves[0].split(' ') # zip code and region/city
|
||||
contact['zipcode'] = address_left_parts[0]
|
||||
|
||||
contact_person_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-contact')
|
||||
name_element = contact_person_element.find_element(By.CLASS_NAME, 'iconlist-text')
|
||||
contact_person_element:Element = await self.web_find(By.ID, 'viewad-contact')
|
||||
name_element = await self.web_find(By.CLASS_NAME, 'iconlist-text', parent = contact_person_element)
|
||||
try:
|
||||
name = name_element.find_element(By.TAG_NAME, 'a').text
|
||||
except NoSuchElementException: # edge case: name without link
|
||||
name = name_element.find_element(By.TAG_NAME, 'span').text
|
||||
name = await self.web_text(By.TAG_NAME, 'a', parent = name_element)
|
||||
except TimeoutError: # edge case: name without link
|
||||
name = await self.web_text(By.TAG_NAME, 'span', parent = name_element)
|
||||
contact['name'] = name
|
||||
|
||||
if 'street' not in contact:
|
||||
contact['street'] = None
|
||||
try: # phone number is unusual for non-professional sellers today
|
||||
phone_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-contact-phone')
|
||||
phone_number = phone_element.find_element(By.TAG_NAME, 'a').text
|
||||
phone_element = await self.web_find(By.ID, 'viewad-contact-phone')
|
||||
phone_number = await self.web_text(By.TAG_NAME, 'a', parent = phone_element)
|
||||
contact['phone'] = ''.join(phone_number.replace('-', ' ').split(' ')).replace('+49(0)', '0')
|
||||
except NoSuchElementException:
|
||||
except TimeoutError:
|
||||
contact['phone'] = None # phone seems to be a deprecated feature (for non-professional users)
|
||||
# also see 'https://themen.kleinanzeigen.de/hilfe/deine-anzeigen/Telefon/
|
||||
|
||||
return contact
|
||||
|
||||
def extract_own_ads_references(self) -> list[str]:
|
||||
"""
|
||||
Extracts the references to all own ads.
|
||||
|
||||
:return: the links to your ad pages
|
||||
"""
|
||||
# navigate to your ads page
|
||||
self.webdriver.get('https://www.kleinanzeigen.de/m-meine-anzeigen.html')
|
||||
self.web_await(EC.url_contains('meine-anzeigen'), 15)
|
||||
pause(2000, 3000)
|
||||
|
||||
# collect ad references:
|
||||
|
||||
pagination_section = self.webdriver.find_element(By.CSS_SELECTOR, '.l-splitpage')\
|
||||
.find_element(By.XPATH, './/section[4]')
|
||||
# scroll down to load dynamically
|
||||
self.web_scroll_page_down()
|
||||
pause(2000, 3000)
|
||||
# detect multi-page
|
||||
try:
|
||||
pagination = pagination_section.find_element(By.XPATH, './/div/div[2]/div[2]/div') # Pagination
|
||||
except NoSuchElementException: # 0 ads - no pagination area
|
||||
print('There currently seem to be no ads on your profile!')
|
||||
return []
|
||||
|
||||
n_buttons = len(pagination.find_element(By.XPATH, './/div[1]').find_elements(By.TAG_NAME, 'button'))
|
||||
multi_page:bool
|
||||
if n_buttons > 1:
|
||||
multi_page = True
|
||||
print('It seems like you have many ads!')
|
||||
else:
|
||||
multi_page = False
|
||||
print('It seems like all your ads fit on one overview page.')
|
||||
|
||||
refs:list[str] = []
|
||||
while True: # loop reference extraction until no more forward page
|
||||
# extract references
|
||||
list_section = self.webdriver.find_element(By.XPATH, '//*[@id="my-manageads-adlist"]')
|
||||
list_items = list_section.find_elements(By.CLASS_NAME, 'cardbox')
|
||||
refs += [li.find_element(By.XPATH, 'article/section/section[2]/h2/div/a').get_attribute('href') for li in list_items]
|
||||
|
||||
if not multi_page: # only one iteration for single-page overview
|
||||
break
|
||||
# check if last page
|
||||
nav_button = self.webdriver.find_elements(By.CSS_SELECTOR, 'button.jsx-2828608826')[-1]
|
||||
if nav_button.get_attribute('title') != 'Nächste':
|
||||
print('Last ad overview page explored.')
|
||||
break
|
||||
# navigate to next overview page
|
||||
nav_button.click()
|
||||
pause(2000, 3000)
|
||||
self.web_scroll_page_down()
|
||||
|
||||
return refs
|
||||
|
||||
@@ -27,12 +27,7 @@ categories: []
|
||||
# browser configuration
|
||||
browser:
|
||||
# https://peter.sh/experiments/chromium-command-line-switches/
|
||||
arguments:
|
||||
# https://stackoverflow.com/a/50725918/5116073
|
||||
- --disable-dev-shm-usage
|
||||
- --no-sandbox
|
||||
# --headless
|
||||
# --start-maximized
|
||||
arguments: []
|
||||
binary_location: # path to custom browser executable, if not specified will be looked up on PATH
|
||||
extensions: [] # a list of .crx extension files to be loaded
|
||||
use_private_window: true
|
||||
|
||||
@@ -1,322 +0,0 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import logging, os, platform, shutil, time
|
||||
from collections.abc import Callable, Iterable
|
||||
from typing import Any, Final, TypeVar
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.chromium.options import ChromiumOptions
|
||||
from selenium.webdriver.chromium.webdriver import ChromiumDriver
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webelement import WebElement
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.support.ui import Select, WebDriverWait
|
||||
import selenium_stealth
|
||||
from .utils import ensure, pause, T
|
||||
|
||||
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.selenium_mixin")
|
||||
|
||||
|
||||
class BrowserConfig:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.arguments:Iterable[str] = []
|
||||
self.binary_location:str | None = None
|
||||
self.extensions:Iterable[str] = []
|
||||
self.use_private_window:bool = True
|
||||
self.user_data_dir:str = ""
|
||||
self.profile_name:str = ""
|
||||
|
||||
|
||||
CHROMIUM_OPTIONS = TypeVar('CHROMIUM_OPTIONS', bound = ChromiumOptions) # pylint: disable=invalid-name
|
||||
|
||||
|
||||
class SeleniumMixin:
|
||||
|
||||
def __init__(self) -> None:
|
||||
os.environ["SE_AVOID_STATS"] = "true" # see https://www.selenium.dev/documentation/selenium_manager/
|
||||
self.browser_config:Final[BrowserConfig] = BrowserConfig()
|
||||
self.webdriver:WebDriver = None
|
||||
|
||||
def _init_browser_options(self, browser_options:CHROMIUM_OPTIONS) -> CHROMIUM_OPTIONS:
|
||||
if self.browser_config.use_private_window:
|
||||
if isinstance(browser_options, webdriver.EdgeOptions):
|
||||
browser_options.add_argument("-inprivate")
|
||||
else:
|
||||
browser_options.add_argument("--incognito")
|
||||
|
||||
if self.browser_config.user_data_dir:
|
||||
LOG.info(" -> Browser User Data Dir: %s", self.browser_config.user_data_dir)
|
||||
browser_options.add_argument(f"--user-data-dir={self.browser_config.user_data_dir}")
|
||||
|
||||
if self.browser_config.profile_name:
|
||||
LOG.info(" -> Browser Profile Name: %s", self.browser_config.profile_name)
|
||||
browser_options.add_argument(f"--profile-directory={self.browser_config.profile_name}")
|
||||
|
||||
browser_options.add_argument("--disable-crash-reporter")
|
||||
browser_options.add_argument("--no-first-run")
|
||||
browser_options.add_argument("--no-service-autorun")
|
||||
for chrome_option in self.browser_config.arguments:
|
||||
LOG.info(" -> Custom chrome argument: %s", chrome_option)
|
||||
browser_options.add_argument(chrome_option)
|
||||
LOG.debug("Effective browser arguments: %s", browser_options.arguments)
|
||||
|
||||
for crx_extension in self.browser_config.extensions:
|
||||
ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
|
||||
browser_options.add_extension(crx_extension)
|
||||
LOG.debug("Effective browser extensions: %s", browser_options.extensions)
|
||||
|
||||
browser_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||||
browser_options.add_experimental_option("useAutomationExtension", False)
|
||||
browser_options.add_experimental_option("prefs", {
|
||||
"credentials_enable_service": False,
|
||||
"profile.password_manager_enabled": False,
|
||||
"profile.default_content_setting_values.notifications": 2, # 1 = allow, 2 = block browser notifications
|
||||
"devtools.preferences.currentDockState": "\"bottom\""
|
||||
})
|
||||
|
||||
if not LOG.isEnabledFor(logging.DEBUG):
|
||||
browser_options.add_argument("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3
|
||||
|
||||
LOG.debug("Effective experimental options: %s", browser_options.experimental_options)
|
||||
|
||||
if self.browser_config.binary_location:
|
||||
browser_options.binary_location = self.browser_config.binary_location
|
||||
LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location)
|
||||
return browser_options
|
||||
|
||||
def create_webdriver_session(self) -> None:
|
||||
LOG.info("Creating WebDriver session...")
|
||||
|
||||
if self.browser_config.binary_location:
|
||||
ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
|
||||
else:
|
||||
self.browser_config.binary_location = self.get_compatible_browser()
|
||||
|
||||
if "edge" in self.browser_config.binary_location.lower():
|
||||
os.environ["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver
|
||||
browser_options = self._init_browser_options(webdriver.EdgeOptions())
|
||||
browser_options.binary_location = self.browser_config.binary_location
|
||||
self.webdriver = webdriver.Edge(options = browser_options)
|
||||
else:
|
||||
browser_options = self._init_browser_options(webdriver.ChromeOptions())
|
||||
browser_options.binary_location = self.browser_config.binary_location
|
||||
self.webdriver = webdriver.Chrome(options = browser_options)
|
||||
|
||||
LOG.info(" -> Chrome driver: %s", self.webdriver.service.path)
|
||||
|
||||
# workaround to support Edge, see https://github.com/diprajpatra/selenium-stealth/pull/25
|
||||
selenium_stealth.Driver = ChromiumDriver
|
||||
|
||||
selenium_stealth.stealth(self.webdriver, # https://github.com/diprajpatra/selenium-stealth#args
|
||||
languages = ("de-DE", "de", "en-US", "en"),
|
||||
platform = "Win32",
|
||||
fix_hairline = True,
|
||||
)
|
||||
|
||||
LOG.info("New WebDriver session is: %s %s", self.webdriver.session_id, self.webdriver.command_executor._url) # pylint: disable=protected-access
|
||||
|
||||
def get_compatible_browser(self) -> str | None:
|
||||
match platform.system():
|
||||
case "Linux":
|
||||
browser_paths = [
|
||||
shutil.which("chromium"),
|
||||
shutil.which("chromium-browser"),
|
||||
shutil.which("google-chrome"),
|
||||
shutil.which("microsoft-edge")
|
||||
]
|
||||
|
||||
case "Darwin":
|
||||
browser_paths = [
|
||||
"/Applications/Chromium.app/Contents/MacOS/Chromium",
|
||||
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||||
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
|
||||
]
|
||||
|
||||
case "Windows":
|
||||
browser_paths = [
|
||||
os.environ.get("ProgramFiles", "C:\\Program Files") + r'\Microsoft\Edge\Application\msedge.exe',
|
||||
os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)") + r'\Microsoft\Edge\Application\msedge.exe',
|
||||
|
||||
os.environ["ProgramFiles"] + r'\Chromium\Application\chrome.exe',
|
||||
os.environ["ProgramFiles(x86)"] + r'\Chromium\Application\chrome.exe',
|
||||
os.environ["LOCALAPPDATA"] + r'\Chromium\Application\chrome.exe',
|
||||
|
||||
os.environ["ProgramFiles"] + r'\Chrome\Application\chrome.exe',
|
||||
os.environ["ProgramFiles(x86)"] + r'\Chrome\Application\chrome.exe',
|
||||
os.environ["LOCALAPPDATA"] + r'\Chrome\Application\chrome.exe',
|
||||
|
||||
shutil.which("msedge.exe"),
|
||||
shutil.which("chromium.exe"),
|
||||
shutil.which("chrome.exe")
|
||||
]
|
||||
|
||||
case _ as os_name:
|
||||
LOG.warning("Installed browser for OS [%s] could not be detected", os_name)
|
||||
return None
|
||||
|
||||
for browser_path in browser_paths:
|
||||
if browser_path and os.path.isfile(browser_path):
|
||||
return browser_path
|
||||
|
||||
raise AssertionError("Installed browser could not be detected")
|
||||
|
||||
def web_await(self, condition: Callable[[WebDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] | None = None) -> T:
|
||||
"""
|
||||
Blocks/waits until the given condition is met.
|
||||
|
||||
:param timeout: timeout in seconds
|
||||
:raises TimeoutException: if element could not be found within time
|
||||
"""
|
||||
max_attempts = 2
|
||||
for attempt in range(max_attempts + 1)[1:]:
|
||||
try:
|
||||
return WebDriverWait(self.webdriver, timeout).until(condition) # type: ignore[no-any-return]
|
||||
except TimeoutException as ex:
|
||||
if exception_on_timeout:
|
||||
raise exception_on_timeout() from ex
|
||||
raise ex
|
||||
except WebDriverException as ex:
|
||||
# temporary workaround for:
|
||||
# - https://groups.google.com/g/chromedriver-users/c/Z_CaHJTJnLw
|
||||
# - https://bugs.chromium.org/p/chromedriver/issues/detail?id=4048
|
||||
if ex.msg == "target frame detached" and attempt < max_attempts:
|
||||
LOG.warning(ex)
|
||||
else:
|
||||
raise ex
|
||||
|
||||
raise AssertionError("Should never be reached.")
|
||||
|
||||
    def web_click(self, selector_type:By, selector_value:str, timeout:float = 5) -> WebElement:
        """
        Waits for the element to become clickable, then clicks it.

        :param timeout: timeout in seconds
        :raises NoSuchElementException: if element could not be found within time
        :return: the clicked element
        """
        elem = self.web_await(
            EC.element_to_be_clickable((selector_type, selector_value)),
            timeout,
            lambda: NoSuchElementException(f"Element {selector_type}:{selector_value} not found or not clickable")
        )
        elem.click()
        pause()  # random delay to mimic human interaction
        return elem
|
||||
|
||||
    def web_execute(self, javascript:str) -> Any:
        """
        Executes the given JavaScript code in the context of the current page.

        :param javascript: the JavaScript code to run
        :return: The command's JSON response
        """
        return self.webdriver.execute_script(javascript)
|
||||
|
||||
    def web_find(self, selector_type:By, selector_value:str, timeout:float = 5) -> WebElement:
        """
        Locates an HTML element.

        :param timeout: timeout in seconds
        :raises NoSuchElementException: if element could not be found within time
        :return: the first matching element
        """
        return self.web_await(
            EC.presence_of_element_located((selector_type, selector_value)),
            timeout,
            lambda: NoSuchElementException(f"Element {selector_type}='{selector_value}' not found")
        )
|
||||
|
||||
    def web_input(self, selector_type:By, selector_value:str, text:str, timeout:float = 5) -> WebElement:
        """
        Enters text into an HTML input field.

        :param timeout: timeout in seconds
        :raises NoSuchElementException: if element could not be found within time
        :return: the input element that received the text
        """
        input_field = self.web_find(selector_type, selector_value, timeout)
        input_field.clear()  # remove any pre-existing content first
        input_field.send_keys(text)
        pause()  # random delay to mimic human interaction
        return input_field
|
||||
|
||||
    def web_open(self, url:str, timeout:float = 15, reload_if_already_open:bool = False) -> None:
        """
        :param url: url to open in browser
        :param timeout: timespan in seconds within the page needs to be loaded
        :param reload_if_already_open: if False does nothing if the URL is already open in the browser
        :raises TimeoutException: if page did not open within given timespan
        """
        LOG.debug(" -> Opening [%s]...", url)
        if not reload_if_already_open and url == self.webdriver.current_url:
            LOG.debug(" => skipping, [%s] is already open", url)
            return
        self.webdriver.get(url)
        # wait until the document reports it has fully loaded
        WebDriverWait(self.webdriver, timeout).until(lambda _: self.web_execute("return document.readyState") == "complete")
|
||||
|
||||
# pylint: disable=dangerous-default-value
|
||||
def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable[int] = [200], headers:dict[str, str] | None = None) -> dict[str, Any]:
|
||||
method = method.upper()
|
||||
LOG.debug(" -> HTTP %s [%s]...", method, url)
|
||||
response:dict[str, Any] = self.webdriver.execute_async_script(f"""
|
||||
var callback = arguments[arguments.length - 1];
|
||||
fetch("{url}", {{
|
||||
method: "{method}",
|
||||
redirect: "follow",
|
||||
headers: {headers or {}}
|
||||
}})
|
||||
.then(response => response.text().then(responseText => {{
|
||||
headers = {{}};
|
||||
response.headers.forEach((v, k) => headers[k] = v);
|
||||
callback({{
|
||||
"statusCode": response.status,
|
||||
"statusMessage": response.statusText,
|
||||
"headers": headers,
|
||||
"content": responseText
|
||||
}})
|
||||
}}))
|
||||
""")
|
||||
ensure(
|
||||
response["statusCode"] in valid_response_codes,
|
||||
f'Invalid response "{response["statusCode"]} response["statusMessage"]" received for HTTP {method} to {url}'
|
||||
)
|
||||
return response
|
||||
# pylint: enable=dangerous-default-value
|
||||
|
||||
    def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False) -> None:
        """
        Smoothly scrolls the current web page down.

        :param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
        :param scroll_speed: the speed of scrolling, higher is faster
        :param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
        """
        current_y_pos = 0
        bottom_y_pos: int = self.webdriver.execute_script('return document.body.scrollHeight;') # get bottom position by JS
        while current_y_pos < bottom_y_pos: # scroll in steps until bottom reached
            current_y_pos += scroll_length
            self.webdriver.execute_script(f'window.scrollTo(0, {current_y_pos});') # scroll one step
            time.sleep(scroll_length / scroll_speed)

        if scroll_back_top: # scroll back to top in same style
            while current_y_pos > 0:
                current_y_pos -= scroll_length
                self.webdriver.execute_script(f'window.scrollTo(0, {current_y_pos});')
                time.sleep(scroll_length / scroll_speed / 2) # double speed
|
||||
|
||||
    def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:float = 5) -> WebElement:
        """
        Selects an <option/> of a <select/> HTML element.

        :param selected_value: the "value" attribute of the option to select
        :param timeout: timeout in seconds
        :raises NoSuchElementException: if element could not be found within time
        :raises UnexpectedTagNameException: if element is not a <select> element
        :return: the <select/> element
        """
        elem = self.web_await(
            EC.element_to_be_clickable((selector_type, selector_value)),
            timeout,
            lambda: NoSuchElementException(f"Element {selector_type}='{selector_value}' not found or not clickable")
        )
        Select(elem).select_by_value(selected_value)
        pause()  # random delay to mimic human interaction
        return elem
|
||||
@@ -3,7 +3,7 @@ SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import copy, decimal, json, logging, os, re, secrets, sys, traceback, time
|
||||
import asyncio, copy, decimal, json, logging, os, re, sys, traceback, time
|
||||
from importlib.resources import read_text as get_resource_as_string
|
||||
from collections.abc import Callable, Sized
|
||||
from datetime import datetime
|
||||
@@ -68,6 +68,18 @@ def is_frozen() -> bool:
|
||||
return getattr(sys, "frozen", False)
|
||||
|
||||
|
||||
def is_integer(obj:Any) -> bool:
    """
    Determines whether the given value can be converted to an int.

    >>> is_integer(5)
    True
    >>> is_integer("x")
    False
    """
    try:
        int(obj)
    except (ValueError, TypeError):
        return False
    return True
|
||||
|
||||
|
||||
async def ainput(prompt: str) -> str:
    """Asynchronously reads one line from stdin, running the blocking input() in a worker thread."""
    full_prompt = f'{prompt} '
    return await asyncio.to_thread(input, full_prompt)
|
||||
|
||||
|
||||
def apply_defaults(
|
||||
target:dict[Any, Any],
|
||||
defaults:dict[Any, Any],
|
||||
@@ -119,7 +131,7 @@ def configure_console_logging() -> None:
|
||||
stdout_log = logging.StreamHandler(sys.stderr)
|
||||
stdout_log.setLevel(logging.DEBUG)
|
||||
stdout_log.setFormatter(coloredlogs.ColoredFormatter("[%(levelname)s] %(message)s"))
|
||||
stdout_log.addFilter(type("", (logging.Filter,), {
|
||||
stdout_log.addFilter(type("", (logging.Filter,), { # pyright: ignore
|
||||
"filter": lambda rec: rec.levelno <= logging.INFO
|
||||
}))
|
||||
LOG_ROOT.addHandler(stdout_log)
|
||||
@@ -151,12 +163,6 @@ def on_sigint(_sig:int, _frame:FrameType | None) -> None:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def pause(min_ms:int = 200, max_ms:int = 2000) -> None:
    """
    Sleeps for a random duration between min_ms and max_ms milliseconds.

    :param min_ms: lower bound of the pause in milliseconds
    :param max_ms: upper bound of the pause in milliseconds
    """
    # explicit conditional instead of the fragile "cond and a or b" idiom,
    # which silently picks the wrong branch as soon as the selected value is falsy (e.g. min_ms == 0)
    if max_ms <= min_ms:
        duration = min_ms
    else:
        duration = min_ms + secrets.randbelow(max_ms - min_ms)
    LOG.log(logging.INFO if duration > 1500 else logging.DEBUG, " ... pausing for %d ms ...", duration)
    time.sleep(duration / 1000)
|
||||
|
||||
|
||||
def pluralize(noun:str, count:int | Sized, prefix_with_count:bool = True) -> str:
|
||||
"""
|
||||
>>> pluralize("field", 1)
|
||||
@@ -272,20 +278,3 @@ def parse_datetime(date:datetime | str | None) -> datetime | None:
|
||||
if isinstance(date, datetime):
|
||||
return date
|
||||
return datetime.fromisoformat(date)
|
||||
|
||||
|
||||
def extract_ad_id_from_ad_link(url: str) -> int:
    """
    Extracts the ID of an ad, given by its reference link.

    :param url: the URL to the ad page
    :return: the ad ID, a (ten-digit) integer number
    """
    last_path_segment = url.split('/')[-1]  # suffix
    candidate = last_path_segment.split('-')[0]

    try:
        return int(candidate)
    except ValueError:
        print('The ad ID could not be extracted from the given ad reference!')
        return -1
|
||||
|
||||
532
src/kleinanzeigen_bot/web_scraping_mixin.py
Normal file
532
src/kleinanzeigen_bot/web_scraping_mixin.py
Normal file
@@ -0,0 +1,532 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import asyncio, enum, inspect, json, logging, os, platform, secrets, shutil, time
|
||||
from collections.abc import Callable, Coroutine, Iterable
|
||||
from typing import cast, Any, Final
|
||||
|
||||
try:
|
||||
from typing import Never # type: ignore[attr-defined,unused-ignore] # mypy
|
||||
except ImportError:
|
||||
from typing import NoReturn as Never # Python <3.11
|
||||
|
||||
import nodriver, psutil
|
||||
from nodriver.core.browser import Browser
|
||||
from nodriver.core.config import Config
|
||||
from nodriver.core.element import Element
|
||||
from nodriver.core.tab import Tab as Page
|
||||
|
||||
from .utils import ensure, T
|
||||
|
||||
|
||||
# logger named after this module — was left as "selenium_mixin" after the file was renamed to web_scraping_mixin
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.web_scraping_mixin")
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Browser",
|
||||
"BrowserConfig",
|
||||
"By",
|
||||
"Element",
|
||||
"Page",
|
||||
"Is",
|
||||
"WebScrapingMixin"
|
||||
]
|
||||
|
||||
|
||||
class By(enum.Enum):
    """Element-selector strategies supported by the web_find/web_find_all/web_click helpers."""
    ID = enum.auto()
    CLASS_NAME = enum.auto()
    CSS_SELECTOR = enum.auto()
    TAG_NAME = enum.auto()
    TEXT = enum.auto()  # matches by visible text content
    XPATH = enum.auto()
|
||||
|
||||
|
||||
class Is(enum.Enum):
    """Element states that can be queried via WebScrapingMixin.web_check."""
    CLICKABLE = enum.auto()
    DISPLAYED = enum.auto()
    DISABLED = enum.auto()
    READONLY = enum.auto()
    SELECTED = enum.auto()  # checked state of checkbox/radio inputs
|
||||
|
||||
|
||||
class BrowserConfig:
    """Configuration values controlling how the Chromium-based browser is launched."""

    def __init__(self) -> None:
        # extra command line arguments passed to the browser process
        self.arguments:Iterable[str] = []
        # path to the browser executable; auto-detected when None
        self.binary_location:str | None = None
        # paths of .crx extension files to load
        self.extensions:Iterable[str] = []
        # launch in incognito (Chrome/Chromium) or in-private (Edge) mode
        self.use_private_window:bool = True
        # browser user data (profile root) directory
        self.user_data_dir:str = ""
        # name of the browser profile within user_data_dir
        self.profile_name:str = ""
|
||||
|
||||
|
||||
class WebScrapingMixin:
|
||||
|
||||
    def __init__(self) -> None:
        self.browser_config:Final[BrowserConfig] = BrowserConfig()
        # both are populated later by create_browser_session / web_open
        self.browser:Browser = None # pyright: ignore
        self.page:Page = None # pyright: ignore
|
||||
|
||||
    async def create_browser_session(self) -> None:
        """
        Launches a new Chromium-based browser session (via nodriver) configured
        according to self.browser_config and stores it in self.browser.

        :raises AssertionError: if no compatible browser binary or a configured extension can be found
        """
        LOG.info("Creating Browser session...")

        if self.browser_config.binary_location:
            ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
        else:
            # auto-detect an installed Chromium/Chrome/Edge binary
            self.browser_config.binary_location = self.get_compatible_browser()
        LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location)

        # default_browser_args: @ https://github.com/ultrafunkamsterdam/nodriver/blob/main/nodriver/core/config.py
        # https://peter.sh/experiments/chromium-command-line-switches/
        # https://github.com/GoogleChrome/chrome-launcher/blob/main/docs/chrome-flags-for-tools.md
        browser_args = [
            # "--disable-dev-shm-usage", # https://stackoverflow.com/a/50725918/5116073
            "--disable-crash-reporter",
            "--disable-domain-reliability",
            "--disable-sync",
            "--no-experiments",

            "--disable-features=MediaRouter",
            "--use-mock-keychain",

            "--test-type", # https://stackoverflow.com/a/36746675/5116073
            # https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping
            # neutralize common tracking endpoints by resolving them to localhost
            '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"'
        ]

        is_edge = "edge" in self.browser_config.binary_location.lower()

        if is_edge:
            os.environ["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver

        if self.browser_config.use_private_window:
            # Edge uses a different command line switch for private browsing
            browser_args.append("-inprivate" if is_edge else "--incognito")

        if self.browser_config.profile_name:
            LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name)
            browser_args.append(f"--profile-directory={self.browser_config.profile_name}")

        for browser_arg in self.browser_config.arguments:
            LOG.info(" -> Custom Chrome argument: %s", browser_arg)
            browser_args.append(browser_arg)

        if not LOG.isEnabledFor(logging.DEBUG):
            browser_args.append("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3

        if self.browser_config.user_data_dir:
            LOG.info(" -> Browser user data dir: %s", self.browser_config.user_data_dir)

        cfg = Config(
            headless = False,
            browser_executable_path = self.browser_config.binary_location,
            browser_args = browser_args,
            user_data_dir = self.browser_config.user_data_dir
        )
        # already logged by nodriver:
        # LOG.debug("-> Effective browser arguments: \n\t\t%s", "\n\t\t".join(cfg.browser_args))

        # pre-seed the Chrome profile preferences on first use of the profile
        profile_dir = os.path.join(cfg.user_data_dir, self.browser_config.profile_name or "Default")
        os.makedirs(profile_dir, exist_ok = True)
        prefs_file = os.path.join(profile_dir, "Preferences")
        if not os.path.exists(prefs_file):
            LOG.info("-> Setting chrome prefs [%s]...", prefs_file)
            with open(prefs_file, "w", encoding='UTF-8') as fd:
                json.dump({
                    "credentials_enable_service": False,
                    "enable_do_not_track": True,
                    "google": {
                        "services": {
                            "consented_to_sync": False
                        }
                    },
                    "profile": {
                        "default_content_setting_values": {
                            "popups": 0,
                            "notifications": 2 # 1 = allow, 2 = block browser notifications
                        },
                        "password_manager_enabled": False
                    },
                    "signin": {
                        "allowed": False
                    },
                    "translate_site_blacklist": [
                        "www.kleinanzeigen.de"
                    ],
                    "devtools": {
                        "preferences": {
                            "currentDockState": '"bottom"'
                        }
                    }
                }, fd)

        # load extensions
        for crx_extension in self.browser_config.extensions:
            LOG.info(" -> Adding extension: [%s]", crx_extension)
            ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
            cfg.add_extension(crx_extension)

        self.browser = await nodriver.start(cfg)
        LOG.info("New Browser session is %s", self.browser.websocket_url)
|
||||
|
||||
def close_browser_session(self) -> None:
|
||||
if self.browser:
|
||||
LOG.debug("Closing Browser session...")
|
||||
self.page = None # pyright: ignore
|
||||
browser_process = psutil.Process(self.browser._process_pid) # pylint: disable=protected-access
|
||||
browser_children:list[psutil.Process] = browser_process.children()
|
||||
self.browser.stop()
|
||||
for p in browser_children:
|
||||
if p.is_running():
|
||||
p.kill() # terminate orphaned browser processes
|
||||
self.browser = None # pyright: ignore
|
||||
|
||||
def get_compatible_browser(self) -> str:
|
||||
match platform.system():
|
||||
case "Linux":
|
||||
browser_paths = [
|
||||
shutil.which("chromium"),
|
||||
shutil.which("chromium-browser"),
|
||||
shutil.which("google-chrome"),
|
||||
shutil.which("microsoft-edge")
|
||||
]
|
||||
|
||||
case "Darwin":
|
||||
browser_paths = [
|
||||
"/Applications/Chromium.app/Contents/MacOS/Chromium",
|
||||
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||||
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
|
||||
]
|
||||
|
||||
case "Windows":
|
||||
browser_paths = [
|
||||
os.environ.get("ProgramFiles", "C:\\Program Files") + r'\Microsoft\Edge\Application\msedge.exe',
|
||||
os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)") + r'\Microsoft\Edge\Application\msedge.exe',
|
||||
|
||||
os.environ["ProgramFiles"] + r'\Chromium\Application\chrome.exe',
|
||||
os.environ["ProgramFiles(x86)"] + r'\Chromium\Application\chrome.exe',
|
||||
os.environ["LOCALAPPDATA"] + r'\Chromium\Application\chrome.exe',
|
||||
|
||||
os.environ["ProgramFiles"] + r'\Chrome\Application\chrome.exe',
|
||||
os.environ["ProgramFiles(x86)"] + r'\Chrome\Application\chrome.exe',
|
||||
os.environ["LOCALAPPDATA"] + r'\Chrome\Application\chrome.exe',
|
||||
|
||||
shutil.which("msedge.exe"),
|
||||
shutil.which("chromium.exe"),
|
||||
shutil.which("chrome.exe")
|
||||
]
|
||||
|
||||
case _ as os_name:
|
||||
raise AssertionError(f"Installed browser for OS [{os_name}] could not be detected")
|
||||
|
||||
for browser_path in browser_paths:
|
||||
if browser_path and os.path.isfile(browser_path):
|
||||
return browser_path
|
||||
|
||||
raise AssertionError("Installed browser could not be detected")
|
||||
|
||||
    async def web_await(self, condition: Callable[[], T | Never | Coroutine[Any,Any,T | Never]], *,
            timeout:int | float = 5, timeout_error_message: str = "") -> T:
        """
        Blocks/waits until the given condition is met.

        :param condition: sync or async callable; waiting ends when it returns a truthy value
        :param timeout: timeout in seconds
        :param timeout_error_message: message of the TimeoutError raised when the condition stays falsy
        :raises TimeoutError: if element could not be found within time
        """
        loop = asyncio.get_running_loop()
        start_at = loop.time()

        while True:
            await self.page  # let the page process pending events before re-evaluating
            ex:Exception | None = None
            try:
                result_raw = condition()
                result:T = (await result_raw) if inspect.isawaitable(result_raw) else result_raw
                if result:
                    return result
            except Exception as ex1:
                ex = ex1
            if loop.time() - start_at > timeout:
                # on timeout: re-raise the condition's last exception if there was one
                if ex:
                    raise ex
                raise TimeoutError(timeout_error_message or f"Condition not met within {timeout} seconds")
            await self.page.sleep(0.5)
|
||||
|
||||
async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float = 5) -> bool:
|
||||
"""
|
||||
Locates an HTML element and returns a state.
|
||||
|
||||
:param timeout: timeout in seconds
|
||||
:raises TimeoutError: if element could not be found within time
|
||||
"""
|
||||
|
||||
def is_disabled(elem:Element) -> bool:
|
||||
return elem.attrs.get("disabled") is not None
|
||||
|
||||
async def is_displayed(elem:Element) -> bool:
|
||||
return cast(bool, await elem.apply("""
|
||||
function (element) {
|
||||
var style = window.getComputedStyle(element);
|
||||
return style.display !== 'none'
|
||||
&& style.visibility !== 'hidden'
|
||||
&& style.opacity !== '0'
|
||||
&& element.offsetWidth > 0
|
||||
&& element.offsetHeight > 0
|
||||
}
|
||||
"""))
|
||||
elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout)
|
||||
|
||||
match attr:
|
||||
case Is.CLICKABLE:
|
||||
return not is_disabled(elem) or await is_displayed(elem)
|
||||
case Is.DISPLAYED:
|
||||
return await is_displayed(elem)
|
||||
case Is.DISABLED:
|
||||
return is_disabled(elem)
|
||||
case Is.READONLY:
|
||||
return elem.attrs.get("readonly") is not None
|
||||
case Is.SELECTED:
|
||||
return cast(bool, await elem.apply("""
|
||||
function (element) {
|
||||
if (element.tagName.toLowerCase() === 'input') {
|
||||
if (element.type === 'checkbox' || element.type === 'radio') {
|
||||
return element.checked
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
"""))
|
||||
raise AssertionError(f"Unsupported attribute: {attr}")
|
||||
|
||||
    async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float = 5) -> Element:
        """
        Locates an HTML element and clicks it.

        :param timeout: timeout in seconds
        :raises TimeoutError: if element could not be found within time
        :return: the clicked element
        """
        elem = await self.web_find(selector_type, selector_value, timeout = timeout)
        await elem.click()
        await self.web_sleep()  # random delay to mimic human interaction
        return elem
|
||||
|
||||
    async def web_execute(self, javascript:str) -> Any:
        """
        Executes the given JavaScript code in the context of the current page.

        :param javascript: the JavaScript expression to evaluate
        :return: The javascript's return value
        """
        return await self.page.evaluate(javascript, True)  # second arg: await promises before returning
|
||||
|
||||
async def web_find(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> Element:
|
||||
"""
|
||||
Locates an HTML element by the given selector type and value.
|
||||
|
||||
:param timeout: timeout in seconds
|
||||
:raises TimeoutError: if element could not be found within time
|
||||
"""
|
||||
match selector_type:
|
||||
case By.ID:
|
||||
return await self.web_await(
|
||||
lambda: self.page.query_selector(f"#{selector_value}", parent),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found with ID '{selector_value}' within {timeout} seconds.")
|
||||
case By.CLASS_NAME:
|
||||
return await self.web_await(
|
||||
lambda: self.page.query_selector(f".{selector_value}", parent),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found with ID '{selector_value}' within {timeout} seconds.")
|
||||
case By.TAG_NAME:
|
||||
return await self.web_await(
|
||||
lambda: self.page.query_selector(selector_value, parent),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found of tag <{selector_value}> within {timeout} seconds.")
|
||||
case By.CSS_SELECTOR:
|
||||
return await self.web_await(
|
||||
lambda: self.page.query_selector(selector_value, parent),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found using CSS selector '{selector_value}' within {timeout} seconds.")
|
||||
case By.TEXT:
|
||||
if parent:
|
||||
raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
return await self.web_await(
|
||||
lambda: self.page.find_element_by_text(selector_value, True),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found containing text '{selector_value}' within {timeout} seconds.")
|
||||
case By.XPATH:
|
||||
if parent:
|
||||
raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
return await self.web_await(
|
||||
lambda: self.page.find_element_by_text(selector_value, True),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found using XPath '{selector_value}' within {timeout} seconds.")
|
||||
|
||||
raise AssertionError(f"Unsupported selector type: {selector_type}")
|
||||
|
||||
    async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> list[Element]:
        """
        Locates all HTML elements matching the given selector type and value.

        :param parent: restrict the search to children of this element (not supported for TEXT/XPATH)
        :param timeout: timeout in seconds
        :raises TimeoutError: if no element could be found within time
        """
        match selector_type:
            case By.CLASS_NAME:
                return await self.web_await(
                    lambda: self.page.query_selector_all(f".{selector_value}", parent),
                    timeout = timeout,
                    timeout_error_message = f"No HTML elements found with CSS class '{selector_value}' within {timeout} seconds.")
            case By.CSS_SELECTOR:
                return await self.web_await(
                    lambda: self.page.query_selector_all(selector_value, parent),
                    timeout = timeout,
                    timeout_error_message = f"No HTML elements found using CSS selector '{selector_value}' within {timeout} seconds.")
            case By.TAG_NAME:
                return await self.web_await(
                    lambda: self.page.query_selector_all(selector_value, parent),
                    timeout = timeout,
                    timeout_error_message = f"No HTML elements found of tag <{selector_value}> within {timeout} seconds.")
            case By.TEXT:
                if parent:
                    raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
                return await self.web_await(
                    lambda: self.page.find_elements_by_text(selector_value),
                    timeout = timeout,
                    timeout_error_message = f"No HTML elements found containing text '{selector_value}' within {timeout} seconds.")
            case By.XPATH:
                if parent:
                    raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
                # NOTE(review): falls back to text matching — true XPath evaluation is not
                # implemented here; confirm this matches callers' expectations
                return await self.web_await(
                    lambda: self.page.find_elements_by_text(selector_value),
                    timeout = timeout,
                    timeout_error_message = f"No HTML elements found using XPath '{selector_value}' within {timeout} seconds.")

        raise AssertionError(f"Unsupported selector type: {selector_type}")
|
||||
|
||||
async def web_input(self, selector_type:By, selector_value:str, text:str | int, *, timeout:int | float = 5) -> Element:
|
||||
"""
|
||||
Enters text into an HTML input field.
|
||||
|
||||
:param timeout: timeout in seconds
|
||||
:raises TimeoutError: if element could not be found within time
|
||||
"""
|
||||
input_field = await self.web_find(selector_type, selector_value, timeout = timeout)
|
||||
await input_field.clear_input()
|
||||
await input_field.send_keys(str(text))
|
||||
await self.web_sleep()
|
||||
return input_field
|
||||
|
||||
async def web_open(self, url:str, *, timeout:int | float = 15000, reload_if_already_open:bool = False) -> None:
|
||||
"""
|
||||
:param url: url to open in browser
|
||||
:param timeout: timespan in seconds within the page needs to be loaded
|
||||
:param reload_if_already_open: if False does nothing if the URL is already open in the browser
|
||||
:raises TimeoutException: if page did not open within given timespan
|
||||
"""
|
||||
LOG.debug(" -> Opening [%s]...", url)
|
||||
if not reload_if_already_open and self.page and url == self.page.url:
|
||||
LOG.debug(" => skipping, [%s] is already open", url)
|
||||
return
|
||||
self.page = await self.browser.get(url, False, False)
|
||||
await self.web_await(lambda: self.web_execute("document.readyState == 'complete'"), timeout = timeout,
|
||||
timeout_error_message = f"Page did not finish loading within {timeout} seconds.")
|
||||
|
||||
    async def web_text(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> str:
        """
        Returns the text content of the matched element as the user sees it,
        using the browser's text-selection mechanism.

        :param timeout: timeout in seconds
        :raises TimeoutError: if element could not be found within time
        """
        return str(await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply("""
            function (elem) {
                let sel = window.getSelection()
                sel.removeAllRanges()
                let range = document.createRange()
                range.selectNode(elem)
                sel.addRange(range)
                let visibleText = sel.toString().trim()
                sel.removeAllRanges()
                return visibleText
            }
        """))
|
||||
|
||||
async def web_sleep(self, min_ms:int = 1000, max_ms:int = 2500) -> None:
|
||||
duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms
|
||||
LOG.log(logging.INFO if duration > 1500 else logging.DEBUG, " ... pausing for %d ms ...", duration)
|
||||
await self.page.sleep(duration / 1000)
|
||||
|
||||
async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200,
|
||||
headers:dict[str, str] | None = None) -> dict[str, Any]:
|
||||
method = method.upper()
|
||||
LOG.debug(" -> HTTP %s [%s]...", method, url)
|
||||
response = cast(dict[str, Any], await self.page.evaluate(f"""
|
||||
fetch("{url}", {{
|
||||
method: "{method}",
|
||||
redirect: "follow",
|
||||
headers: {headers or {}}
|
||||
}})
|
||||
.then(response => response.text().then(responseText => {{
|
||||
headers = {{}};
|
||||
response.headers.forEach((v, k) => headers[k] = v);
|
||||
return {{
|
||||
statusCode: response.status,
|
||||
statusMessage: response.statusText,
|
||||
headers: headers,
|
||||
content: responseText
|
||||
}}
|
||||
}}))
|
||||
""", await_promise=True))
|
||||
if isinstance(valid_response_codes, int):
|
||||
valid_response_codes = [valid_response_codes]
|
||||
ensure(
|
||||
response["statusCode"] in valid_response_codes,
|
||||
f'Invalid response "{response["statusCode"]} response["statusMessage"]" received for HTTP {method} to {url}'
|
||||
)
|
||||
return response
|
||||
# pylint: enable=dangerous-default-value
|
||||
|
||||
async def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False) -> None:
|
||||
"""
|
||||
Smoothly scrolls the current web page down.
|
||||
|
||||
:param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
|
||||
:param scroll_speed: the speed of scrolling, higher is faster
|
||||
:param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
|
||||
"""
|
||||
current_y_pos = 0
|
||||
bottom_y_pos: int = await self.web_execute('document.body.scrollHeight') # get bottom position
|
||||
while current_y_pos < bottom_y_pos: # scroll in steps until bottom reached
|
||||
current_y_pos += scroll_length
|
||||
await self.web_execute(f'window.scrollTo(0, {current_y_pos})') # scroll one step
|
||||
time.sleep(scroll_length / scroll_speed)
|
||||
|
||||
if scroll_back_top: # scroll back to top in same style
|
||||
while current_y_pos > 0:
|
||||
current_y_pos -= scroll_length
|
||||
await self.web_execute(f'window.scrollTo(0, {current_y_pos})')
|
||||
time.sleep(scroll_length / scroll_speed / 2) # double speed
|
||||
|
||||
async def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:int | float = 5) -> Element:
|
||||
"""
|
||||
Selects an <option/> of a <select/> HTML element.
|
||||
|
||||
:param timeout: timeout in seconds
|
||||
:raises TimeoutError: if element could not be found within time
|
||||
:raises UnexpectedTagNameException: if element is not a <select> element
|
||||
"""
|
||||
await self.web_await(
|
||||
lambda: self.web_check(selector_type, selector_value, Is.CLICKABLE), timeout = timeout,
|
||||
timeout_error_message = f"No clickable HTML element with selector: {selector_type}='{selector_value}' found"
|
||||
)
|
||||
elem = await self.web_find(selector_type, selector_value)
|
||||
await elem.apply(f"""
|
||||
function (element) {{
|
||||
for(let i=0; i < element.options.length; i++)
|
||||
{{
|
||||
if(element.options[i].value == "{selected_value}") {{
|
||||
element.selectedIndex = i;
|
||||
break;
|
||||
}}
|
||||
}}
|
||||
throw new Error("Option with value {selected_value} not found.");
|
||||
}}
|
||||
""")
|
||||
await self.web_sleep()
|
||||
return elem
|
||||
Reference in New Issue
Block a user