feat: add type safe Ad model

This commit is contained in:
sebthom
2025-05-15 00:10:45 +02:00
committed by Sebastian Thomschke
parent 1369da1c34
commit 6ede14596d
15 changed files with 817 additions and 459 deletions

View File

@@ -4,7 +4,6 @@
import atexit, copy, json, os, re, signal, sys, textwrap # isort: skip
import getopt # pylint: disable=deprecated-module
import urllib.parse as urllib_parse
from collections.abc import Iterable
from gettext import gettext as _
from typing import Any, Final
@@ -14,13 +13,14 @@ from wcmatch import glob
from . import extract, resources
from ._version import __version__
from .ads import MAX_DESCRIPTION_LENGTH, calculate_content_hash, get_description_affixes
from .ads import calculate_content_hash, get_description_affixes
from .model.ad_model import MAX_DESCRIPTION_LENGTH, Ad
from .model.config_model import Config
from .utils import dicts, error_handlers, loggers, misc
from .utils.exceptions import CaptchaEncountered
from .utils.files import abspath
from .utils.i18n import Locale, get_current_locale, pluralize, set_current_locale
from .utils.misc import ainput, ensure, is_frozen, parse_datetime, parse_decimal
from .utils.misc import ainput, ensure, is_frozen
from .utils.web_scraping_mixin import By, Element, Is, WebScrapingMixin
# W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933
@@ -266,17 +266,17 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("App version: %s", self.get_version())
LOG.info("Python version: %s", sys.version)
def __check_ad_republication(self, ad_cfg:dict[str, Any], ad_file_relative:str) -> bool:
def __check_ad_republication(self, ad_cfg:Ad, ad_file_relative:str) -> bool:
"""
Check if an ad needs to be republished based on republication interval.
Returns True if the ad should be republished based on the interval.
Note: This method no longer checks for content changes. Use __check_ad_changed for that.
"""
if ad_cfg["updated_on"]:
last_updated_on = parse_datetime(ad_cfg["updated_on"])
elif ad_cfg["created_on"]:
last_updated_on = parse_datetime(ad_cfg["created_on"])
if ad_cfg.updated_on:
last_updated_on = ad_cfg.updated_on
elif ad_cfg.created_on:
last_updated_on = ad_cfg.created_on
else:
return True
@@ -285,23 +285,23 @@ class KleinanzeigenBot(WebScrapingMixin):
# Check republication interval
ad_age = misc.now() - last_updated_on
if ad_age.days <= ad_cfg["republication_interval"]:
if ad_age.days <= ad_cfg.republication_interval:
LOG.info(
" -> SKIPPED: ad [%s] was last published %d days ago. republication is only required every %s days",
ad_file_relative,
ad_age.days,
ad_cfg["republication_interval"]
ad_cfg.republication_interval
)
return False
return True
def __check_ad_changed(self, ad_cfg:dict[str, Any], ad_cfg_orig:dict[str, Any], ad_file_relative:str) -> bool:
def __check_ad_changed(self, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], ad_file_relative:str) -> bool:
"""
Check if an ad has been changed since last publication.
Returns True if the ad has been changed.
"""
if not ad_cfg["id"]:
if not ad_cfg.id:
# New ads are not considered "changed"
return False
@@ -321,7 +321,7 @@ class KleinanzeigenBot(WebScrapingMixin):
return False
def load_ads(self, *, ignore_inactive:bool = True, check_id:bool = True) -> list[tuple[str, dict[str, Any], dict[str, Any]]]:
def load_ads(self, *, ignore_inactive:bool = True, check_id:bool = True) -> list[tuple[str, Ad, dict[str, Any]]]:
LOG.info("Searching for ad config files...")
ad_files:dict[str, str] = {}
@@ -344,24 +344,17 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("Start fetch task for the ad(s) with id(s):")
LOG.info(" | ".join([str(id_) for id_ in ids]))
ad_fields = dicts.load_dict_from_module(resources, "ad_fields.yaml")
ads = []
for ad_file, ad_file_relative in sorted(ad_files.items()):
ad_cfg_orig = dicts.load_dict(ad_file, "ad")
ad_cfg = copy.deepcopy(ad_cfg_orig)
dicts.apply_defaults(ad_cfg,
self.config.ad_defaults.model_dump(),
ignore = lambda k, _: k == "description",
override = lambda _, v: v == "" # noqa: PLC1901 can be simplified to `not v` as an empty string is falsey
)
dicts.apply_defaults(ad_cfg, ad_fields)
ad_cfg_orig:dict[str, Any] = dicts.load_dict(ad_file, "ad")
ad_cfg:Ad = self.load_ad(ad_cfg_orig)
if ignore_inactive and not ad_cfg["active"]:
if ignore_inactive and not ad_cfg.active:
LOG.info(" -> SKIPPED: inactive ad [%s]", ad_file_relative)
continue
if use_specific_ads:
if ad_cfg["id"] not in ids:
if ad_cfg.id not in ids:
LOG.info(" -> SKIPPED: ad [%s] is not in list of given ids.", ad_file_relative)
continue
else:
@@ -373,9 +366,9 @@ class KleinanzeigenBot(WebScrapingMixin):
should_include = True
# Check for 'new' selector
if "new" in selectors and (not ad_cfg["id"] or not check_id):
if "new" in selectors and (not ad_cfg.id or not check_id):
should_include = True
elif "new" in selectors and ad_cfg["id"] and check_id:
elif "new" in selectors and ad_cfg.id and check_id:
LOG.info(" -> SKIPPED: ad [%s] is not new. already has an id assigned.", ad_file_relative)
# Check for 'due' selector
@@ -391,56 +384,27 @@ class KleinanzeigenBot(WebScrapingMixin):
if not should_include:
continue
def assert_one_of(path:str, allowed:Iterable[str]) -> None:
# ruff: noqa: B023 function-uses-loop-variable
ensure(dicts.safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]")
def assert_min_len(path:str, minlen:int) -> None:
ensure(len(dicts.safe_get(ad_cfg, *path.split("."))) >= minlen,
f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]")
def assert_has_value(path:str) -> None:
ensure(dicts.safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]")
# pylint: enable=cell-var-from-loop
assert_one_of("type", {"OFFER", "WANTED"})
assert_min_len("title", 10)
ensure(self.__get_description(ad_cfg, with_affixes = False), f"-> property [description] not specified @ [{ad_file}]")
self.__get_description(ad_cfg, with_affixes = True) # validates complete description
assert_one_of("price_type", {"FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"})
if ad_cfg["price_type"] == "GIVE_AWAY":
ensure(not dicts.safe_get(ad_cfg, "price"), f"-> [price] must not be specified for GIVE_AWAY ad @ [{ad_file}]")
elif ad_cfg["price_type"] == "FIXED":
assert_has_value("price")
assert_one_of("shipping_type", {"PICKUP", "SHIPPING", "NOT_APPLICABLE"})
assert_has_value("contact.name")
assert_has_value("republication_interval")
if ad_cfg["id"]:
ad_cfg["id"] = int(ad_cfg["id"])
if ad_cfg["category"]:
resolved_category_id = self.categories.get(ad_cfg["category"])
if not resolved_category_id and ">" in ad_cfg["category"]:
if ad_cfg.category:
resolved_category_id = self.categories.get(ad_cfg.category)
if not resolved_category_id and ">" in ad_cfg.category:
# this maps actually to the sonstiges/weiteres sub-category
parent_category = ad_cfg["category"].rpartition(">")[0].strip()
parent_category = ad_cfg.category.rpartition(">")[0].strip()
resolved_category_id = self.categories.get(parent_category)
if resolved_category_id:
LOG.warning(
"Category [%s] unknown. Using category [%s] with ID [%s] instead.",
ad_cfg["category"], parent_category, resolved_category_id)
ad_cfg.category, parent_category, resolved_category_id)
if resolved_category_id:
ad_cfg["category"] = resolved_category_id
ad_cfg.category = resolved_category_id
if ad_cfg["shipping_costs"]:
ad_cfg["shipping_costs"] = str(round(parse_decimal(ad_cfg["shipping_costs"]), 2))
if ad_cfg["images"]:
if ad_cfg.images:
images = []
ad_dir = os.path.dirname(ad_file)
for image_pattern in ad_cfg["images"]:
for image_pattern in ad_cfg.images:
pattern_images = set()
for image_file in glob.glob(image_pattern, root_dir = ad_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB):
_, image_file_ext = os.path.splitext(image_file)
@@ -450,8 +414,8 @@ class KleinanzeigenBot(WebScrapingMixin):
else:
pattern_images.add(abspath(image_file, relative_to = ad_file))
images.extend(sorted(pattern_images))
ensure(images or not ad_cfg["images"], f"No images found for given file patterns {ad_cfg['images']} at {ad_dir}")
ad_cfg["images"] = list(dict.fromkeys(images))
ensure(images or not ad_cfg.images, f"No images found for given file patterns {ad_cfg.images} at {ad_dir}")
ad_cfg.images = list(dict.fromkeys(images))
ads.append((
ad_file,
@@ -462,6 +426,15 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("Loaded %s", pluralize("ad", ads))
return ads
def load_ad(self, ad_cfg_orig:dict[str, Any]) -> Ad:
ad_cfg_merged = dicts.apply_defaults(
target = copy.deepcopy(ad_cfg_orig),
defaults = self.config.ad_defaults.model_dump(),
ignore = lambda k, _: k == "description",
override = lambda _, v: v == "" # noqa: PLC1901 can be simplified to `not v` as an empty string is falsey
)
return Ad.model_validate(ad_cfg_merged)
def load_config(self) -> None:
# write default config.yaml if config file does not exist
if not os.path.exists(self.config_file_path):
@@ -563,7 +536,7 @@ class KleinanzeigenBot(WebScrapingMixin):
return False
return False
async def delete_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None:
async def delete_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
count = 0
published_ads = json.loads(
@@ -571,7 +544,7 @@ class KleinanzeigenBot(WebScrapingMixin):
for (ad_file, ad_cfg, _ad_cfg_orig) in ad_cfgs:
count += 1
LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg["title"], ad_file)
LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg.title, ad_file)
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title)
await self.web_sleep()
@@ -579,8 +552,8 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("DONE: Deleted %s", pluralize("ad", count))
LOG.info("############################################")
async def delete_ad(self, ad_cfg:dict[str, Any], published_ads:list[dict[str, Any]], *, delete_old_ads_by_title:bool) -> bool:
LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"])
async def delete_ad(self, ad_cfg:Ad, published_ads:list[dict[str, Any]], *, delete_old_ads_by_title:bool) -> bool:
LOG.info("Deleting ad '%s' if already present...", ad_cfg.title)
await self.web_open(f"{self.root_url}/m-meine-anzeigen.html")
csrf_token_elem = await self.web_find(By.CSS_SELECTOR, "meta[name=_csrf]")
@@ -592,35 +565,35 @@ class KleinanzeigenBot(WebScrapingMixin):
for published_ad in published_ads:
published_ad_id = int(published_ad.get("id", -1))
published_ad_title = published_ad.get("title", "")
if ad_cfg["id"] == published_ad_id or ad_cfg["title"] == published_ad_title:
if ad_cfg.id == published_ad_id or ad_cfg.title == published_ad_title:
LOG.info(" -> deleting %s '%s'...", published_ad_id, published_ad_title)
await self.web_request(
url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}",
method = "POST",
headers = {"x-csrf-token": csrf_token}
)
elif ad_cfg["id"]:
elif ad_cfg.id:
await self.web_request(
url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg['id']}",
url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg.id}",
method = "POST",
headers = {"x-csrf-token": csrf_token},
valid_response_codes = [200, 404]
)
await self.web_sleep()
ad_cfg["id"] = None
ad_cfg.id = None
return True
async def publish_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None:
async def publish_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
count = 0
published_ads = json.loads(
(await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"]
for (ad_file, ad_cfg, ad_cfg_orig) in ad_cfgs:
LOG.info("Processing %s/%s: '%s' from [%s]...", count + 1, len(ad_cfgs), ad_cfg["title"], ad_file)
LOG.info("Processing %s/%s: '%s' from [%s]...", count + 1, len(ad_cfgs), ad_cfg.title, ad_file)
if [x for x in published_ads if x["id"] == ad_cfg["id"] and x["state"] == "paused"]:
if [x for x in published_ads if x["id"] == ad_cfg.id and x["state"] == "paused"]:
LOG.info("Skipping because ad is reserved")
continue
@@ -636,7 +609,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("DONE: (Re-)published %s", pluralize("ad", count))
LOG.info("############################################")
async def publish_ad(self, ad_file:str, ad_cfg:dict[str, Any], ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]]) -> None:
async def publish_ad(self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]]) -> None:
"""
@param ad_cfg: the effective ad config (i.e. with default values applied etc.)
@param ad_cfg_orig: the ad config as present in the YAML file
@@ -647,26 +620,26 @@ class KleinanzeigenBot(WebScrapingMixin):
if self.config.publishing.delete_old_ads == "BEFORE_PUBLISH" and not self.keep_old_ads:
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title)
LOG.info("Publishing ad '%s'...", ad_cfg["title"])
LOG.info("Publishing ad '%s'...", ad_cfg.title)
if loggers.is_debug(LOG):
LOG.debug(" -> effective ad meta:")
YAML().dump(ad_cfg, sys.stdout)
YAML().dump(ad_cfg.model_dump(), sys.stdout)
await self.web_open(f"{self.root_url}/p-anzeige-aufgeben-schritt2.html")
if ad_cfg["type"] == "WANTED":
if ad_cfg.type == "WANTED":
await self.web_click(By.ID, "adType2")
#############################
# set title
#############################
await self.web_input(By.ID, "postad-title", ad_cfg["title"])
await self.web_input(By.ID, "postad-title", ad_cfg.title)
#############################
# set category
#############################
await self.__set_category(ad_cfg["category"], ad_file)
await self.__set_category(ad_cfg.category, ad_file)
#############################
# set special attributes
@@ -676,36 +649,36 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
# set shipping type/options/costs
#############################
if ad_cfg["type"] == "WANTED":
if ad_cfg.type == "WANTED":
# special handling for ads of type WANTED since shipping is a special attribute for these
if ad_cfg["shipping_type"] in {"PICKUP", "SHIPPING"}:
shipping_value = "ja" if ad_cfg["shipping_type"] == "SHIPPING" else "nein"
if ad_cfg.shipping_type in {"PICKUP", "SHIPPING"}:
shipping_value = "ja" if ad_cfg.shipping_type == "SHIPPING" else "nein"
try:
await self.web_select(By.XPATH, "//select[contains(@id, '.versand_s')]", shipping_value)
except TimeoutError:
LOG.warning("Failed to set shipping attribute for type '%s'!", ad_cfg["shipping_type"])
LOG.warning("Failed to set shipping attribute for type '%s'!", ad_cfg.shipping_type)
else:
await self.__set_shipping(ad_cfg)
#############################
# set price
#############################
price_type = ad_cfg["price_type"]
price_type = ad_cfg.price_type
if price_type != "NOT_APPLICABLE":
try:
await self.web_select(By.CSS_SELECTOR, "select#price-type-react, select#micro-frontend-price-type, select#priceType", price_type)
except TimeoutError:
pass
if dicts.safe_get(ad_cfg, "price"):
await self.web_input(By.CSS_SELECTOR, "input#post-ad-frontend-price, input#micro-frontend-price, input#pstad-price", ad_cfg["price"])
if ad_cfg.price:
await self.web_input(By.CSS_SELECTOR, "input#post-ad-frontend-price, input#micro-frontend-price, input#pstad-price", str(ad_cfg.price))
#############################
# set sell_directly
#############################
sell_directly = ad_cfg["sell_directly"]
sell_directly = ad_cfg.sell_directly
try:
if ad_cfg["shipping_type"] == "SHIPPING":
if sell_directly and ad_cfg["shipping_options"] and price_type in {"FIXED", "NEGOTIABLE"}:
if ad_cfg.shipping_type == "SHIPPING":
if sell_directly and ad_cfg.shipping_options and price_type in {"FIXED", "NEGOTIABLE"}:
if not await self.web_check(By.ID, "radio-buy-now-yes", Is.SELECTED):
await self.web_click(By.ID, "radio-buy-now-yes")
elif not await self.web_check(By.ID, "radio-buy-now-no", Is.SELECTED):
@@ -722,16 +695,16 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
# set contact zipcode
#############################
if ad_cfg["contact"]["zipcode"]:
await self.web_input(By.ID, "pstad-zip", ad_cfg["contact"]["zipcode"])
if ad_cfg.contact.zipcode:
await self.web_input(By.ID, "pstad-zip", ad_cfg.contact.zipcode)
# Set city if location is specified
if ad_cfg["contact"].get("location"):
if ad_cfg.contact.location:
try:
await self.web_sleep(1) # Wait for city dropdown to populate
options = await self.web_find_all(By.CSS_SELECTOR, "#pstad-citychsr option")
for option in options:
option_text = await self.web_text(By.CSS_SELECTOR, "option", parent = option)
if option_text == ad_cfg["contact"]["location"]:
if option_text == ad_cfg.contact.location:
await self.web_select(By.ID, "pstad-citychsr", option_text)
break
except TimeoutError:
@@ -740,7 +713,7 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
# set contact street
#############################
if ad_cfg["contact"]["street"]:
if ad_cfg.contact.street:
try:
if await self.web_check(By.ID, "pstad-street", Is.DISABLED):
await self.web_click(By.ID, "addressVisibility")
@@ -748,18 +721,18 @@ class KleinanzeigenBot(WebScrapingMixin):
except TimeoutError:
# ignore
pass
await self.web_input(By.ID, "pstad-street", ad_cfg["contact"]["street"])
await self.web_input(By.ID, "pstad-street", ad_cfg.contact.street)
#############################
# set contact name
#############################
if ad_cfg["contact"]["name"] and not await self.web_check(By.ID, "postad-contactname", Is.READONLY):
await self.web_input(By.ID, "postad-contactname", ad_cfg["contact"]["name"])
if ad_cfg.contact.name and not await self.web_check(By.ID, "postad-contactname", Is.READONLY):
await self.web_input(By.ID, "postad-contactname", ad_cfg.contact.name)
#############################
# set contact phone
#############################
if ad_cfg["contact"]["phone"]:
if ad_cfg.contact.phone:
if await self.web_check(By.ID, "postad-phonenumber", Is.DISPLAYED):
try:
if await self.web_check(By.ID, "postad-phonenumber", Is.DISABLED):
@@ -768,7 +741,7 @@ class KleinanzeigenBot(WebScrapingMixin):
except TimeoutError:
# ignore
pass
await self.web_input(By.ID, "postad-phonenumber", ad_cfg["contact"]["phone"])
await self.web_input(By.ID, "postad-phonenumber", ad_cfg.contact.phone)
#############################
# upload images
@@ -810,7 +783,7 @@ class KleinanzeigenBot(WebScrapingMixin):
# check for no image question
try:
image_hint_xpath = '//*[contains(@class, "ModalDialog--Actions")]//button[contains(., "Ohne Bild veröffentlichen")]'
if not ad_cfg["images"] and await self.web_check(By.XPATH, image_hint_xpath, Is.DISPLAYED):
if not ad_cfg.images and await self.web_check(By.XPATH, image_hint_xpath, Is.DISPLAYED):
await self.web_click(By.XPATH, image_hint_xpath)
except TimeoutError:
pass # nosec
@@ -833,8 +806,8 @@ class KleinanzeigenBot(WebScrapingMixin):
# Update content hash after successful publication
# Calculate hash on original config to ensure consistent comparison on restart
ad_cfg_orig["content_hash"] = calculate_content_hash(ad_cfg_orig)
ad_cfg_orig["updated_on"] = misc.now().isoformat()
if not ad_cfg["created_on"] and not ad_cfg["id"]:
ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec = "seconds")
if not ad_cfg.created_on and not ad_cfg.id:
ad_cfg_orig["created_on"] = ad_cfg_orig["updated_on"]
LOG.info(" -> SUCCESS: ad published with ID %s", ad_id)
@@ -893,55 +866,57 @@ class KleinanzeigenBot(WebScrapingMixin):
else:
ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed")
async def __set_special_attributes(self, ad_cfg:dict[str, Any]) -> None:
if ad_cfg["special_attributes"]:
LOG.debug("Found %i special attributes", len(ad_cfg["special_attributes"]))
for special_attribute_key, special_attribute_value in ad_cfg["special_attributes"].items():
async def __set_special_attributes(self, ad_cfg:Ad) -> None:
if not ad_cfg.special_attributes:
return
if special_attribute_key == "condition_s":
await self.__set_condition(special_attribute_value)
continue
LOG.debug("Found %i special attributes", len(ad_cfg.special_attributes))
for special_attribute_key, special_attribute_value in ad_cfg.special_attributes.items():
LOG.debug("Setting special attribute [%s] to [%s]...", special_attribute_key, special_attribute_value)
try:
# if the <select> element exists but is inside an invisible container, make the container visible
select_container_xpath = f"//div[@class='l-row' and descendant::select[@id='{special_attribute_key}']]"
if not await self.web_check(By.XPATH, select_container_xpath, Is.DISPLAYED):
await (await self.web_find(By.XPATH, select_container_xpath)).apply("elem => elem.singleNodeValue.style.display = 'block'")
except TimeoutError:
pass # nosec
if special_attribute_key == "condition_s":
await self.__set_condition(special_attribute_value)
continue
try:
# finding element by name cause id are composed sometimes eg. autos.marke_s+autos.model_s for Modell by cars
special_attr_elem = await self.web_find(By.XPATH, f"//*[contains(@name, '{special_attribute_key}')]")
except TimeoutError as ex:
LOG.debug("Attribute field '%s' could not be found.", special_attribute_key)
raise TimeoutError(f"Failed to set special attribute [{special_attribute_key}] (not found)") from ex
LOG.debug("Setting special attribute [%s] to [%s]...", special_attribute_key, special_attribute_value)
try:
# if the <select> element exists but is inside an invisible container, make the container visible
select_container_xpath = f"//div[@class='l-row' and descendant::select[@id='{special_attribute_key}']]"
if not await self.web_check(By.XPATH, select_container_xpath, Is.DISPLAYED):
await (await self.web_find(By.XPATH, select_container_xpath)).apply("elem => elem.singleNodeValue.style.display = 'block'")
except TimeoutError:
pass # nosec
try:
elem_id = special_attr_elem.attrs.id
if special_attr_elem.local_name == "select":
LOG.debug("Attribute field '%s' seems to be a select...", special_attribute_key)
await self.web_select(By.ID, elem_id, special_attribute_value)
elif special_attr_elem.attrs.type == "checkbox":
LOG.debug("Attribute field '%s' seems to be a checkbox...", special_attribute_key)
await self.web_click(By.ID, elem_id)
else:
LOG.debug("Attribute field '%s' seems to be a text input...", special_attribute_key)
await self.web_input(By.ID, elem_id, special_attribute_value)
except TimeoutError as ex:
LOG.debug("Attribute field '%s' is not of kind radio button.", special_attribute_key)
raise TimeoutError(f"Failed to set special attribute [{special_attribute_key}]") from ex
LOG.debug("Successfully set attribute field [%s] to [%s]...", special_attribute_key, special_attribute_value)
try:
# finding element by name cause id are composed sometimes eg. autos.marke_s+autos.model_s for Modell by cars
special_attr_elem = await self.web_find(By.XPATH, f"//*[contains(@name, '{special_attribute_key}')]")
except TimeoutError as ex:
LOG.debug("Attribute field '%s' could not be found.", special_attribute_key)
raise TimeoutError(f"Failed to set special attribute [{special_attribute_key}] (not found)") from ex
async def __set_shipping(self, ad_cfg:dict[str, Any]) -> None:
if ad_cfg["shipping_type"] == "PICKUP":
try:
elem_id = special_attr_elem.attrs.id
if special_attr_elem.local_name == "select":
LOG.debug("Attribute field '%s' seems to be a select...", special_attribute_key)
await self.web_select(By.ID, elem_id, special_attribute_value)
elif special_attr_elem.attrs.type == "checkbox":
LOG.debug("Attribute field '%s' seems to be a checkbox...", special_attribute_key)
await self.web_click(By.ID, elem_id)
else:
LOG.debug("Attribute field '%s' seems to be a text input...", special_attribute_key)
await self.web_input(By.ID, elem_id, special_attribute_value)
except TimeoutError as ex:
LOG.debug("Attribute field '%s' is not of kind radio button.", special_attribute_key)
raise TimeoutError(f"Failed to set special attribute [{special_attribute_key}]") from ex
LOG.debug("Successfully set attribute field [%s] to [%s]...", special_attribute_key, special_attribute_value)
async def __set_shipping(self, ad_cfg:Ad) -> None:
if ad_cfg.shipping_type == "PICKUP":
try:
await self.web_click(By.XPATH,
'//*[contains(@class, "ShippingPickupSelector")]//label[contains(., "Nur Abholung")]/../input[@type="radio"]')
except TimeoutError as ex:
LOG.debug(ex, exc_info = True)
elif ad_cfg["shipping_options"]:
elif ad_cfg.shipping_options:
await self.web_click(By.XPATH, '//*[contains(@class, "SubSection")]//button[contains(@class, "SelectionButton")]')
await self.web_click(By.XPATH, '//*[contains(@class, "CarrierSelectionModal")]//button[contains(., "Andere Versandmethoden")]')
await self.__set_shipping_options(ad_cfg)
@@ -949,25 +924,28 @@ class KleinanzeigenBot(WebScrapingMixin):
special_shipping_selector = '//select[contains(@id, ".versand_s")]'
if await self.web_check(By.XPATH, special_shipping_selector, Is.DISPLAYED):
# try to set special attribute selector (then we have a commercial account)
shipping_value = "ja" if ad_cfg["shipping_type"] == "SHIPPING" else "nein"
shipping_value = "ja" if ad_cfg.shipping_type == "SHIPPING" else "nein"
await self.web_select(By.XPATH, special_shipping_selector, shipping_value)
else:
try:
# no options. only costs. Set custom shipping cost
if ad_cfg["shipping_costs"] is not None:
if ad_cfg.shipping_costs is not None:
await self.web_click(By.XPATH, '//*[contains(@class, "SubSection")]//button[contains(@class, "SelectionButton")]')
await self.web_click(By.XPATH, '//*[contains(@class, "CarrierSelectionModal")]//button[contains(., "Andere Versandmethoden")]')
await self.web_click(By.XPATH, '//*[contains(@id, "INDIVIDUAL") and contains(@data-testid, "Individueller Versand")]')
if ad_cfg["shipping_costs"]:
if ad_cfg.shipping_costs:
await self.web_input(By.CSS_SELECTOR, '.IndividualShippingInput input[type="text"]',
str.replace(ad_cfg["shipping_costs"], ".", ","))
str.replace(str(ad_cfg.shipping_costs), ".", ","))
await self.web_click(By.XPATH, '//dialog//button[contains(., "Fertig")]')
except TimeoutError as ex:
LOG.debug(ex, exc_info = True)
raise TimeoutError(_("Unable to close shipping dialog!")) from ex
async def __set_shipping_options(self, ad_cfg:dict[str, Any]) -> None:
async def __set_shipping_options(self, ad_cfg:Ad) -> None:
if not ad_cfg.shipping_options:
return
shipping_options_mapping = {
"DHL_2": ("Klein", "Paket 2 kg"),
"Hermes_Päckchen": ("Klein", "Päckchen"),
@@ -980,12 +958,9 @@ class KleinanzeigenBot(WebScrapingMixin):
"Hermes_L": ("Groß", "L-Paket"),
}
try:
mapped_shipping_options = [
shipping_options_mapping[option]
for option in set(ad_cfg["shipping_options"])
]
mapped_shipping_options = [shipping_options_mapping[option] for option in set(ad_cfg.shipping_options)]
except KeyError as ex:
raise KeyError(f"Unknown shipping option(s), please refer to the documentation/README: {ad_cfg['shipping_options']}") from ex
raise KeyError(f"Unknown shipping option(s), please refer to the documentation/README: {ad_cfg.shipping_options}") from ex
shipping_sizes, shipping_packages = zip(*mapped_shipping_options, strict = False)
@@ -1029,11 +1004,14 @@ class KleinanzeigenBot(WebScrapingMixin):
except TimeoutError as ex:
raise TimeoutError(_("Unable to close shipping dialog!")) from ex
async def __upload_images(self, ad_cfg:dict[str, Any]) -> None:
LOG.info(" -> found %s", pluralize("image", ad_cfg["images"]))
async def __upload_images(self, ad_cfg:Ad) -> None:
if not ad_cfg.images:
return
LOG.info(" -> found %s", pluralize("image", ad_cfg.images))
image_upload:Element = await self.web_find(By.CSS_SELECTOR, "input[type=file]")
for image in ad_cfg["images"]:
for image in ad_cfg.images:
LOG.info(" -> uploading image [%s]", image)
await image_upload.send_file(image)
await self.web_sleep()
@@ -1108,14 +1086,13 @@ class KleinanzeigenBot(WebScrapingMixin):
else:
LOG.error("The page with the id %d does not exist!", ad_id)
def __get_description(self, ad_cfg:dict[str, Any], *, with_affixes:bool) -> str:
def __get_description(self, ad_cfg:Ad, *, with_affixes:bool) -> str:
"""Get the ad description optionally with prefix and suffix applied.
Precedence (highest to lowest):
1. Direct ad-level affixes (description_prefix/suffix)
2. Legacy nested ad-level affixes (description.prefix/suffix)
3. Global flattened affixes (ad_defaults.description_prefix/suffix)
4. Legacy global nested affixes (ad_defaults.description.prefix/suffix)
2. Global flattened affixes (ad_defaults.description_prefix/suffix)
3. Legacy global nested affixes (ad_defaults.description.prefix/suffix)
Args:
ad_cfg: The ad configuration dictionary
@@ -1125,20 +1102,15 @@ class KleinanzeigenBot(WebScrapingMixin):
"""
# Get the main description text
description_text = ""
if isinstance(ad_cfg.get("description"), dict):
description_text = ad_cfg["description"].get("text", "")
elif isinstance(ad_cfg.get("description"), str):
description_text = ad_cfg["description"]
if ad_cfg.description:
description_text = ad_cfg.description
if with_affixes:
# Get prefix with precedence
prefix = (
# 1. Direct ad-level prefix
ad_cfg.get("description_prefix") if ad_cfg.get("description_prefix") is not None
# 2. Legacy nested ad-level prefix
else dicts.safe_get(ad_cfg, "description", "prefix")
if dicts.safe_get(ad_cfg, "description", "prefix") is not None
# 3. Global prefix from config
ad_cfg.description_prefix if ad_cfg.description_prefix is not None
# 2. Global prefix from config
else get_description_affixes(self.config, prefix = True)
or "" # Default to empty string if all sources are None
)
@@ -1146,11 +1118,8 @@ class KleinanzeigenBot(WebScrapingMixin):
# Get suffix with precedence
suffix = (
# 1. Direct ad-level suffix
ad_cfg.get("description_suffix") if ad_cfg.get("description_suffix") is not None
# 2. Legacy nested ad-level suffix
else dicts.safe_get(ad_cfg, "description", "suffix")
if dicts.safe_get(ad_cfg, "description", "suffix") is not None
# 3. Global suffix from config
ad_cfg.description_suffix if ad_cfg.description_suffix is not None
# 2. Global suffix from config
else get_description_affixes(self.config, prefix = False)
or "" # Default to empty string if all sources are None
)