""" SPDX-FileCopyrightText: © Sebastian Thomschke and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ import atexit, copy, getopt, importlib.metadata, json, logging, os, re, signal, shutil, sys, textwrap, time, urllib from collections.abc import Iterable from datetime import datetime from logging.handlers import RotatingFileHandler from typing import Any, Final import certifi, colorama from overrides import overrides from ruamel.yaml import YAML from selenium.common.exceptions import ElementClickInterceptedException, NoSuchElementException, TimeoutException, WebDriverException from selenium.webdriver.common.by import By from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.support import expected_conditions as EC from wcmatch import glob from . import utils, resources, extract # pylint: disable=W0406 from .utils import abspath, apply_defaults, ensure, is_frozen, pause, pluralize, safe_get, parse_datetime from .selenium_mixin import SeleniumMixin from ._version import __version__ # W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933 LOG_ROOT:Final[logging.Logger] = logging.getLogger() LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot") LOG.setLevel(logging.INFO) colorama.init() class KleinanzeigenBot(SeleniumMixin): def __init__(self) -> None: # workaround for https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/207 # see https://github.com/pyinstaller/pyinstaller/issues/7229#issuecomment-1309383026 os.environ["SSL_CERT_FILE"] = certifi.where() super().__init__() self.root_url = "https://www.kleinanzeigen.de" self.config:dict[str, Any] = {} self.config_file_path = abspath("config.yaml") self.categories:dict[str, str] = {} self.file_log:logging.FileHandler | None = None log_file_basename = is_frozen() and os.path.splitext(os.path.basename(sys.executable))[0] or self.__module__ self.log_file_path:str | None = abspath(f"{log_file_basename}.log") self.command = "help" self.ads_selector = "due" self.delete_old_ads = True self.delete_ads_by_title = False def __del__(self) -> None: if self.file_log: LOG_ROOT.removeHandler(self.file_log) if self.webdriver: self.webdriver.quit() self.webdriver = None def get_version(self) -> str: return __version__ def run(self, args:list[str]) -> None: self.parse_args(args) match self.command: case "help": self.show_help() case "version": print(self.get_version()) case "verify": self.configure_file_logging() self.load_config() self.load_ads() LOG.info("############################################") LOG.info("DONE: No configuration errors found.") LOG.info("############################################") case "publish": self.configure_file_logging() self.load_config() if not (self.ads_selector in {'all', 'new', 'due'} or re.compile(r'\d+[,\d+]*').search(self.ads_selector)): LOG.warning('You provided no ads selector. Defaulting to "due".') self.ads_selector = 'due' if ads := self.load_ads(): self.create_webdriver_session() self.login() self.publish_ads(ads) else: LOG.info("############################################") LOG.info("DONE: No new/outdated ads found.") LOG.info("############################################") case "delete": self.configure_file_logging() self.load_config() if ads := self.load_ads(): self.create_webdriver_session() self.login() self.delete_ads(ads) else: LOG.info("############################################") LOG.info("DONE: No ads to delete found.") LOG.info("############################################") case "download": self.configure_file_logging() # ad IDs depends on selector if not (self.ads_selector in {'all', 'new'} or re.compile(r'\d+[,\d+]*').search(self.ads_selector)): LOG.warning('You provided no ads selector. Defaulting to "new".') self.ads_selector = 'new' # start session self.load_config() self.create_webdriver_session() self.login() self.start_download_routine() # call correct version of download case _: LOG.error("Unknown command: %s", self.command) sys.exit(2) def show_help(self) -> None: if is_frozen(): exe = sys.argv[0] elif os.getenv("PDM_PROJECT_ROOT", ""): exe = "pdm run app" else: exe = "python -m kleinanzeigen_bot" print(textwrap.dedent(f"""\ Usage: {colorama.Fore.LIGHTMAGENTA_EX}{exe} COMMAND [OPTIONS]{colorama.Style.RESET_ALL} Commands: publish - (re-)publishes ads verify - verifies the configuration files delete - deletes ads download - downloads one or multiple ads -- help - displays this help (default command) version - displays the application version Options: --ads=all|due|new| (publish) - specifies which ads to (re-)publish (DEFAULT: due) Possible values: * all: (re-)publish all ads ignoring republication_interval * due: publish all new ads and republish ads according the republication_interval * new: only publish new ads (i.e. ads that have no id in the config file) * : provide one or several ads by ID to (re-)publish, like e.g. "--ads=1,2,3" ignoring republication_interval --ads=all|new| (download) - specifies which ads to download (DEFAULT: new) Possible values: * all: downloads all ads from your profile * new: downloads ads from your profile that are not locally saved yet * : provide one or several ads by ID to download, like e.g. "--ads=1,2,3" --force - alias for '--ads=all' --keep-old - don't delete old ads on republication --config= - path to the config YAML or JSON file (DEFAULT: ./config.yaml) --logfile= - path to the logfile (DEFAULT: ./kleinanzeigen-bot.log) -v, --verbose - enables verbose output - only useful when troubleshooting issues """)) def parse_args(self, args:list[str]) -> None: try: options, arguments = getopt.gnu_getopt(args[1:], "hv", [ "ads=", "config=", "force", "help", "keep-old", "logfile=", "verbose" ]) except getopt.error as ex: LOG.error(ex.msg) LOG.error("Use --help to display available options") sys.exit(2) for option, value in options: match option: case "-h" | "--help": self.show_help() sys.exit(0) case "--config": self.config_file_path = abspath(value) case "--logfile": if value: self.log_file_path = abspath(value) else: self.log_file_path = None case "--ads": self.ads_selector = value.strip().lower() case "--force": self.ads_selector = "all" case "--keep-old": self.delete_old_ads = False case "-v" | "--verbose": LOG.setLevel(logging.DEBUG) match len(arguments): case 0: self.command = "help" case 1: self.command = arguments[0] case _: LOG.error("More than one command given: %s", arguments) sys.exit(2) def configure_file_logging(self) -> None: if not self.log_file_path: return if self.file_log: return LOG.info("Logging to [%s]...", self.log_file_path) self.file_log = RotatingFileHandler(filename = self.log_file_path, maxBytes = 10 * 1024 * 1024, backupCount = 10, encoding = "utf-8") self.file_log.setLevel(logging.DEBUG) self.file_log.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) LOG_ROOT.addHandler(self.file_log) LOG.info("App version: %s", self.get_version()) LOG.info("Python version: %s", sys.version) def load_ads(self, *, ignore_inactive:bool = True, check_id:bool = True) -> list[tuple[str, dict[str, Any], dict[str, Any]]]: LOG.info("Searching for ad config files...") ad_files = set() data_root_dir = os.path.dirname(self.config_file_path) for file_pattern in self.config["ad_files"]: for ad_file in glob.glob(file_pattern, root_dir = data_root_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): if not str(ad_file).endswith('ad_fields.yaml'): ad_files.add(abspath(ad_file, relative_to = data_root_dir)) LOG.info(" -> found %s", pluralize("ad config file", ad_files)) if not ad_files: return [] descr_prefix = self.config["ad_defaults"]["description"]["prefix"] or "" descr_suffix = self.config["ad_defaults"]["description"]["suffix"] or "" ids = [] use_specific_ads = False if re.compile(r'\d+[,\d+]*').search(self.ads_selector): ids = [int(n) for n in self.ads_selector.split(',')] use_specific_ads = True LOG.info('Start fetch task for the ad(s) with the id(s):') LOG.info(' | '.join([str(id_) for id_ in ids])) ad_fields = utils.load_dict_from_module(resources, "ad_fields.yaml") ads = [] for ad_file in sorted(ad_files): ad_cfg_orig = utils.load_dict(ad_file, "ad") ad_cfg = copy.deepcopy(ad_cfg_orig) apply_defaults(ad_cfg, self.config["ad_defaults"], ignore = lambda k, _: k == "description", override = lambda _, v: v == "") apply_defaults(ad_cfg, ad_fields) if ignore_inactive and not ad_cfg["active"]: LOG.info(" -> SKIPPED: inactive ad [%s]", ad_file) continue if use_specific_ads: if not ad_cfg["id"] in ids: LOG.info(" -> SKIPPED: ad [%s] is not in list of given ids.", ad_file) continue else: if self.ads_selector == "new" and ad_cfg["id"] and check_id: LOG.info(" -> SKIPPED: ad [%s] is not new. already has an id assigned.", ad_file) continue if self.ads_selector == "due": if ad_cfg["updated_on"]: last_updated_on = parse_datetime(ad_cfg["updated_on"]) elif ad_cfg["created_on"]: last_updated_on = parse_datetime(ad_cfg["created_on"]) else: last_updated_on = None if last_updated_on: ad_age = datetime.utcnow() - last_updated_on if ad_age.days <= ad_cfg["republication_interval"]: LOG.info(" -> SKIPPED: ad [%s] was last published %d days ago. republication is only required every %s days", ad_file, ad_age.days, ad_cfg["republication_interval"] ) continue ad_cfg["description"] = descr_prefix + (ad_cfg["description"] or "") + descr_suffix ensure(len(ad_cfg["description"]) <= 4000, f"Length of ad description including prefix and suffix exceeds 4000 chars. @ [{ad_file}]") # pylint: disable=cell-var-from-loop def assert_one_of(path:str, allowed:Iterable[str]) -> None: ensure(safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]") def assert_min_len(path:str, minlen:int) -> None: ensure(len(safe_get(ad_cfg, *path.split("."))) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]") def assert_has_value(path:str) -> None: ensure(safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]") # pylint: enable=cell-var-from-loop assert_one_of("type", {"OFFER", "WANTED"}) assert_min_len("title", 10) assert_has_value("description") assert_one_of("price_type", {"FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"}) if ad_cfg["price_type"] == "GIVE_AWAY": ensure(not safe_get(ad_cfg, "price"), f"-> [price] must not be specified for GIVE_AWAY ad @ [{ad_file}]") elif ad_cfg["price_type"] == "FIXED": assert_has_value("price") assert_one_of("shipping_type", {"PICKUP", "SHIPPING", "NOT_APPLICABLE"}) assert_has_value("contact.name") assert_has_value("republication_interval") if ad_cfg["id"]: ad_cfg["id"] = int(ad_cfg["id"]) if ad_cfg["category"]: ad_cfg["category"] = self.categories.get(ad_cfg["category"], ad_cfg["category"]) if ad_cfg["shipping_costs"]: ad_cfg["shipping_costs"] = str(round(utils.parse_decimal(ad_cfg["shipping_costs"]), 2)) if ad_cfg["images"]: images = [] for image_pattern in ad_cfg["images"]: pattern_images = set() ad_dir = os.path.dirname(ad_file) for image_file in glob.glob(image_pattern, root_dir = ad_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): _, image_file_ext = os.path.splitext(image_file) ensure(image_file_ext.lower() in {".gif", ".jpg", ".jpeg", ".png"}, f"Unsupported image file type [{image_file}]") if os.path.isabs(image_file): pattern_images.add(image_file) else: pattern_images.add(abspath(image_file, relative_to = ad_file)) images.extend(sorted(pattern_images)) ensure(images or not ad_cfg["images"], f"No images found for given file patterns {ad_cfg['images']} at {ad_dir}") ad_cfg["images"] = list(dict.fromkeys(images)) ads.append(( ad_file, ad_cfg, ad_cfg_orig )) LOG.info("Loaded %s", pluralize("ad", ads)) return ads def load_config(self) -> None: config_defaults = utils.load_dict_from_module(resources, "config_defaults.yaml") config = utils.load_dict_if_exists(self.config_file_path, "config") if config is None: LOG.warning("Config file %s does not exist. Creating it with default values...", self.config_file_path) utils.save_dict(self.config_file_path, config_defaults) config = {} self.config = apply_defaults(config, config_defaults) self.categories = utils.load_dict_from_module(resources, "categories.yaml", "categories") if self.config["categories"]: self.categories.update(self.config["categories"]) LOG.info(" -> found %s", pluralize("category", self.categories)) ensure(self.config["login"]["username"], f"[login.username] not specified @ [{self.config_file_path}]") ensure(self.config["login"]["password"], f"[login.password] not specified @ [{self.config_file_path}]") self.browser_config.arguments = self.config["browser"]["arguments"] self.browser_config.binary_location = self.config["browser"]["binary_location"] self.browser_config.extensions = [abspath(item, relative_to = self.config_file_path) for item in self.config["browser"]["extensions"]] self.browser_config.use_private_window = self.config["browser"]["use_private_window"] if self.config["browser"]["user_data_dir"]: self.browser_config.user_data_dir = abspath(self.config["browser"]["user_data_dir"], relative_to = self.config_file_path) self.browser_config.profile_name = self.config["browser"]["profile_name"] def login(self) -> None: LOG.info("Checking if already logged in") self.web_open(f"{self.root_url}") if self.is_logged_in(): LOG.info("Already logged in as [%s]. Skipping login.", self.config["login"]["username"]) return LOG.info("Logging in as [%s]...", self.config["login"]["username"]) self.web_open(f"{self.root_url}/m-einloggen.html?targetUrl=/") # close redesign banner try: self.web_click(By.XPATH, '//*[@id="pre-launch-comms-interstitial-frontend"]//button[.//*[text()[contains(.,"nicht mehr anzeigen")]]]') except NoSuchElementException: pass # accept privacy banner try: self.web_click(By.ID, "gdpr-banner-accept") except NoSuchElementException: pass self.web_input(By.ID, "login-email", self.config["login"]["username"]) self.web_input(By.ID, "login-password", self.config["login"]["password"]) self.handle_captcha_if_present("login-recaptcha", "but DON'T click 'Einloggen'.") self.web_click(By.ID, "login-submit") try: self.web_find(By.ID, "new-device-login", 4) LOG.warning("############################################") LOG.warning("# Device verification message detected. Use the 'Login bestätigen' URL from the mentioned e-mail into the same browser tab.") LOG.warning("############################################") input("Press ENTER when done...") except NoSuchElementException: pass def is_logged_in(self) -> bool: try: user_email_elem = self.web_find(By.ID, "user-email") email_text = user_email_elem.text if f"angemeldet als: {self.config['login']['username']}" == email_text: return True except NoSuchElementException: return False return False def handle_captcha_if_present(self, captcha_element_id:str, msg:str) -> None: try: self.web_click(By.XPATH, f"//*[@id='{captcha_element_id}']") except NoSuchElementException: return LOG.warning("############################################") LOG.warning("# Captcha present! Please solve and close the captcha, %s", msg) LOG.warning("############################################") self.webdriver.switch_to.frame(self.web_find(By.CSS_SELECTOR, f"#{captcha_element_id} iframe")) self.web_await(lambda _: self.webdriver.find_element(By.ID, "recaptcha-anchor").get_attribute("aria-checked") == "true", timeout = 5 * 60) self.webdriver.switch_to.default_content() def delete_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None: count = 0 for (ad_file, ad_cfg, _) in ad_cfgs: count += 1 LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg["title"], ad_file) self.delete_ad(ad_cfg) pause(2000, 4000) LOG.info("############################################") LOG.info("DONE: Deleting %s", pluralize("ad", count)) LOG.info("############################################") def delete_ad(self, ad_cfg: dict[str, Any]) -> bool: LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"]) self.web_open(f"{self.root_url}/m-meine-anzeigen.html") csrf_token_elem = self.web_find(By.XPATH, "//meta[@name='_csrf']") csrf_token = csrf_token_elem.get_attribute("content") if csrf_token is None: raise AssertionError("Expected CSRF Token not found in HTML content!") if self.delete_ads_by_title: published_ads = json.loads(self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT")["content"])["ads"] for published_ad in published_ads: published_ad_id = int(published_ad.get("id", -1)) published_ad_title = published_ad.get("title", "") if ad_cfg["id"] == published_ad_id or ad_cfg["title"] == published_ad_title: LOG.info(" -> deleting %s '%s'...", published_ad_id, published_ad_title) self.web_request( url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}", method = "POST", headers = {"x-csrf-token": csrf_token} ) elif ad_cfg["id"]: self.web_request( url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg['id']}", method = "POST", headers = {"x-csrf-token": csrf_token}, valid_response_codes = [200, 404] ) pause(1500, 3000) ad_cfg["id"] = None return True def publish_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None: count = 0 for (ad_file, ad_cfg, ad_cfg_orig) in ad_cfgs: count += 1 LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg["title"], ad_file) self.publish_ad(ad_file, ad_cfg, ad_cfg_orig) self.web_await(lambda _: self.webdriver.find_element(By.ID, "checking-done").is_displayed(), timeout = 5 * 60) LOG.info("############################################") LOG.info("DONE: (Re-)published %s", pluralize("ad", count)) LOG.info("############################################") def publish_ad(self, ad_file:str, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any]) -> None: self.assert_free_ad_limit_not_reached() if self.delete_old_ads: self.delete_ad(ad_cfg) LOG.info("Publishing ad '%s'...", ad_cfg["title"]) if LOG.isEnabledFor(logging.DEBUG): LOG.debug(" -> effective ad meta:") YAML().dump(ad_cfg, sys.stdout) self.web_open(f"{self.root_url}/p-anzeige-aufgeben-schritt2.html") if ad_cfg["type"] == "WANTED": self.web_click(By.ID, "adType2") ############################# # set title ############################# self.web_input(By.ID, "postad-title", ad_cfg["title"]) ############################# # set category ############################# self.__set_category(ad_file, ad_cfg) ############################# # set shipping type/options/costs ############################# if ad_cfg["shipping_type"] == "PICKUP": try: self.web_click(By.XPATH, '//*[contains(@class, "ShippingPickupSelector")]//label[text()[contains(.,"Nur Abholung")]]/input[@type="radio"]') except NoSuchElementException as ex: LOG.debug(ex, exc_info = True) elif ad_cfg["shipping_options"]: self.web_click(By.XPATH, '//*[contains(@class, "jsx-2623555103")]') self.web_click(By.XPATH, '//*[contains(@class, "CarrierSelectionModal--Button")]') self.__set_shipping_options(ad_cfg) else: try: self.web_click(By.XPATH, '//*[contains(@class, "jsx-2623555103")]') self.web_click(By.XPATH, '//*[contains(@class, "CarrierSelectionModal--Button")]') self.web_click(By.XPATH, '//*[contains(@class, "CarrierOption--Main")]') if ad_cfg["shipping_costs"]: self.web_input(By.XPATH, '//*[contains(@class, "IndividualShippingInput")]//input[@type="text"]', str.replace(ad_cfg["shipping_costs"], ".", ",") ) self.web_click(By.XPATH, '//*[contains(@class, "ModalDialog--Actions")]//button[.//*[text()[contains(.,"Fertig")]]]') except NoSuchElementException as ex: LOG.debug(ex, exc_info = True) ############################# # set price ############################# price_type = ad_cfg["price_type"] if price_type != "NOT_APPLICABLE": try: self.web_select(By.XPATH, "//select[@id='price-type-react' or @id='micro-frontend-price-type' or @id='priceType']", price_type) except NoSuchElementException: pass if safe_get(ad_cfg, "price"): self.web_input(By.XPATH, "//input[@id='post-ad-frontend-price' or @id='micro-frontend-price' or @id='pstad-price']", ad_cfg["price"]) ############################# # set sell_directly ############################# sell_directly = ad_cfg["sell_directly"] try: if ad_cfg["shipping_type"] == "SHIPPING": if sell_directly and ad_cfg["shipping_options"] and price_type in {"FIXED", "NEGOTIABLE"}: if not self.webdriver.find_element(By.ID, "radio-buy-now-yes").is_selected(): self.web_click(By.XPATH, '//*[contains(@id, "radio-buy-now-yes")]') elif not self.webdriver.find_element(By.ID, "radio-buy-now-no").is_selected(): self.web_click(By.XPATH, '//*[contains(@id, "radio-buy-now-no")]') except NoSuchElementException as ex: LOG.debug(ex, exc_info = True) ############################# # set description ############################# self.web_execute("document.querySelector('#pstad-descrptn').value = `" + ad_cfg["description"].replace("`", "'") + "`") ############################# # set contact zipcode ############################# if ad_cfg["contact"]["zipcode"]: self.web_input(By.ID, "pstad-zip", ad_cfg["contact"]["zipcode"]) ############################# # set contact street ############################# if ad_cfg["contact"]["street"]: try: if not self.webdriver.find_element(By.ID, "pstad-street").is_enabled(): self.webdriver.find_element(By.ID, "addressVisibility").click() pause(2000) except NoSuchElementException: # ignore pass self.web_input(By.ID, "pstad-street", ad_cfg["contact"]["street"]) ############################# # set contact name ############################# if ad_cfg["contact"]["name"] and not self.webdriver.find_element(By.ID, "postad-contactname").get_attribute("readonly"): self.web_input(By.ID, "postad-contactname", ad_cfg["contact"]["name"]) ############################# # set contact phone ############################# if ad_cfg["contact"]["phone"]: if self.webdriver.find_element(By.ID, "postad-phonenumber").is_displayed(): try: if not self.webdriver.find_element(By.ID, "postad-phonenumber").is_enabled(): self.webdriver.find_element(By.ID, "phoneNumberVisibility").click() pause(2000) except NoSuchElementException: # ignore pass self.web_input(By.ID, "postad-phonenumber", ad_cfg["contact"]["phone"]) ############################# # upload images ############################# self.__upload_images(ad_cfg) ############################# # submit ############################# self.handle_captcha_if_present("postAd-recaptcha", "but DON'T click 'Anzeige aufgeben'.") try: self.web_click(By.ID, "pstad-submit") except NoSuchElementException: # https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/40 self.web_click(By.XPATH, "//fieldset[@id='postad-publish']//*[contains(text(),'Anzeige aufgeben')]") self.web_click(By.ID, "imprint-guidance-submit") self.web_await(EC.url_contains("p-anzeige-aufgeben-bestaetigung.html?adId="), 20) ad_cfg_orig["updated_on"] = datetime.utcnow().isoformat() if not ad_cfg["created_on"] and not ad_cfg["id"]: ad_cfg_orig["created_on"] = ad_cfg_orig["updated_on"] # extract the ad id from the URL's query parameter current_url_query_params = urllib.parse.parse_qs(urllib.parse.urlparse(self.webdriver.current_url).query) ad_id = int(current_url_query_params.get("adId", [])[0]) ad_cfg_orig["id"] = ad_id LOG.info(" -> SUCCESS: ad published with ID %s", ad_id) utils.save_dict(ad_file, ad_cfg_orig) def __set_category(self, ad_file:str, ad_cfg: dict[str, Any]) -> None: # click on something to trigger automatic category detection self.web_click(By.ID, "pstad-descrptn") try: self.web_find(By.XPATH, "//*[@id='postad-category-path'][text()]") is_category_auto_selected = True except NoSuchElementException: is_category_auto_selected = False if ad_cfg["category"]: utils.pause(2000) # workaround for https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/39 self.web_click(By.ID, "pstad-lnk-chngeCtgry") self.web_find(By.ID, "postad-step1-sbmt") category_url = f"{self.root_url}/p-kategorie-aendern.html#?path={ad_cfg['category']}" self.web_open(category_url) self.web_click(By.XPATH, "//*[@id='postad-step1-sbmt']/button") else: ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed") if ad_cfg["special_attributes"]: LOG.debug('Found %i special attributes', len(ad_cfg["special_attributes"])) for special_attribute_key, special_attribute_value in ad_cfg["special_attributes"].items(): LOG.debug("Setting special attribute [%s] to [%s]...", special_attribute_key, special_attribute_value) try: # if the