refact: apply consistent formatting

This commit is contained in:
sebthom
2025-04-27 23:54:22 +02:00
parent fe33a0e461
commit ef923a8337
21 changed files with 1020 additions and 709 deletions

View File

@@ -83,11 +83,11 @@ class KleinanzeigenBot(WebScrapingMixin):
self.configure_file_logging()
self.load_config()
if not (self.ads_selector in {'all', 'new', 'due', 'changed'} or
any(selector in self.ads_selector.split(',') for selector in ('all', 'new', 'due', 'changed')) or
re.compile(r'\d+[,\d+]*').search(self.ads_selector)):
if not (self.ads_selector in {"all", "new", "due", "changed"} or
any(selector in self.ads_selector.split(",") for selector in ("all", "new", "due", "changed")) or
re.compile(r"\d+[,\d+]*").search(self.ads_selector)):
LOG.warning('You provided no ads selector. Defaulting to "due".')
self.ads_selector = 'due'
self.ads_selector = "due"
if ads := self.load_ads():
await self.create_browser_session()
@@ -111,9 +111,9 @@ class KleinanzeigenBot(WebScrapingMixin):
case "download":
self.configure_file_logging()
# ad IDs depend on selector
if not (self.ads_selector in {'all', 'new'} or re.compile(r'\d+[,\d+]*').search(self.ads_selector)):
if not (self.ads_selector in {"all", "new"} or re.compile(r"\d+[,\d+]*").search(self.ads_selector)):
LOG.warning('You provided no ads selector. Defaulting to "new".')
self.ads_selector = 'new'
self.ads_selector = "new"
self.load_config()
await self.create_browser_session()
await self.login()
@@ -265,7 +265,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("App version: %s", self.get_version())
LOG.info("Python version: %s", sys.version)
def __check_ad_republication(self, ad_cfg: dict[str, Any], ad_file_relative: str) -> bool:
def __check_ad_republication(self, ad_cfg:dict[str, Any], ad_file_relative:str) -> bool:
"""
Check if an ad needs to be republished based on republication interval.
Returns True if the ad should be republished based on the interval.
@@ -295,7 +295,7 @@ class KleinanzeigenBot(WebScrapingMixin):
return True
def __check_ad_changed(self, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any], ad_file_relative: str) -> bool:
def __check_ad_changed(self, ad_cfg:dict[str, Any], ad_cfg_orig:dict[str, Any], ad_file_relative:str) -> bool:
"""
Check if an ad has been changed since last publication.
Returns True if the ad has been changed.
@@ -327,7 +327,7 @@ class KleinanzeigenBot(WebScrapingMixin):
data_root_dir = os.path.dirname(self.config_file_path)
for file_pattern in self.config["ad_files"]:
for ad_file in glob.glob(file_pattern, root_dir = data_root_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB):
if not str(ad_file).endswith('ad_fields.yaml'):
if not str(ad_file).endswith("ad_fields.yaml"):
ad_files[abspath(ad_file, relative_to = data_root_dir)] = ad_file
LOG.info(" -> found %s", pluralize("ad config file", ad_files))
if not ad_files:
@@ -335,13 +335,13 @@ class KleinanzeigenBot(WebScrapingMixin):
ids = []
use_specific_ads = False
selectors = self.ads_selector.split(',')
selectors = self.ads_selector.split(",")
if re.compile(r'\d+[,\d+]*').search(self.ads_selector):
ids = [int(n) for n in self.ads_selector.split(',')]
if re.compile(r"\d+[,\d+]*").search(self.ads_selector):
ids = [int(n) for n in self.ads_selector.split(",")]
use_specific_ads = True
LOG.info('Start fetch task for the ad(s) with id(s):')
LOG.info(' | '.join([str(id_) for id_ in ids]))
LOG.info("Start fetch task for the ad(s) with id(s):")
LOG.info(" | ".join([str(id_) for id_ in ids]))
ad_fields = dicts.load_dict_from_module(resources, "ad_fields.yaml")
ads = []
@@ -548,7 +548,7 @@ class KleinanzeigenBot(WebScrapingMixin):
async def is_logged_in(self) -> bool:
try:
user_info = await self.web_text(By.CLASS_NAME, "mr-medium")
if self.config['login']['username'].lower() in user_info.lower():
if self.config["login"]["username"].lower() in user_info.lower():
return True
except TimeoutError:
return False
@@ -570,7 +570,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("DONE: Deleted %s", pluralize("ad", count))
LOG.info("############################################")
async def delete_ad(self, ad_cfg: dict[str, Any], published_ads: list[dict[str, Any]], *, delete_old_ads_by_title: bool) -> bool:
async def delete_ad(self, ad_cfg:dict[str, Any], published_ads:list[dict[str, Any]], *, delete_old_ads_by_title:bool) -> bool:
LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"])
await self.web_open(f"{self.root_url}/m-meine-anzeigen.html")
@@ -627,7 +627,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("DONE: (Re-)published %s", pluralize("ad", count))
LOG.info("############################################")
async def publish_ad(self, ad_file:str, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any], published_ads: list[dict[str, Any]]) -> None:
async def publish_ad(self, ad_file:str, ad_cfg:dict[str, Any], ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]]) -> None:
"""
@param ad_cfg: the effective ad config (i.e. with default values applied etc.)
@param ad_cfg_orig: the ad config as present in the YAML file
@@ -657,7 +657,7 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
# set category
#############################
await self.__set_category(ad_cfg['category'], ad_file)
await self.__set_category(ad_cfg["category"], ad_file)
#############################
# set special attributes
@@ -674,7 +674,7 @@ class KleinanzeigenBot(WebScrapingMixin):
try:
await self.web_select(By.XPATH, "//select[contains(@id, '.versand_s')]", shipping_value)
except TimeoutError:
LOG.warning("Failed to set shipping attribute for type '%s'!", ad_cfg['shipping_type'])
LOG.warning("Failed to set shipping attribute for type '%s'!", ad_cfg["shipping_type"])
else:
await self.__set_shipping(ad_cfg)
@@ -698,9 +698,9 @@ class KleinanzeigenBot(WebScrapingMixin):
if ad_cfg["shipping_type"] == "SHIPPING":
if sell_directly and ad_cfg["shipping_options"] and price_type in {"FIXED", "NEGOTIABLE"}:
if not await self.web_check(By.ID, "radio-buy-now-yes", Is.SELECTED):
await self.web_click(By.ID, 'radio-buy-now-yes')
await self.web_click(By.ID, "radio-buy-now-yes")
elif not await self.web_check(By.ID, "radio-buy-now-no", Is.SELECTED):
await self.web_click(By.ID, 'radio-buy-now-no')
await self.web_click(By.ID, "radio-buy-now-no")
except TimeoutError as ex:
LOG.debug(ex, exc_info = True)
@@ -832,7 +832,7 @@ class KleinanzeigenBot(WebScrapingMixin):
dicts.save_dict(ad_file, ad_cfg_orig)
async def __set_condition(self, condition_value: str) -> None:
async def __set_condition(self, condition_value:str) -> None:
condition_mapping = {
"new_with_tag": "Neu mit Etikett",
"new": "Neu",
@@ -862,7 +862,7 @@ class KleinanzeigenBot(WebScrapingMixin):
except TimeoutError as ex:
raise TimeoutError(_("Unable to close condition dialog!")) from ex
async def __set_category(self, category: str | None, ad_file:str) -> None:
async def __set_category(self, category:str | None, ad_file:str) -> None:
# click on something to trigger automatic category detection
await self.web_click(By.ID, "pstad-descrptn")
@@ -884,9 +884,9 @@ class KleinanzeigenBot(WebScrapingMixin):
else:
ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed")
async def __set_special_attributes(self, ad_cfg: dict[str, Any]) -> None:
async def __set_special_attributes(self, ad_cfg:dict[str, Any]) -> None:
if ad_cfg["special_attributes"]:
LOG.debug('Found %i special attributes', len(ad_cfg["special_attributes"]))
LOG.debug("Found %i special attributes", len(ad_cfg["special_attributes"]))
for special_attribute_key, special_attribute_value in ad_cfg["special_attributes"].items():
if special_attribute_key == "condition_s":
@@ -911,10 +911,10 @@ class KleinanzeigenBot(WebScrapingMixin):
try:
elem_id = special_attr_elem.attrs.id
if special_attr_elem.local_name == 'select':
if special_attr_elem.local_name == "select":
LOG.debug("Attribute field '%s' seems to be a select...", special_attribute_key)
await self.web_select(By.ID, elem_id, special_attribute_value)
elif special_attr_elem.attrs.type == 'checkbox':
elif special_attr_elem.attrs.type == "checkbox":
LOG.debug("Attribute field '%s' seems to be a checkbox...", special_attribute_key)
await self.web_click(By.ID, elem_id)
else:
@@ -925,7 +925,7 @@ class KleinanzeigenBot(WebScrapingMixin):
raise TimeoutError(f"Failed to set special attribute [{special_attribute_key}]") from ex
LOG.debug("Successfully set attribute field [%s] to [%s]...", special_attribute_key, special_attribute_value)
async def __set_shipping(self, ad_cfg: dict[str, Any]) -> None:
async def __set_shipping(self, ad_cfg:dict[str, Any]) -> None:
if ad_cfg["shipping_type"] == "PICKUP":
try:
await self.web_click(By.XPATH,
@@ -960,7 +960,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.debug(ex, exc_info = True)
raise TimeoutError(_("Unable to close shipping dialog!")) from ex
async def __set_shipping_options(self, ad_cfg: dict[str, Any]) -> None:
async def __set_shipping_options(self, ad_cfg:dict[str, Any]) -> None:
shipping_options_mapping = {
"DHL_2": ("Klein", "Paket 2 kg"),
"Hermes_Päckchen": ("Klein", "Päckchen"),
@@ -980,7 +980,7 @@ class KleinanzeigenBot(WebScrapingMixin):
except KeyError as ex:
raise KeyError(f"Unknown shipping option(s), please refer to the documentation/README: {ad_cfg['shipping_options']}") from ex
shipping_sizes, shipping_packages = zip(*mapped_shipping_options, strict=False)
shipping_sizes, shipping_packages = zip(*mapped_shipping_options, strict = False)
try:
shipping_size, = set(shipping_sizes)
@@ -1025,7 +1025,7 @@ class KleinanzeigenBot(WebScrapingMixin):
except TimeoutError as ex:
raise TimeoutError(_("Unable to close shipping dialog!")) from ex
async def __upload_images(self, ad_cfg: dict[str, Any]) -> None:
async def __upload_images(self, ad_cfg:dict[str, Any]) -> None:
LOG.info(" -> found %s", pluralize("image", ad_cfg["images"]))
image_upload:Element = await self.web_find(By.CSS_SELECTOR, "input[type=file]")
@@ -1036,7 +1036,7 @@ class KleinanzeigenBot(WebScrapingMixin):
async def assert_free_ad_limit_not_reached(self) -> None:
try:
await self.web_find(By.XPATH, '/html/body/div[1]/form/fieldset[6]/div[1]/header', timeout = 2)
await self.web_find(By.XPATH, "/html/body/div[1]/form/fieldset[6]/div[1]/header", timeout = 2)
raise AssertionError(f"Cannot publish more ads. The monthly limit of free ads of account {self.config['login']['username']} is reached.")
except TimeoutError:
pass
@@ -1050,13 +1050,13 @@ class KleinanzeigenBot(WebScrapingMixin):
ad_extractor = extract.AdExtractor(self.browser, self.config)
# use relevant download routine
if self.ads_selector in {'all', 'new'}: # explore ads overview for these two modes
LOG.info('Scanning your ad overview...')
if self.ads_selector in {"all", "new"}: # explore ads overview for these two modes
LOG.info("Scanning your ad overview...")
own_ad_urls = await ad_extractor.extract_own_ads_urls()
LOG.info('%s found.', pluralize("ad", len(own_ad_urls)))
LOG.info("%s found.", pluralize("ad", len(own_ad_urls)))
if self.ads_selector == 'all': # download all of your ads
LOG.info('Starting download of all ads...')
if self.ads_selector == "all": # download all of your ads
LOG.info("Starting download of all ads...")
success_count = 0
# call download function for each ad page
@@ -1067,12 +1067,12 @@ class KleinanzeigenBot(WebScrapingMixin):
success_count += 1
LOG.info("%d of %d ads were downloaded from your profile.", success_count, len(own_ad_urls))
elif self.ads_selector == 'new': # download only unsaved ads
elif self.ads_selector == "new": # download only unsaved ads
# check which ads already saved
saved_ad_ids = []
ads = self.load_ads(ignore_inactive = False, check_id = False) # do not skip because of existing IDs
for ad in ads:
ad_id = int(ad[2]['id'])
ad_id = int(ad[2]["id"])
saved_ad_ids.append(ad_id)
# determine ad IDs from links
@@ -1083,28 +1083,28 @@ class KleinanzeigenBot(WebScrapingMixin):
for ad_url, ad_id in ad_id_by_url.items():
# check if ad with ID already saved
if ad_id in saved_ad_ids:
LOG.info('The ad with id %d has already been saved.', ad_id)
LOG.info("The ad with id %d has already been saved.", ad_id)
continue
if await ad_extractor.naviagte_to_ad_page(ad_url):
await ad_extractor.download_ad(ad_id)
new_count += 1
LOG.info('%s were downloaded from your profile.', pluralize("new ad", new_count))
LOG.info("%s were downloaded from your profile.", pluralize("new ad", new_count))
elif re.compile(r'\d+[,\d+]*').search(self.ads_selector): # download ad(s) with specific id(s)
ids = [int(n) for n in self.ads_selector.split(',')]
LOG.info('Starting download of ad(s) with the id(s):')
LOG.info(' | '.join([str(ad_id) for ad_id in ids]))
elif re.compile(r"\d+[,\d+]*").search(self.ads_selector): # download ad(s) with specific id(s)
ids = [int(n) for n in self.ads_selector.split(",")]
LOG.info("Starting download of ad(s) with the id(s):")
LOG.info(" | ".join([str(ad_id) for ad_id in ids]))
for ad_id in ids: # call download routine for every id
exists = await ad_extractor.naviagte_to_ad_page(ad_id)
if exists:
await ad_extractor.download_ad(ad_id)
LOG.info('Downloaded ad with id %d', ad_id)
LOG.info("Downloaded ad with id %d", ad_id)
else:
LOG.error('The page with the id %d does not exist!', ad_id)
LOG.error("The page with the id %d does not exist!", ad_id)
def __get_description_with_affixes(self, ad_cfg: dict[str, Any]) -> str:
def __get_description_with_affixes(self, ad_cfg:dict[str, Any]) -> str:
"""Get the complete description with prefix and suffix applied.
Precedence (highest to lowest):

