""" SPDX-FileCopyrightText: © Sebastian Thomschke and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ import asyncio, atexit, copy, importlib.metadata, json, os, re, signal, shutil, sys, textwrap, time import getopt # pylint: disable=deprecated-module import urllib.parse as urllib_parse import urllib.request as urllib_request from collections.abc import Iterable from datetime import datetime from gettext import gettext as _ from typing import Any, Final import certifi, colorama, nodriver from ruamel.yaml import YAML from wcmatch import glob from . import extract, resources from .ads import calculate_content_hash, get_description_affixes from .utils import dicts, error_handlers, loggers, misc from .utils.files import abspath from .utils.i18n import Locale, get_current_locale, set_current_locale, pluralize from .utils.misc import ainput, ensure, is_frozen, parse_datetime, parse_decimal from .utils.web_scraping_mixin import By, Element, Page, Is, WebScrapingMixin from ._version import __version__ # W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933 LOG:Final[loggers.Logger] = loggers.get_logger(__name__) LOG.setLevel(loggers.INFO) colorama.just_fix_windows_console() class KleinanzeigenBot(WebScrapingMixin): def __init__(self) -> None: # workaround for https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/295 # see https://github.com/pyinstaller/pyinstaller/issues/7229#issuecomment-1309383026 os.environ["SSL_CERT_FILE"] = certifi.where() super().__init__() self.root_url = "https://www.kleinanzeigen.de" self.config:dict[str, Any] = {} self.config_file_path = abspath("config.yaml") self.categories:dict[str, str] = {} self.file_log:loggers.LogFileHandle | None = None log_file_basename = is_frozen() and os.path.splitext(os.path.basename(sys.executable))[0] or self.__module__ self.log_file_path:str | None = abspath(f"{log_file_basename}.log") self.command = "help" self.ads_selector = "due" self.keep_old_ads = False def __del__(self) -> None: if self.file_log: self.file_log.close() self.file_log = None self.close_browser_session() def get_version(self) -> str: return __version__ async def run(self, args:list[str]) -> None: self.parse_args(args) try: match self.command: case "help": self.show_help() case "version": print(self.get_version()) case "verify": self.configure_file_logging() self.load_config() self.load_ads() LOG.info("############################################") LOG.info("DONE: No configuration errors found.") LOG.info("############################################") case "publish": self.configure_file_logging() self.load_config() if not (self.ads_selector in {'all', 'new', 'due'} or re.compile(r'\d+[,\d+]*').search(self.ads_selector)): LOG.warning('You provided no ads selector. Defaulting to "due".') self.ads_selector = 'due' if ads := self.load_ads(): await self.create_browser_session() await self.login() await self.publish_ads(ads) else: LOG.info("############################################") LOG.info("DONE: No new/outdated ads found.") LOG.info("############################################") case "delete": self.configure_file_logging() self.load_config() if ads := self.load_ads(): await self.create_browser_session() await self.login() await self.delete_ads(ads) else: LOG.info("############################################") LOG.info("DONE: No ads to delete found.") LOG.info("############################################") case "download": self.configure_file_logging() # ad IDs depends on selector if not (self.ads_selector in {'all', 'new'} or re.compile(r'\d+[,\d+]*').search(self.ads_selector)): LOG.warning('You provided no ads selector. Defaulting to "new".') self.ads_selector = 'new' self.load_config() await self.create_browser_session() await self.login() await self.download_ads() case _: LOG.error("Unknown command: %s", self.command) sys.exit(2) finally: self.close_browser_session() def show_help(self) -> None: if is_frozen(): exe = sys.argv[0] elif os.getenv("PDM_PROJECT_ROOT", ""): exe = "pdm run app" else: exe = "python -m kleinanzeigen_bot" if get_current_locale().language == "de": print(textwrap.dedent(f"""\ Verwendung: {colorama.Fore.LIGHTMAGENTA_EX}{exe} BEFEHL [OPTIONEN]{colorama.Style.RESET_ALL} Befehle: publish - (Wieder-)Veröffentlicht Anzeigen verify - Überprüft die Konfigurationsdateien delete - Löscht Anzeigen download - Lädt eine oder mehrere Anzeigen herunter -- help - Zeigt diese Hilfe an (Standardbefehl) version - Zeigt die Version der Anwendung an Optionen: --ads=all|due|new| (publish) - Gibt an, welche Anzeigen (erneut) veröffentlicht werden sollen (STANDARD: due) Mögliche Werte: * all: Veröffentlicht alle Anzeigen erneut, ignoriert republication_interval * due: Veröffentlicht alle neuen Anzeigen und erneut entsprechend dem republication_interval * new: Veröffentlicht nur neue Anzeigen (d.h. Anzeigen ohne ID in der Konfigurationsdatei) * : Gibt eine oder mehrere Anzeigen-IDs an, die veröffentlicht werden sollen, z. B. "--ads=1,2,3", ignoriert republication_interval --ads=all|new| (download) - Gibt an, welche Anzeigen heruntergeladen werden sollen (STANDARD: new) Mögliche Werte: * all: Lädt alle Anzeigen aus Ihrem Profil herunter * new: Lädt Anzeigen aus Ihrem Profil herunter, die lokal noch nicht gespeichert sind * : Gibt eine oder mehrere Anzeigen-IDs zum Herunterladen an, z. B. "--ads=1,2,3" --force - Alias für '--ads=all' --keep-old - Verhindert das Löschen alter Anzeigen bei erneuter Veröffentlichung --config= - Pfad zur YAML- oder JSON-Konfigurationsdatei (STANDARD: ./config.yaml) --logfile= - Pfad zur Protokolldatei (STANDARD: ./kleinanzeigen-bot.log) --lang=en|de - Anzeigesprache (STANDARD: Systemsprache, wenn unterstützt, sonst Englisch) -v, --verbose - Aktiviert detaillierte Ausgabe – nur nützlich zur Fehlerbehebung """.rstrip())) else: print(textwrap.dedent(f"""\ Usage: {colorama.Fore.LIGHTMAGENTA_EX}{exe} COMMAND [OPTIONS]{colorama.Style.RESET_ALL} Commands: publish - (re-)publishes ads verify - verifies the configuration files delete - deletes ads download - downloads one or multiple ads -- help - displays this help (default command) version - displays the application version Options: --ads=all|due|new| (publish) - specifies which ads to (re-)publish (DEFAULT: due) Possible values: * all: (re-)publish all ads ignoring republication_interval * due: publish all new ads and republish ads according the republication_interval * new: only publish new ads (i.e. ads that have no id in the config file) * : provide one or several ads by ID to (re-)publish, like e.g. "--ads=1,2,3" ignoring republication_interval --ads=all|new| (download) - specifies which ads to download (DEFAULT: new) Possible values: * all: downloads all ads from your profile * new: downloads ads from your profile that are not locally saved yet * : provide one or several ads by ID to download, like e.g. "--ads=1,2,3" --force - alias for '--ads=all' --keep-old - don't delete old ads on republication --config= - path to the config YAML or JSON file (DEFAULT: ./config.yaml) --logfile= - path to the logfile (DEFAULT: ./kleinanzeigen-bot.log) --lang=en|de - display language (STANDARD: system language if supported, otherwise English) -v, --verbose - enables verbose output - only useful when troubleshooting issues """.rstrip())) def parse_args(self, args:list[str]) -> None: try: options, arguments = getopt.gnu_getopt(args[1:], "hv", [ "ads=", "config=", "force", "help", "keep-old", "logfile=", "lang=", "verbose" ]) except getopt.error as ex: LOG.error(ex.msg) LOG.error("Use --help to display available options.") sys.exit(2) for option, value in options: match option: case "-h" | "--help": self.show_help() sys.exit(0) case "--config": self.config_file_path = abspath(value) case "--logfile": if value: self.log_file_path = abspath(value) else: self.log_file_path = None case "--ads": self.ads_selector = value.strip().lower() case "--force": self.ads_selector = "all" case "--keep-old": self.keep_old_ads = True case "--lang": set_current_locale(Locale.of(value)) case "-v" | "--verbose": LOG.setLevel(loggers.DEBUG) loggers.get_logger("nodriver").setLevel(loggers.INFO) match len(arguments): case 0: self.command = "help" case 1: self.command = arguments[0] case _: LOG.error("More than one command given: %s", arguments) sys.exit(2) def configure_file_logging(self) -> None: if not self.log_file_path: return if self.file_log: return LOG.info("Logging to [%s]...", self.log_file_path) self.file_log = loggers.configure_file_logging(self.log_file_path) LOG.info("App version: %s", self.get_version()) LOG.info("Python version: %s", sys.version) def __check_ad_republication(self, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any], ad_file_relative: str) -> bool: """ Check if an ad needs to be republished based on changes and republication interval. Returns True if the ad should be republished. """ if ad_cfg["updated_on"]: last_updated_on = parse_datetime(ad_cfg["updated_on"]) elif ad_cfg["created_on"]: last_updated_on = parse_datetime(ad_cfg["created_on"]) else: return True if not last_updated_on: return True # Check for changes first if ad_cfg["id"]: # Calculate hash on original config to match what was stored current_hash = calculate_content_hash(ad_cfg_orig) stored_hash = ad_cfg_orig.get("content_hash") LOG.debug("Hash comparison for [%s]:", ad_file_relative) LOG.debug(" Stored hash: %s", stored_hash) LOG.debug(" Current hash: %s", current_hash) if stored_hash and current_hash == stored_hash: # No changes - check republication interval ad_age = datetime.utcnow() - last_updated_on if ad_age.days <= ad_cfg["republication_interval"]: LOG.info( " -> SKIPPED: ad [%s] was last published %d days ago. republication is only required every %s days", ad_file_relative, ad_age.days, ad_cfg["republication_interval"] ) return False else: LOG.info("Changes detected in ad [%s], will republish", ad_file_relative) # Update hash in original configuration ad_cfg_orig["content_hash"] = current_hash return True return True def load_ads(self, *, ignore_inactive:bool = True, check_id:bool = True) -> list[tuple[str, dict[str, Any], dict[str, Any]]]: LOG.info("Searching for ad config files...") ad_files:dict[str, str] = {} data_root_dir = os.path.dirname(self.config_file_path) for file_pattern in self.config["ad_files"]: for ad_file in glob.glob(file_pattern, root_dir = data_root_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): if not str(ad_file).endswith('ad_fields.yaml'): ad_files[abspath(ad_file, relative_to = data_root_dir)] = ad_file LOG.info(" -> found %s", pluralize("ad config file", ad_files)) if not ad_files: return [] ids = [] use_specific_ads = False if re.compile(r'\d+[,\d+]*').search(self.ads_selector): ids = [int(n) for n in self.ads_selector.split(',')] use_specific_ads = True LOG.info('Start fetch task for the ad(s) with id(s):') LOG.info(' | '.join([str(id_) for id_ in ids])) ad_fields = dicts.load_dict_from_module(resources, "ad_fields.yaml") ads = [] for ad_file, ad_file_relative in sorted(ad_files.items()): ad_cfg_orig = dicts.load_dict(ad_file, "ad") ad_cfg = copy.deepcopy(ad_cfg_orig) dicts.apply_defaults(ad_cfg, self.config["ad_defaults"], ignore = lambda k, _: k == "description", override = lambda _, v: v == "") dicts.apply_defaults(ad_cfg, ad_fields) if ignore_inactive and not ad_cfg["active"]: LOG.info(" -> SKIPPED: inactive ad [%s]", ad_file_relative) continue if use_specific_ads: if ad_cfg["id"] not in ids: LOG.info(" -> SKIPPED: ad [%s] is not in list of given ids.", ad_file_relative) continue else: if self.ads_selector == "new" and ad_cfg["id"] and check_id: LOG.info(" -> SKIPPED: ad [%s] is not new. already has an id assigned.", ad_file_relative) continue if self.ads_selector == "due": if not self.__check_ad_republication(ad_cfg, ad_cfg_orig, ad_file_relative): continue # Get description with prefix/suffix from ad config if present, otherwise use defaults ad_cfg["description"] = self.__get_description_with_affixes(ad_cfg) # Validate total length ensure(len(ad_cfg["description"]) <= 4000, f"""Length of ad description including prefix and suffix exceeds 4000 chars. Description length: { len(ad_cfg["description"])} chars. @ {ad_file}.""") # pylint: disable=cell-var-from-loop def assert_one_of(path:str, allowed:Iterable[str]) -> None: ensure(dicts.safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]") def assert_min_len(path:str, minlen:int) -> None: ensure(len(dicts.safe_get(ad_cfg, *path.split("."))) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]") def assert_has_value(path:str) -> None: ensure(dicts.safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]") # pylint: enable=cell-var-from-loop assert_one_of("type", {"OFFER", "WANTED"}) assert_min_len("title", 10) assert_has_value("description") assert_one_of("price_type", {"FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"}) if ad_cfg["price_type"] == "GIVE_AWAY": ensure(not dicts.safe_get(ad_cfg, "price"), f"-> [price] must not be specified for GIVE_AWAY ad @ [{ad_file}]") elif ad_cfg["price_type"] == "FIXED": assert_has_value("price") assert_one_of("shipping_type", {"PICKUP", "SHIPPING", "NOT_APPLICABLE"}) assert_has_value("contact.name") assert_has_value("republication_interval") if ad_cfg["id"]: ad_cfg["id"] = int(ad_cfg["id"]) if ad_cfg["category"]: resolved_category_id = self.categories.get(ad_cfg["category"]) if not resolved_category_id and ">" in ad_cfg["category"]: # this maps actually to the sonstiges/weiteres sub-category parent_category = ad_cfg["category"].rpartition(">")[0].strip() resolved_category_id = self.categories.get(parent_category) if resolved_category_id: LOG.warning( "Category [%s] unknown. Using category [%s] with ID [%s] instead.", ad_cfg["category"], parent_category, resolved_category_id) if resolved_category_id: ad_cfg["category"] = resolved_category_id if ad_cfg["shipping_costs"]: ad_cfg["shipping_costs"] = str(round(misc.parse_decimal(ad_cfg["shipping_costs"]), 2)) if ad_cfg["images"]: images = [] ad_dir = os.path.dirname(ad_file) for image_pattern in ad_cfg["images"]: pattern_images = set() for image_file in glob.glob(image_pattern, root_dir = ad_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): _, image_file_ext = os.path.splitext(image_file) ensure(image_file_ext.lower() in {".gif", ".jpg", ".jpeg", ".png"}, f"Unsupported image file type [{image_file}]") if os.path.isabs(image_file): pattern_images.add(image_file) else: pattern_images.add(abspath(image_file, relative_to = ad_file)) images.extend(sorted(pattern_images)) ensure(images or not ad_cfg["images"], f"No images found for given file patterns {ad_cfg['images']} at {ad_dir}") ad_cfg["images"] = list(dict.fromkeys(images)) ads.append(( ad_file, ad_cfg, ad_cfg_orig )) LOG.info("Loaded %s", pluralize("ad", ads)) return ads def load_config(self) -> None: config_defaults = dicts.load_dict_from_module(resources, "config_defaults.yaml") config = dicts.load_dict_if_exists(self.config_file_path, _("config")) if config is None: LOG.warning("Config file %s does not exist. Creating it with default values...", self.config_file_path) dicts.save_dict(self.config_file_path, config_defaults) config = {} self.config = dicts.apply_defaults(config, config_defaults) self.categories = dicts.load_dict_from_module(resources, "categories.yaml", "categories") deprecated_categories = dicts.load_dict_from_module(resources, "categories_old.yaml", "categories") self.categories.update(deprecated_categories) if self.config["categories"]: self.categories.update(self.config["categories"]) LOG.info(" -> found %s", pluralize("category", self.categories)) ensure(self.config["login"]["username"], f"[login.username] not specified @ [{self.config_file_path}]") ensure(self.config["login"]["password"], f"[login.password] not specified @ [{self.config_file_path}]") self.browser_config.arguments = self.config["browser"]["arguments"] self.browser_config.binary_location = self.config["browser"]["binary_location"] self.browser_config.extensions = [abspath(item, relative_to = self.config_file_path) for item in self.config["browser"]["extensions"]] self.browser_config.use_private_window = self.config["browser"]["use_private_window"] if self.config["browser"]["user_data_dir"]: self.browser_config.user_data_dir = abspath(self.config["browser"]["user_data_dir"], relative_to = self.config_file_path) self.browser_config.profile_name = self.config["browser"]["profile_name"] async def login(self) -> None: LOG.info("Checking if already logged in...") await self.web_open(f"{self.root_url}") if await self.is_logged_in(): LOG.info("Already logged in as [%s]. Skipping login.", self.config["login"]["username"]) return LOG.info("Opening login page...") await self.web_open(f"{self.root_url}/m-einloggen.html?targetUrl=/") try: await self.web_find(By.CSS_SELECTOR, "iframe[src*='captcha-delivery.com']", timeout = 2) LOG.warning("############################################") LOG.warning("# Captcha present! Please solve the captcha.") LOG.warning("############################################") await self.web_await(lambda: self.web_find(By.ID, "login-form") is not None, timeout = 5 * 60) except TimeoutError: pass await self.fill_login_data_and_send() await self.handle_after_login_logic() # Sometimes a second login is required if not await self.is_logged_in(): await self.fill_login_data_and_send() await self.handle_after_login_logic() async def fill_login_data_and_send(self) -> None: LOG.info("Logging in as [%s]...", self.config["login"]["username"]) await self.web_input(By.ID, "email", self.config["login"]["username"]) await self.web_input(By.ID, "password", self.config["login"]["password"]) await self.web_click(By.CSS_SELECTOR, "form#login-form button[type='submit']") async def handle_after_login_logic(self) -> None: try: await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout = 4) LOG.warning("############################################") LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.") LOG.warning("############################################") await ainput("Press ENTER when done...") except TimeoutError: pass try: LOG.info("Handling GDPR disclaimer...") await self.web_find(By.ID, "gdpr-banner-accept", timeout = 10) await self.web_click(By.ID, "gdpr-banner-cmp-button") await self.web_click(By.CSS_SELECTOR, "#ConsentManagementPage button.Button-secondary", timeout = 10) except TimeoutError: pass async def is_logged_in(self) -> bool: try: user_info = await self.web_text(By.ID, "user-email") if self.config['login']['username'].lower() in user_info.lower(): return True except TimeoutError: return False return False async def delete_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None: count = 0 published_ads = json.loads( (await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"] for (ad_file, ad_cfg, _) in ad_cfgs: count += 1 LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg["title"], ad_file) await self.delete_ad(ad_cfg, self.config["publishing"]["delete_old_ads_by_title"], published_ads) await self.web_sleep() LOG.info("############################################") LOG.info("DONE: Deleted %s", pluralize("ad", count)) LOG.info("############################################") async def delete_ad(self, ad_cfg: dict[str, Any], delete_old_ads_by_title: bool, published_ads: list[dict[str, Any]]) -> bool: LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"]) await self.web_open(f"{self.root_url}/m-meine-anzeigen.html") csrf_token_elem = await self.web_find(By.CSS_SELECTOR, "meta[name=_csrf]") csrf_token = csrf_token_elem.attrs["content"] ensure(csrf_token is not None, "Expected CSRF Token not found in HTML content!") if delete_old_ads_by_title: for published_ad in published_ads: published_ad_id = int(published_ad.get("id", -1)) published_ad_title = published_ad.get("title", "") if ad_cfg["id"] == published_ad_id or ad_cfg["title"] == published_ad_title: LOG.info(" -> deleting %s '%s'...", published_ad_id, published_ad_title) await self.web_request( url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}", method = "POST", headers = {"x-csrf-token": csrf_token} ) elif ad_cfg["id"]: await self.web_request( url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg['id']}", method = "POST", headers = {"x-csrf-token": csrf_token}, valid_response_codes = [200, 404] ) await self.web_sleep() ad_cfg["id"] = None return True async def publish_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None: count = 0 published_ads = json.loads( (await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"] for (ad_file, ad_cfg, ad_cfg_orig) in ad_cfgs: LOG.info("Processing %s/%s: '%s' from [%s]...", count + 1, len(ad_cfgs), ad_cfg["title"], ad_file) if [x for x in published_ads if x["id"] == ad_cfg["id"] and x["state"] == "paused"]: LOG.info("Skipping because ad is reserved") continue count += 1 await self.publish_ad(ad_file, ad_cfg, ad_cfg_orig, published_ads) await self.web_await(lambda: self.web_check(By.ID, "checking-done", Is.DISPLAYED), timeout = 5 * 60) if self.config["publishing"]["delete_old_ads"] == "AFTER_PUBLISH" and not self.keep_old_ads: await self.delete_ad(ad_cfg, False, published_ads) LOG.info("############################################") LOG.info("DONE: (Re-)published %s", pluralize("ad", count)) LOG.info("############################################") async def publish_ad(self, ad_file:str, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any], published_ads: list[dict[str, Any]]) -> None: """ @param ad_cfg: the effective ad config (i.e. with default values applied etc.) @param ad_cfg_orig: the ad config as present in the YAML file @param published_ads: json list of published ads """ await self.assert_free_ad_limit_not_reached() if self.config["publishing"]["delete_old_ads"] == "BEFORE_PUBLISH" and not self.keep_old_ads: await self.delete_ad(ad_cfg, self.config["publishing"]["delete_old_ads_by_title"], published_ads) LOG.info("Publishing ad '%s'...", ad_cfg["title"]) if loggers.is_debug(LOG): LOG.debug(" -> effective ad meta:") YAML().dump(ad_cfg, sys.stdout) await self.web_open(f"{self.root_url}/p-anzeige-aufgeben-schritt2.html") if ad_cfg["type"] == "WANTED": await self.web_click(By.ID, "adType2") ############################# # set title ############################# await self.web_input(By.ID, "postad-title", ad_cfg["title"]) ############################# # set category ############################# await self.__set_category(ad_cfg['category'], ad_file) ############################# # set special attributes ############################# await self.__set_special_attributes(ad_cfg) ############################# # set shipping type/options/costs ############################# if ad_cfg["type"] == "WANTED": # special handling for ads of type WANTED since shipping is a special attribute for these if ad_cfg["shipping_type"] in {"PICKUP", "SHIPPING"}: shipping_value = "ja" if ad_cfg["shipping_type"] == "SHIPPING" else "nein" try: await self.web_select(By.XPATH, "//select[contains(@id, '.versand_s')]", shipping_value) except TimeoutError: LOG.warning("Failed to set shipping attribute for type '%s'!", ad_cfg['shipping_type']) else: await self.__set_shipping(ad_cfg) ############################# # set price ############################# price_type = ad_cfg["price_type"] if price_type != "NOT_APPLICABLE": try: await self.web_select(By.CSS_SELECTOR, "select#price-type-react, select#micro-frontend-price-type, select#priceType", price_type) except TimeoutError: pass if dicts.safe_get(ad_cfg, "price"): await self.web_input(By.CSS_SELECTOR, "input#post-ad-frontend-price, input#micro-frontend-price, input#pstad-price", ad_cfg["price"]) ############################# # set sell_directly ############################# sell_directly = ad_cfg["sell_directly"] try: if ad_cfg["shipping_type"] == "SHIPPING": if sell_directly and ad_cfg["shipping_options"] and price_type in {"FIXED", "NEGOTIABLE"}: if not await self.web_check(By.ID, "radio-buy-now-yes", Is.SELECTED): await self.web_click(By.ID, 'radio-buy-now-yes') elif not await self.web_check(By.ID, "radio-buy-now-no", Is.SELECTED): await self.web_click(By.ID, 'radio-buy-now-no') except TimeoutError as ex: LOG.debug(ex, exc_info = True) ############################# # set description ############################# description = self.__get_description_with_affixes(ad_cfg) await self.web_execute("document.querySelector('#pstad-descrptn').value = `" + description.replace("`", "'") + "`") ############################# # set contact zipcode ############################# if ad_cfg["contact"]["zipcode"]: await self.web_input(By.ID, "pstad-zip", ad_cfg["contact"]["zipcode"]) # Set city if location is specified if ad_cfg["contact"].get("location"): try: await self.web_sleep(1) # Wait for city dropdown to populate options = await self.web_find_all(By.CSS_SELECTOR, "#pstad-citychsr option") for option in options: option_text = await self.web_text(By.CSS_SELECTOR, "option", parent = option) if option_text == ad_cfg["contact"]["location"]: await self.web_select(By.ID, "pstad-citychsr", option_text) break except TimeoutError: LOG.debug("Could not set city from location") ############################# # set contact street ############################# if ad_cfg["contact"]["street"]: try: if await self.web_check(By.ID, "pstad-street", Is.DISABLED): await self.web_click(By.ID, "addressVisibility") await self.web_sleep() except TimeoutError: # ignore pass await self.web_input(By.ID, "pstad-street", ad_cfg["contact"]["street"]) ############################# # set contact name ############################# if ad_cfg["contact"]["name"] and not await self.web_check(By.ID, "postad-contactname", Is.READONLY): await self.web_input(By.ID, "postad-contactname", ad_cfg["contact"]["name"]) ############################# # set contact phone ############################# if ad_cfg["contact"]["phone"]: if await self.web_check(By.ID, "postad-phonenumber", Is.DISPLAYED): try: if await self.web_check(By.ID, "postad-phonenumber", Is.DISABLED): await self.web_click(By.ID, "phoneNumberVisibility") await self.web_sleep() except TimeoutError: # ignore pass await self.web_input(By.ID, "postad-phonenumber", ad_cfg["contact"]["phone"]) ############################# # upload images ############################# await self.__upload_images(ad_cfg) ############################# # wait for captcha ############################# try: await self.web_find(By.CSS_SELECTOR, "iframe[name^='a-'][src^='https://www.google.com/recaptcha/api2/anchor?']", timeout = 2) LOG.warning("############################################") LOG.warning("# Captcha present! Please solve the captcha.") LOG.warning("############################################") await self.web_scroll_page_down() input(_("Press a key to continue...")) except TimeoutError: pass ############################# # submit ############################# try: await self.web_click(By.ID, "pstad-submit") except TimeoutError: # https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/40 await self.web_click(By.XPATH, "//fieldset[@id='postad-publish']//*[contains(text(),'Anzeige aufgeben')]") await self.web_click(By.ID, "imprint-guidance-submit") # check for no image question try: image_hint_xpath = '//*[contains(@class, "ModalDialog--Actions")]//button[.//*[text()[contains(.,"Ohne Bild veröffentlichen")]]]' if not ad_cfg["images"] and await self.web_check(By.XPATH, image_hint_xpath, Is.DISPLAYED): await self.web_click(By.XPATH, image_hint_xpath) except TimeoutError: pass # nosec await self.web_await(lambda: "p-anzeige-aufgeben-bestaetigung.html?adId=" in self.page.url, timeout = 20) # extract the ad id from the URL's query parameter current_url_query_params = urllib_parse.parse_qs(urllib_parse.urlparse(self.page.url).query) ad_id = int(current_url_query_params.get("adId", [])[0]) ad_cfg_orig["id"] = ad_id # check for approval message try: approval_link_xpath = '//*[contains(@id, "not-completed")]//*//a[contains(@class, "to-my-ads-link")]' if await self.web_check(By.XPATH, approval_link_xpath, Is.DISPLAYED): await self.web_click(By.XPATH, approval_link_xpath) except TimeoutError: pass # nosec # Update content hash after successful publication # Calculate hash on original config to ensure consistent comparison on restart ad_cfg_orig["content_hash"] = calculate_content_hash(ad_cfg_orig) ad_cfg_orig["updated_on"] = datetime.utcnow().isoformat() if not ad_cfg["created_on"] and not ad_cfg["id"]: ad_cfg_orig["created_on"] = ad_cfg_orig["updated_on"] LOG.info(" -> SUCCESS: ad published with ID %s", ad_id) dicts.save_dict(ad_file, ad_cfg_orig) async def __set_condition(self, condition_value: str) -> None: condition_mapping = { "new_with_tag": "Neu mit Etikett", "new": "Neu", "like_new": "Sehr Gut", "alright": "Gut", "ok": "In Ordnung", } mapped_condition = condition_mapping.get(condition_value) try: # Open condition dialog await self.web_click(By.CSS_SELECTOR, '[class*="ConditionSelector"] button') except TimeoutError: LOG.debug("Unable to open condition dialog and select condition [%s]", condition_value, exc_info = True) return try: # Click radio button await self.web_click(By.CSS_SELECTOR, f'.SingleSelectionItem--Main input[type=radio][data-testid="{mapped_condition}"]') except TimeoutError: LOG.debug("Unable to select condition [%s]", condition_value, exc_info = True) try: # Click continue button await self.web_click(By.XPATH, '//*[contains(@class, "ModalDialog--Actions")]//button[.//*[text()[contains(.,"Bestätigen")]]]') except TimeoutError as ex: raise TimeoutError(_("Unable to close condition dialog!")) from ex async def __set_category(self, category: str | None, ad_file:str) -> None: # click on something to trigger automatic category detection await self.web_click(By.ID, "pstad-descrptn") is_category_auto_selected = False try: if await self.web_text(By.ID, "postad-category-path"): is_category_auto_selected = True except TimeoutError: pass if category: await self.web_sleep() # workaround for https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/39 await self.web_click(By.ID, "pstad-lnk-chngeCtgry") await self.web_find(By.ID, "postad-step1-sbmt") category_url = f"{self.root_url}/p-kategorie-aendern.html#?path={category}" await self.web_open(category_url) await self.web_click(By.XPATH, "//*[@id='postad-step1-sbmt']/button") else: ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed") async def __set_special_attributes(self, ad_cfg: dict[str, Any]) -> None: if ad_cfg["special_attributes"]: LOG.debug('Found %i special attributes', len(ad_cfg["special_attributes"])) for special_attribute_key, special_attribute_value in ad_cfg["special_attributes"].items(): if special_attribute_key == "condition_s": await self.__set_condition(special_attribute_value) continue LOG.debug("Setting special attribute [%s] to [%s]...", special_attribute_key, special_attribute_value) try: # if the