View File

@@ -9,7 +9,7 @@ from .utils import dicts
MAX_DESCRIPTION_LENGTH:Final[int] = 4000
def calculate_content_hash(ad_cfg: dict[str, Any]) -> str:
def calculate_content_hash(ad_cfg:dict[str, Any]) -> str:
"""Calculate a hash for user-modifiable fields of the ad."""
# Relevant fields for the hash
@@ -40,7 +40,7 @@ def calculate_content_hash(ad_cfg: dict[str, Any]) -> str:
return hashlib.sha256(content_str.encode()).hexdigest()
def get_description_affixes(config: dict[str, Any], *, prefix: bool = True) -> str:
def get_description_affixes(config:dict[str, Any], *, prefix:bool = True) -> str:
"""Get prefix or suffix for description with proper precedence.
This function handles both the new flattened format and legacy nested format:

View File

@@ -36,22 +36,22 @@ class AdExtractor(WebScrapingMixin):
"""
# create sub-directory for ad(s) to download (if necessary):
relative_directory = 'downloaded-ads'
relative_directory = "downloaded-ads"
# make sure configured base directory exists
if not os.path.exists(relative_directory) or not os.path.isdir(relative_directory):
os.mkdir(relative_directory)
LOG.info('Created ads directory at ./%s.', relative_directory)
LOG.info("Created ads directory at ./%s.", relative_directory)
new_base_dir = os.path.join(relative_directory, f'ad_{ad_id}')
if os.path.exists(new_base_dir):
LOG.info('Deleting current folder of ad %s...', ad_id)
LOG.info("Deleting current folder of ad %s...", ad_id)
shutil.rmtree(new_base_dir)
os.mkdir(new_base_dir)
LOG.info('New directory for ad created at %s.', new_base_dir)
LOG.info("New directory for ad created at %s.", new_base_dir)
# call extraction function
info = await self._extract_ad_page_info(new_base_dir, ad_id)
ad_file_path = new_base_dir + '/' + f'ad_{ad_id}.yaml'
ad_file_path = new_base_dir + "/" + f'ad_{ad_id}.yaml'
dicts.save_dict(ad_file_path, info)
async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[str]:
@@ -67,18 +67,18 @@ class AdExtractor(WebScrapingMixin):
img_paths = []
try:
# download all images from box
image_box = await self.web_find(By.CLASS_NAME, 'galleryimage-large')
image_box = await self.web_find(By.CLASS_NAME, "galleryimage-large")
n_images = len(await self.web_find_all(By.CSS_SELECTOR, '.galleryimage-element[data-ix]', parent = image_box))
LOG.info('Found %s.', i18n.pluralize("image", n_images))
n_images = len(await self.web_find_all(By.CSS_SELECTOR, ".galleryimage-element[data-ix]", parent = image_box))
LOG.info("Found %s.", i18n.pluralize("image", n_images))
img_element:Element = await self.web_find(By.CSS_SELECTOR, 'div:nth-child(1) > img', parent = image_box)
img_fn_prefix = 'ad_' + str(ad_id) + '__img'
img_element:Element = await self.web_find(By.CSS_SELECTOR, "div:nth-child(1) > img", parent = image_box)
img_fn_prefix = "ad_" + str(ad_id) + "__img"
img_nr = 1
dl_counter = 0
while img_nr <= n_images: # scrolling + downloading
current_img_url = img_element.attrs['src'] # URL of the image
current_img_url = img_element.attrs["src"] # URL of the image
if current_img_url is None:
continue
@@ -86,43 +86,43 @@ class AdExtractor(WebScrapingMixin):
content_type = response.info().get_content_type()
file_ending = mimetypes.guess_extension(content_type)
img_path = f"{directory}/{img_fn_prefix}{img_nr}{file_ending}"
with open(img_path, 'wb') as f:
with open(img_path, "wb") as f:
shutil.copyfileobj(response, f)
dl_counter += 1
img_paths.append(img_path.rsplit('/', maxsplit = 1)[-1])
img_paths.append(img_path.rsplit("/", maxsplit = 1)[-1])
# navigate to next image (if exists)
if img_nr < n_images:
try:
# click next button, wait, and re-establish reference
await (await self.web_find(By.CLASS_NAME, 'galleryimage--navigation--next')).click()
await (await self.web_find(By.CLASS_NAME, "galleryimage--navigation--next")).click()
new_div = await self.web_find(By.CSS_SELECTOR, f'div.galleryimage-element:nth-child({img_nr + 1})')
img_element = await self.web_find(By.TAG_NAME, 'img', parent = new_div)
img_element = await self.web_find(By.TAG_NAME, "img", parent = new_div)
except TimeoutError:
LOG.error('NEXT button in image gallery somehow missing, aborting image fetching.')
LOG.error("NEXT button in image gallery somehow missing, aborting image fetching.")
break
img_nr += 1
LOG.info('Downloaded %s.', i18n.pluralize("image", dl_counter))
LOG.info("Downloaded %s.", i18n.pluralize("image", dl_counter))
except TimeoutError: # some ads do not require images
LOG.warning('No image area found. Continuing without downloading images.')
LOG.warning("No image area found. Continuing without downloading images.")
return img_paths
def extract_ad_id_from_ad_url(self, url: str) -> int:
def extract_ad_id_from_ad_url(self, url:str) -> int:
"""
Extracts the ID of an ad, given by its reference link.
:param url: the URL to the ad page
:return: the ad ID, a (ten-digit) integer number
"""
num_part = url.split('/')[-1] # suffix
id_part = num_part.split('-')[0]
num_part = url.split("/")[-1] # suffix
id_part = num_part.split("-")[0]
try:
path = url.split('?', 1)[0] # Remove query string if present
last_segment = path.rstrip('/').split('/')[-1] # Get last path component
id_part = last_segment.split('-')[0] # Extract part before first hyphen
path = url.split("?", 1)[0] # Remove query string if present
last_segment = path.rstrip("/").split("/")[-1] # Get last path component
id_part = last_segment.split("-")[0] # Extract part before first hyphen
return int(id_part)
except (IndexError, ValueError) as ex:
LOG.warning("Failed to extract ad ID from URL '%s': %s", url, ex)
@@ -135,41 +135,41 @@ class AdExtractor(WebScrapingMixin):
:return: the links to your ad pages
"""
# navigate to "your ads" page
await self.web_open('https://www.kleinanzeigen.de/m-meine-anzeigen.html')
await self.web_open("https://www.kleinanzeigen.de/m-meine-anzeigen.html")
await self.web_sleep(2000, 3000) # Consider replacing with explicit waits later
# Try to find the main ad list container first
try:
ad_list_container = await self.web_find(By.ID, 'my-manageitems-adlist')
ad_list_container = await self.web_find(By.ID, "my-manageitems-adlist")
except TimeoutError:
LOG.warning('Ad list container #my-manageitems-adlist not found. Maybe no ads present?')
LOG.warning("Ad list container #my-manageitems-adlist not found. Maybe no ads present?")
return []
# --- Pagination handling ---
multi_page = False
try:
# Correct selector: Use uppercase '.Pagination'
pagination_section = await self.web_find(By.CSS_SELECTOR, '.Pagination', timeout=10) # Increased timeout slightly
pagination_section = await self.web_find(By.CSS_SELECTOR, ".Pagination", timeout = 10) # Increased timeout slightly
# Correct selector: Use 'aria-label'
# Also check if the button is actually present AND potentially enabled (though enabled check isn't strictly necessary here, only for clicking later)
next_buttons = await self.web_find_all(By.CSS_SELECTOR, 'button[aria-label="Nächste"]', parent=pagination_section)
next_buttons = await self.web_find_all(By.CSS_SELECTOR, 'button[aria-label="Nächste"]', parent = pagination_section)
if next_buttons:
# Check if at least one 'Nächste' button is not disabled (optional but good practice)
enabled_next_buttons = [btn for btn in next_buttons if not btn.attrs.get('disabled')]
enabled_next_buttons = [btn for btn in next_buttons if not btn.attrs.get("disabled")]
if enabled_next_buttons:
multi_page = True
LOG.info('Multiple ad pages detected.')
LOG.info("Multiple ad pages detected.")
else:
LOG.info('Next button found but is disabled. Assuming single effective page.')
LOG.info("Next button found but is disabled. Assuming single effective page.")
else:
LOG.info('No "Naechste" button found within pagination. Assuming single page.')
except TimeoutError:
# This will now correctly trigger only if the '.Pagination' div itself is not found
LOG.info('No pagination controls found. Assuming single page.')
LOG.info("No pagination controls found. Assuming single page.")
except Exception as e:
LOG.exception("Error during pagination detection: %s", e)
LOG.info('Assuming single page due to error during pagination check.')
LOG.info("Assuming single page due to error during pagination check.")
# --- End Pagination Handling ---
refs:list[str] = []
@@ -182,8 +182,8 @@ class AdExtractor(WebScrapingMixin):
# Re-find the ad list container on the current page/state
try:
ad_list_container = await self.web_find(By.ID, 'my-manageitems-adlist')
list_items = await self.web_find_all(By.CLASS_NAME, 'cardbox', parent=ad_list_container)
ad_list_container = await self.web_find(By.ID, "my-manageitems-adlist")
list_items = await self.web_find_all(By.CLASS_NAME, "cardbox", parent = ad_list_container)
LOG.info("Found %s ad items on page %s.", len(list_items), current_page)
except TimeoutError:
LOG.warning("Could not find ad list container or items on page %s.", current_page)
@@ -192,7 +192,7 @@ class AdExtractor(WebScrapingMixin):
# Extract references using the CORRECTED selector
try:
page_refs = [
(await self.web_find(By.CSS_SELECTOR, 'div.manageitems-item-ad h3 a.text-onSurface', parent=li)).attrs['href']
(await self.web_find(By.CSS_SELECTOR, "div.manageitems-item-ad h3 a.text-onSurface", parent = li)).attrs["href"]
for li in list_items
]
refs.extend(page_refs)
@@ -207,12 +207,12 @@ class AdExtractor(WebScrapingMixin):
# --- Navigate to next page ---
try:
# Find the pagination section again (scope might have changed after scroll/wait)
pagination_section = await self.web_find(By.CSS_SELECTOR, '.Pagination', timeout=5)
pagination_section = await self.web_find(By.CSS_SELECTOR, ".Pagination", timeout = 5)
# Find the "Next" button using the correct aria-label selector and ensure it's not disabled
next_button_element = None
possible_next_buttons = await self.web_find_all(By.CSS_SELECTOR, 'button[aria-label="Nächste"]', parent=pagination_section)
possible_next_buttons = await self.web_find_all(By.CSS_SELECTOR, 'button[aria-label="Nächste"]', parent = pagination_section)
for btn in possible_next_buttons:
if not btn.attrs.get('disabled'): # Check if the button is enabled
if not btn.attrs.get("disabled"): # Check if the button is enabled
next_button_element = btn
break # Found an enabled next button
@@ -235,7 +235,7 @@ class AdExtractor(WebScrapingMixin):
# --- End Navigation ---
if not refs:
LOG.warning('No ad URLs were extracted.')
LOG.warning("No ad URLs were extracted.")
return refs
@@ -246,27 +246,27 @@ class AdExtractor(WebScrapingMixin):
"""
if reflect.is_integer(id_or_url):
# navigate to start page, otherwise page can be None!
await self.web_open('https://www.kleinanzeigen.de/')
await self.web_open("https://www.kleinanzeigen.de/")
# enter the ad ID into the search bar
await self.web_input(By.ID, "site-search-query", id_or_url)
# navigate to ad page and wait
await self.web_check(By.ID, 'site-search-submit', Is.CLICKABLE)
submit_button = await self.web_find(By.ID, 'site-search-submit')
await self.web_check(By.ID, "site-search-submit", Is.CLICKABLE)
submit_button = await self.web_find(By.ID, "site-search-submit")
await submit_button.click()
else:
await self.web_open(str(id_or_url)) # navigate to URL directly given
await self.web_sleep()
# handle the case that invalid ad ID given
if self.page.url.endswith('k0'):
LOG.error('There is no ad under the given ID.')
if self.page.url.endswith("k0"):
LOG.error("There is no ad under the given ID.")
return False
# close (warning) popup, if given
try:
await self.web_find(By.ID, 'vap-ovrly-secure')
LOG.warning('A popup appeared!')
await self.web_click(By.CLASS_NAME, 'mfp-close')
await self.web_find(By.ID, "vap-ovrly-secure")
LOG.warning("A popup appeared!")
await self.web_click(By.CLASS_NAME, "mfp-close")
await self.web_sleep()
except TimeoutError:
pass
@@ -280,22 +280,22 @@ class AdExtractor(WebScrapingMixin):
:param ad_id: the ad ID, already extracted by a calling function
:return: a dictionary with the keys as given in an ad YAML, and their respective values
"""
info:dict[str, Any] = {'active': True}
info:dict[str, Any] = {"active": True}
# extract basic info
info['type'] = 'OFFER' if 's-anzeige' in self.page.url else 'WANTED'
title:str = await self.web_text(By.ID, 'viewad-title')
info["type"] = "OFFER" if "s-anzeige" in self.page.url else "WANTED"
title:str = await self.web_text(By.ID, "viewad-title")
LOG.info('Extracting information from ad with title "%s"', title)
info['category'] = await self._extract_category_from_ad_page()
info['title'] = title
info["category"] = await self._extract_category_from_ad_page()
info["title"] = title
# Get raw description text
raw_description = (await self.web_text(By.ID, 'viewad-description-text')).strip()
raw_description = (await self.web_text(By.ID, "viewad-description-text")).strip()
# Get prefix and suffix from config
prefix = get_description_affixes(self.config, prefix=True)
suffix = get_description_affixes(self.config, prefix=False)
prefix = get_description_affixes(self.config, prefix = True)
suffix = get_description_affixes(self.config, prefix = False)
# Remove prefix and suffix if present
description_text = raw_description
@@ -304,38 +304,38 @@ class AdExtractor(WebScrapingMixin):
if suffix and description_text.endswith(suffix.strip()):
description_text = description_text[:-len(suffix.strip())]
info['description'] = description_text.strip()
info["description"] = description_text.strip()
info['special_attributes'] = await self._extract_special_attributes_from_ad_page()
if "art_s" in info['special_attributes']:
info["special_attributes"] = await self._extract_special_attributes_from_ad_page()
if "art_s" in info["special_attributes"]:
# change e.g. category "161/172" to "161/172/lautsprecher_kopfhoerer"
info['category'] = f"{info['category']}/{info['special_attributes']['art_s']}"
del info['special_attributes']['art_s']
if "schaden_s" in info['special_attributes']:
info["category"] = f"{info['category']}/{info['special_attributes']['art_s']}"
del info["special_attributes"]["art_s"]
if "schaden_s" in info["special_attributes"]:
# change f to 'nein' and 't' to 'ja'
info['special_attributes']['schaden_s'] = info['special_attributes']['schaden_s'].translate(str.maketrans({'t': 'ja', 'f': 'nein'}))
info['price'], info['price_type'] = await self._extract_pricing_info_from_ad_page()
info['shipping_type'], info['shipping_costs'], info['shipping_options'] = await self._extract_shipping_info_from_ad_page()
info['sell_directly'] = await self._extract_sell_directly_from_ad_page()
info['images'] = await self._download_images_from_ad_page(directory, ad_id)
info['contact'] = await self._extract_contact_from_ad_page()
info['id'] = ad_id
info["special_attributes"]["schaden_s"] = info["special_attributes"]["schaden_s"].translate(str.maketrans({"t": "ja", "f": "nein"}))
info["price"], info["price_type"] = await self._extract_pricing_info_from_ad_page()
info["shipping_type"], info["shipping_costs"], info["shipping_options"] = await self._extract_shipping_info_from_ad_page()
info["sell_directly"] = await self._extract_sell_directly_from_ad_page()
info["images"] = await self._download_images_from_ad_page(directory, ad_id)
info["contact"] = await self._extract_contact_from_ad_page()
info["id"] = ad_id
try: # try different locations known for creation date element
creation_date = await self.web_text(By.XPATH,
'/html/body/div[1]/div[2]/div/section[2]/section/section/article/div[3]/div[2]/div[2]/div[1]/span')
"/html/body/div[1]/div[2]/div/section[2]/section/section/article/div[3]/div[2]/div[2]/div[1]/span")
except TimeoutError:
creation_date = await self.web_text(By.CSS_SELECTOR, '#viewad-extra-info > div:nth-child(1) > span:nth-child(2)')
creation_date = await self.web_text(By.CSS_SELECTOR, "#viewad-extra-info > div:nth-child(1) > span:nth-child(2)")
# convert creation date to ISO format
created_parts = creation_date.split('.')
creation_date = created_parts[2] + '-' + created_parts[1] + '-' + created_parts[0] + ' 00:00:00'
created_parts = creation_date.split(".")
creation_date = created_parts[2] + "-" + created_parts[1] + "-" + created_parts[0] + " 00:00:00"
creation_date = datetime.fromisoformat(creation_date).isoformat()
info['created_on'] = creation_date
info['updated_on'] = None # will be set later on
info["created_on"] = creation_date
info["updated_on"] = None # will be set later on
# Calculate the initial hash for the downloaded ad
info['content_hash'] = calculate_content_hash(info)
info["content_hash"] = calculate_content_hash(info)
return info
@@ -346,12 +346,12 @@ class AdExtractor(WebScrapingMixin):
:return: a category string of form abc/def, where a-f are digits
"""
category_line = await self.web_find(By.ID, 'vap-brdcrmb')
category_first_part = await self.web_find(By.CSS_SELECTOR, 'a:nth-of-type(2)', parent = category_line)
category_second_part = await self.web_find(By.CSS_SELECTOR, 'a:nth-of-type(3)', parent = category_line)
cat_num_first = category_first_part.attrs['href'].split('/')[-1][1:]
cat_num_second = category_second_part.attrs['href'].split('/')[-1][1:]
category:str = cat_num_first + '/' + cat_num_second
category_line = await self.web_find(By.ID, "vap-brdcrmb")
category_first_part = await self.web_find(By.CSS_SELECTOR, "a:nth-of-type(2)", parent = category_line)
category_second_part = await self.web_find(By.CSS_SELECTOR, "a:nth-of-type(3)", parent = category_line)
cat_num_first = category_first_part.attrs["href"].split("/")[-1][1:]
cat_num_second = category_second_part.attrs["href"].split("/")[-1][1:]
category:str = cat_num_first + "/" + cat_num_second
return category
@@ -368,7 +368,7 @@ class AdExtractor(WebScrapingMixin):
special_attributes_str = belen_conf["universalAnalyticsOpts"]["dimensions"]["dimension108"]
special_attributes = dict(item.split(":") for item in special_attributes_str.split("|") if ":" in item)
special_attributes = {k: v for k, v in special_attributes.items() if not k.endswith('.versand_s') and k != "versand_s"}
special_attributes = {k: v for k, v in special_attributes.items() if not k.endswith(".versand_s") and k != "versand_s"}
return special_attributes
async def _extract_pricing_info_from_ad_page(self) -> tuple[float | None, str]:
@@ -378,24 +378,24 @@ class AdExtractor(WebScrapingMixin):
:return: the price of the offer (optional); and the pricing type
"""
try:
price_str:str = await self.web_text(By.ID, 'viewad-price')
price_str:str = await self.web_text(By.ID, "viewad-price")
price:int | None = None
match price_str.split()[-1]:
case '€':
price_type = 'FIXED'
case "€":
price_type = "FIXED"
# replace('.', '') is to remove the thousands separator before parsing as int
price = int(price_str.replace('.', '').split()[0])
case 'VB':
price_type = 'NEGOTIABLE'
price = int(price_str.replace(".", "").split()[0])
case "VB":
price_type = "NEGOTIABLE"
if price_str != "VB": # can be either 'X € VB', or just 'VB'
price = int(price_str.replace('.', '').split()[0])
case 'verschenken':
price_type = 'GIVE_AWAY'
price = int(price_str.replace(".", "").split()[0])
case "verschenken":
price_type = "GIVE_AWAY"
case _:
price_type = 'NOT_APPLICABLE'
price_type = "NOT_APPLICABLE"
return price, price_type
except TimeoutError: # no 'commercial' ad, has no pricing box etc.
return None, 'NOT_APPLICABLE'
return None, "NOT_APPLICABLE"
async def _extract_shipping_info_from_ad_page(self) -> tuple[str, float | None, list[str] | None]:
"""
@@ -403,17 +403,17 @@ class AdExtractor(WebScrapingMixin):
:return: the shipping type, and the shipping price (optional)
"""
ship_type, ship_costs, shipping_options = 'NOT_APPLICABLE', None, None
ship_type, ship_costs, shipping_options = "NOT_APPLICABLE", None, None
try:
shipping_text = await self.web_text(By.CLASS_NAME, 'boxedarticle--details--shipping')
shipping_text = await self.web_text(By.CLASS_NAME, "boxedarticle--details--shipping")
# e.g. '+ Versand ab 5,49 €' OR 'Nur Abholung'
if shipping_text == 'Nur Abholung':
ship_type = 'PICKUP'
elif shipping_text == 'Versand möglich':
ship_type = 'SHIPPING'
elif '€' in shipping_text:
shipping_price_parts = shipping_text.split(' ')
ship_type = 'SHIPPING'
if shipping_text == "Nur Abholung":
ship_type = "PICKUP"
elif shipping_text == "Versand möglich":
ship_type = "SHIPPING"
elif "€" in shipping_text:
shipping_price_parts = shipping_text.split(" ")
ship_type = "SHIPPING"
ship_costs = float(misc.parse_decimal(shipping_price_parts[-2]))
# reading shipping option from kleinanzeigen
@@ -425,7 +425,7 @@ class AdExtractor(WebScrapingMixin):
internal_shipping_opt = [x for x in shipping_costs if x["priceInEuroCent"] == ship_costs * 100]
if not internal_shipping_opt:
return 'NOT_APPLICABLE', ship_costs, shipping_options
return "NOT_APPLICABLE", ship_costs, shipping_options
# map to internal shipping identifiers used by kleinanzeigen-bot
shipping_option_mapping = {
@@ -440,13 +440,13 @@ class AdExtractor(WebScrapingMixin):
"HERMES_004": "Hermes_L"
}
shipping_option = shipping_option_mapping.get(internal_shipping_opt[0]['id'])
shipping_option = shipping_option_mapping.get(internal_shipping_opt[0]["id"])
if not shipping_option:
return 'NOT_APPLICABLE', ship_costs, shipping_options
return "NOT_APPLICABLE", ship_costs, shipping_options
shipping_options = [shipping_option]
except TimeoutError: # no pricing box -> no shipping given
ship_type = 'NOT_APPLICABLE'
ship_type = "NOT_APPLICABLE"
return ship_type, ship_costs, shipping_options
@@ -457,7 +457,7 @@ class AdExtractor(WebScrapingMixin):
:return: a boolean indicating whether the sell directly option is active (optional)
"""
try:
buy_now_is_active:bool = 'Direkt kaufen' in (await self.web_text(By.ID, 'payment-buttons-sidebar'))
buy_now_is_active:bool = "Direkt kaufen" in (await self.web_text(By.ID, "payment-buttons-sidebar"))
return buy_now_is_active
except TimeoutError:
return None
@@ -469,34 +469,34 @@ class AdExtractor(WebScrapingMixin):
:return: a dictionary containing the address parts with their corresponding values
"""
contact:dict[str, (str | None)] = {}
address_text = await self.web_text(By.ID, 'viewad-locality')
address_text = await self.web_text(By.ID, "viewad-locality")
# format: e.g. (Beispiel Allee 42,) 12345 Bundesland - Stadt
try:
street = (await self.web_text(By.ID, 'street-address'))[:-1] # trailing comma
contact['street'] = street
street = (await self.web_text(By.ID, "street-address"))[:-1] # trailing comma
contact["street"] = street
except TimeoutError:
LOG.info('No street given in the contact.')
LOG.info("No street given in the contact.")
(zipcode, location) = address_text.split(" ", 1)
contact['zipcode'] = zipcode # e.g. 19372
contact['location'] = location # e.g. Mecklenburg-Vorpommern - Steinbeck
contact["zipcode"] = zipcode # e.g. 19372
contact["location"] = location # e.g. Mecklenburg-Vorpommern - Steinbeck
contact_person_element:Element = await self.web_find(By.ID, 'viewad-contact')
name_element = await self.web_find(By.CLASS_NAME, 'iconlist-text', parent = contact_person_element)
contact_person_element:Element = await self.web_find(By.ID, "viewad-contact")
name_element = await self.web_find(By.CLASS_NAME, "iconlist-text", parent = contact_person_element)
try:
name = await self.web_text(By.TAG_NAME, 'a', parent = name_element)
name = await self.web_text(By.TAG_NAME, "a", parent = name_element)
except TimeoutError: # edge case: name without link
name = await self.web_text(By.TAG_NAME, 'span', parent = name_element)
contact['name'] = name
name = await self.web_text(By.TAG_NAME, "span", parent = name_element)
contact["name"] = name
if 'street' not in contact:
contact['street'] = None
if "street" not in contact:
contact["street"] = None
try: # phone number is unusual for non-professional sellers today
phone_element = await self.web_find(By.ID, 'viewad-contact-phone')
phone_number = await self.web_text(By.TAG_NAME, 'a', parent = phone_element)
contact['phone'] = ''.join(phone_number.replace('-', ' ').split(' ')).replace('+49(0)', '0')
phone_element = await self.web_find(By.ID, "viewad-contact-phone")
phone_number = await self.web_text(By.TAG_NAME, "a", parent = phone_element)
contact["phone"] = "".join(phone_number.replace("-", " ").split(" ")).replace("+49(0)", "0")
except TimeoutError:
contact['phone'] = None # phone seems to be a deprecated feature (for non-professional users)
contact["phone"] = None # phone seems to be a deprecated feature (for non-professional users)
# also see 'https://themen.kleinanzeigen.de/hilfe/deine-anzeigen/Telefon/
return contact

View File

@@ -96,7 +96,7 @@ def save_dict(filepath:str, content:dict[str, Any]) -> None:
yaml.indent(mapping = 2, sequence = 4, offset = 2)
yaml.representer.add_representer(str, # use YAML | block style for multi-line strings
lambda dumper, data:
dumper.represent_scalar('tag:yaml.org,2002:str', data, style = '|' if '\n' in data else None)
dumper.represent_scalar("tag:yaml.org,2002:str", data, style = "|" if "\n" in data else None)
)
yaml.allow_duplicate_keys = False
yaml.explicit_start = False

View File

@@ -3,14 +3,14 @@
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import sys, traceback # isort: skip
from types import FrameType, TracebackType
from typing import Any, Final
from typing import Final
from . import loggers
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
def on_exception(ex_type: type[BaseException] | None, ex_value: BaseException | None, ex_traceback: TracebackType | None) -> None:
def on_exception(ex_type:type[BaseException] | None, ex_value:BaseException | None, ex_traceback:TracebackType | None) -> None:
if ex_type is None or ex_value is None:
LOG.error("Unknown exception occurred (missing exception info): ex_type=%s, ex_value=%s", ex_type, ex_value)
return

View File

@@ -11,6 +11,6 @@ class KleinanzeigenBotError(RuntimeError):
class CaptchaEncountered(KleinanzeigenBotError):
"""Raised when a Captcha was detected and auto-restart is enabled."""
def __init__(self, restart_delay: timedelta) -> None:
def __init__(self, restart_delay:timedelta) -> None:
super().__init__()
self.restart_delay = restart_delay

View File

@@ -42,7 +42,7 @@ class Locale(NamedTuple):
return f"{self.language}{region_part}{encoding_part}"
@staticmethod
def of(locale_string: str) -> 'Locale':
def of(locale_string:str) -> "Locale":
"""
>>> Locale.of("en_US.UTF-8")
Locale(language='en', region='US', encoding='UTF-8')
@@ -86,11 +86,11 @@ def _detect_locale() -> Locale:
return Locale.of(lang) if lang else Locale("en", "US", "UTF-8")
_CURRENT_LOCALE: Locale = _detect_locale()
_TRANSLATIONS: dict[str, Any] | None = None
_CURRENT_LOCALE:Locale = _detect_locale()
_TRANSLATIONS:dict[str, Any] | None = None
def translate(text:object, caller: inspect.FrameInfo | None) -> str:
def translate(text:object, caller:inspect.FrameInfo | None) -> str:
text = str(text)
if not caller:
return text
@@ -105,7 +105,7 @@ def translate(text:object, caller: inspect.FrameInfo | None) -> str:
if not _TRANSLATIONS:
return text
module_name = caller.frame.f_globals.get('__name__') # pylint: disable=redefined-outer-name
module_name = caller.frame.f_globals.get("__name__") # pylint: disable=redefined-outer-name
file_basename = os.path.splitext(os.path.basename(caller.filename))[0]
if module_name and module_name.endswith(f".{file_basename}"):
module_name = module_name[:-(len(file_basename) + 1)]
@@ -124,9 +124,9 @@ gettext.gettext = lambda message: translate(_original_gettext(message), reflect.
for module_name, module in sys.modules.items():
if module is None or module_name in sys.builtin_module_names:
continue
if hasattr(module, '_') and module._ is _original_gettext:
if hasattr(module, "_") and module._ is _original_gettext:
module._ = gettext.gettext # type: ignore[attr-defined]
if hasattr(module, 'gettext') and module.gettext is _original_gettext:
if hasattr(module, "gettext") and module.gettext is _original_gettext:
module.gettext = gettext.gettext # type: ignore[attr-defined]
@@ -190,8 +190,8 @@ def pluralize(noun:str, count:int | Sized, *, prefix_with_count:bool = True) ->
# English
if len(noun) < 2: # noqa: PLR2004 Magic value used in comparison
return f"{prefix}{noun}s"
if noun.endswith(('s', 'sh', 'ch', 'x', 'z')):
if noun.endswith(("s", "sh", "ch", "x", "z")):
return f"{prefix}{noun}es"
if noun.endswith('y') and noun[-2].lower() not in "aeiou":
if noun.endswith("y") and noun[-2].lower() not in "aeiou":
return f"{prefix}{noun[:-1]}ies"
return f"{prefix}{noun}s"

View File

@@ -28,11 +28,11 @@ LOG_ROOT:Final[logging.Logger] = logging.getLogger()
class _MaxLevelFilter(logging.Filter):
def __init__(self, level: int) -> None:
def __init__(self, level:int) -> None:
super().__init__()
self.level = level
def filter(self, record: logging.LogRecord) -> bool:
def filter(self, record:logging.LogRecord) -> bool:
return record.levelno <= self.level
@@ -104,7 +104,7 @@ def configure_console_logging() -> None:
class LogFileHandle:
"""Encapsulates a log file handler with close and status methods."""
def __init__(self, file_path: str, handler: RotatingFileHandler, logger: logging.Logger) -> None:
def __init__(self, file_path:str, handler:RotatingFileHandler, logger:logging.Logger) -> None:
self.file_path = file_path
self._handler:RotatingFileHandler | None = handler
self._logger = logger
@@ -146,14 +146,14 @@ def flush_all_handlers() -> None:
handler.flush()
def get_logger(name: str | None = None) -> logging.Logger:
def get_logger(name:str | None = None) -> logging.Logger:
"""
Returns a localized logger
"""
class TranslatingLogger(logging.Logger):
def _log(self, level: int, msg: object, *args: Any, **kwargs: Any) -> None:
def _log(self, level:int, msg:object, *args:Any, **kwargs:Any) -> None:
if level != DEBUG: # debug messages should not be translated
msg = i18n.translate(msg, reflect.get_caller(2))
super()._log(level, msg, *args, **kwargs)

View File

@@ -10,7 +10,7 @@ from typing import Any, TypeVar
from . import i18n
# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions
T = TypeVar('T')
T = TypeVar("T")
def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None:
@@ -49,7 +49,7 @@ def is_frozen() -> bool:
return getattr(sys, "frozen", False)
async def ainput(prompt: str) -> str:
async def ainput(prompt:str) -> str:
return await asyncio.to_thread(input, f'{prompt} ')
@@ -84,10 +84,10 @@ def parse_decimal(number:float | int | str) -> decimal.Decimal:
def parse_datetime(
date: datetime | str | None,
date:datetime | str | None,
*,
add_timezone_if_missing: bool = True,
use_local_timezone: bool = True
add_timezone_if_missing:bool = True,
use_local_timezone:bool = True
) -> datetime | None:
"""
Parses a datetime object or ISO-formatted string.
@@ -152,22 +152,22 @@ def parse_duration(text:str) -> timedelta:
>>> parse_duration("invalid input")
datetime.timedelta(0)
"""
pattern = re.compile(r'(\d+)\s*([dhms])')
pattern = re.compile(r"(\d+)\s*([dhms])")
parts = pattern.findall(text.lower())
kwargs: dict[str, int] = {}
kwargs:dict[str, int] = {}
for value, unit in parts:
if unit == 'd':
kwargs['days'] = kwargs.get('days', 0) + int(value)
elif unit == 'h':
kwargs['hours'] = kwargs.get('hours', 0) + int(value)
elif unit == 'm':
kwargs['minutes'] = kwargs.get('minutes', 0) + int(value)
elif unit == 's':
kwargs['seconds'] = kwargs.get('seconds', 0) + int(value)
if unit == "d":
kwargs["days"] = kwargs.get("days", 0) + int(value)
elif unit == "h":
kwargs["hours"] = kwargs.get("hours", 0) + int(value)
elif unit == "m":
kwargs["minutes"] = kwargs.get("minutes", 0) + int(value)
elif unit == "s":
kwargs["seconds"] = kwargs.get("seconds", 0) + int(value)
return timedelta(**kwargs)
def format_timedelta(td: timedelta) -> str:
def format_timedelta(td:timedelta) -> str:
"""
Formats a timedelta into a human-readable string using the pluralize utility.

View File

@@ -5,7 +5,7 @@ import inspect
from typing import Any
def get_caller(depth: int = 1) -> inspect.FrameInfo | None:
def get_caller(depth:int = 1) -> inspect.FrameInfo | None:
stack = inspect.stack()
try:
for frame in stack[depth + 1:]:

View File

@@ -165,7 +165,7 @@ class WebScrapingMixin:
prefs_file = os.path.join(profile_dir, "Preferences")
if not os.path.exists(prefs_file):
LOG.info(" -> Setting chrome prefs [%s]...", prefs_file)
with open(prefs_file, "w", encoding = 'UTF-8') as fd:
with open(prefs_file, "w", encoding = "UTF-8") as fd:
json.dump({
"credentials_enable_service": False,
"enable_do_not_track": True,
@@ -234,16 +234,16 @@ class WebScrapingMixin:
case "Windows":
browser_paths = [
os.environ.get("PROGRAMFILES", "C:\\Program Files") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ.get("PROGRAMFILES", "C:\\Program Files") + r"\Microsoft\Edge\Application\msedge.exe",
os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r"\Microsoft\Edge\Application\msedge.exe",
os.environ["PROGRAMFILES"] + r'\Chromium\Application\chrome.exe',
os.environ["PROGRAMFILES(X86)"] + r'\Chromium\Application\chrome.exe',
os.environ["LOCALAPPDATA"] + r'\Chromium\Application\chrome.exe',
os.environ["PROGRAMFILES"] + r"\Chromium\Application\chrome.exe",
os.environ["PROGRAMFILES(X86)"] + r"\Chromium\Application\chrome.exe",
os.environ["LOCALAPPDATA"] + r"\Chromium\Application\chrome.exe",
os.environ["PROGRAMFILES"] + r'\Chrome\Application\chrome.exe',
os.environ["PROGRAMFILES(X86)"] + r'\Chrome\Application\chrome.exe',
os.environ["LOCALAPPDATA"] + r'\Chrome\Application\chrome.exe',
os.environ["PROGRAMFILES"] + r"\Chrome\Application\chrome.exe",
os.environ["PROGRAMFILES(X86)"] + r"\Chrome\Application\chrome.exe",
os.environ["LOCALAPPDATA"] + r"\Chrome\Application\chrome.exe",
shutil.which("msedge.exe"),
shutil.which("chromium.exe"),
@@ -259,8 +259,8 @@ class WebScrapingMixin:
raise AssertionError(_("Installed browser could not be detected"))
async def web_await(self, condition: Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *,
timeout:int | float = 5, timeout_error_message: str = "") -> T:
async def web_await(self, condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *,
timeout:int | float = 5, timeout_error_message:str = "") -> T:
"""
Blocks/waits until the given condition is met.
@@ -523,7 +523,7 @@ class WebScrapingMixin:
return response
# pylint: enable=dangerous-default-value
async def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10_000, *, scroll_back_top: bool = False) -> None:
async def web_scroll_page_down(self, scroll_length:int = 10, scroll_speed:int = 10_000, *, scroll_back_top:bool = False) -> None:
"""
Smoothly scrolls the current web page down.
@@ -532,7 +532,7 @@ class WebScrapingMixin:
:param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
"""
current_y_pos = 0
bottom_y_pos: int = await self.web_execute('document.body.scrollHeight') # get bottom position
bottom_y_pos:int = await self.web_execute("document.body.scrollHeight") # get bottom position
while current_y_pos < bottom_y_pos: # scroll in steps until bottom reached
current_y_pos += scroll_length
await self.web_execute(f'window.scrollTo(0, {current_y_pos})') # scroll one step