mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-12 18:41:50 +01:00
487 lines
20 KiB
Python
487 lines
20 KiB
Python
"""
|
|
Copyright (C) 2022 Sebastian Thomschke and contributors
|
|
SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""
|
|
import atexit, copy, getopt, glob, json, logging, os, signal, sys, textwrap, time, urllib
|
|
from collections.abc import Iterable
|
|
from datetime import datetime
|
|
import importlib.metadata
|
|
from logging.handlers import RotatingFileHandler
|
|
from typing import Any, Final
|
|
|
|
from ruamel.yaml import YAML
|
|
from selenium.common.exceptions import NoSuchElementException
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
from . import utils, resources
|
|
from .utils import apply_defaults, ensure, is_frozen, pause, pluralize, safe_get
|
|
from .selenium_mixin import SeleniumMixin
|
|
|
|
# Root logger of the process; configure_file_logging() attaches the file handler to it.
LOG_ROOT: Final[logging.Logger] = logging.getLogger()

# Logger used by this module for its own messages. INFO is the default level,
# parse_args() switches it to DEBUG when -v/--verbose is given.
LOG: Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot")
LOG.setLevel(logging.INFO)
|
|
|
|
|
|
class KleinanzeigenBot(SeleniumMixin):
|
|
|
|
def __init__(self):
    """Initializes the bot with its default settings; nothing is read from disk yet."""
    super().__init__()

    self.root_url = "https://www.ebay-kleinanzeigen.de"

    # effective configuration, filled by load_config()
    self.config:dict[str, Any] = {}
    self.config_file_path = os.path.join(os.getcwd(), "config.yaml")

    # category mapping, filled by load_config()
    self.categories:dict[str, str] = {}

    # file log handler, created lazily by configure_file_logging()
    self.file_log:logging.FileHandler = None

    # the log file is named after the executable when is_frozen() reports a frozen app,
    # otherwise after the module that defines this class
    log_file_basename = os.path.splitext(os.path.basename(sys.executable))[0] if is_frozen() else self.__module__
    self.log_file_path = os.path.join(os.getcwd(), f"{log_file_basename}.log")

    # defaults that parse_args() may override
    self.command = "help"
    self.force_mode = False
|
|
|
|
def __del__(self):
    """Detaches the file log handler from the root logger and closes it."""
    # __init__ may have raised before the attribute was assigned, in which case
    # a plain self.file_log access would raise AttributeError during finalization
    file_log = getattr(self, "file_log", None)
    if file_log:
        LOG_ROOT.removeHandler(file_log)
        # removing the handler does not close it; release the log file explicitly
        file_log.close()
        self.file_log = None
|
|
|
|
def get_version(self) -> str:
    """Returns the version recorded in the installed metadata of this module's package."""
    package_name = __package__
    return importlib.metadata.version(package_name)
|
|
|
|
def run(self, args:Iterable[str]) -> None:
    """
    Parses the command line and executes the selected command.

    :param args: the command line arguments, including the program name at index 0
    """
    separator = "############################################"

    def log_boxed(message:str) -> None:
        # frames a result message with separator lines
        LOG.info(separator)
        LOG.info(message)
        LOG.info(separator)

    self.parse_args(args)
    command = self.command

    if command == "help":
        self.show_help()
    elif command == "version":
        print(self.get_version())
    elif command == "verify":
        self.configure_file_logging()
        self.load_config()
        self.load_ads()
        log_boxed("No configuration errors found.")
    elif command == "publish":
        self.configure_file_logging()
        self.load_config()
        ads = self.load_ads(exclude_undue = not self.force_mode)
        if not ads:
            log_boxed("No ads to (re-)publish found.")
        else:
            # only start a browser session when there is something to publish
            self.create_webdriver_session()
            self.login()
            self.publish_ads(ads)
    else:
        LOG.error("Unknown command: %s", self.command)
        sys.exit(2)
|
|
|
|
def show_help(self) -> None:
    """Prints the usage text, using the invocation style that matches how the app was started."""
    # fallback: plain module execution
    exe = "python -m kleinanzeigen_bot"
    if is_frozen():
        exe = sys.argv[0]
    elif os.getenv("PDM_PROJECT_ROOT", ""):
        exe = "pdm run app"

    usage = textwrap.dedent(f"""\
        Usage: {exe} COMMAND [--config=<PATH>] [--force] [--logfile=<PATH>] [-v|--verbose]

        Commands:
        publish - (re-)publishes ads
        verify - verifies the configuration files
        --
        help - displays this help (default command)
        version - displays the application version

        Flags:
        --config=<PATH> - path to the config YAML or JSON file (default: ./config.yaml)
        --force - republish all ads ignoring republication_interval
        --logfile=<PATH> - path to the logfile (default: ./kleinanzeigen-bot.log)
        -v, --verbose - enables verbose output - only useful when troubleshooting issues
        """)
    print(usage)
|
|
|
|
def parse_args(self, args:Iterable[str]) -> None:
    """
    Evaluates the command line and stores the result on the instance.

    Sets config_file_path, log_file_path, force_mode and command; -v/--verbose raises the
    log level to DEBUG. Exits with code 2 on invalid options or when more than one command
    is given, and with code 0 after printing the help for -h/--help.

    :param args: the command line arguments, including the program name at index 0
    """
    long_options = ["help", "verbose", "force", "logfile=", "config="]
    try:
        options, arguments = getopt.gnu_getopt(args[1:], "hv", long_options)
    except getopt.error as ex:
        LOG.error(ex.msg)
        LOG.error("Use --help to display available options")
        sys.exit(2)

    for option, value in options:
        if option in ("-h", "--help"):
            self.show_help()
            sys.exit(0)
        elif option == "--config":
            self.config_file_path = os.path.abspath(value)
        elif option == "--logfile":
            # an empty value (--logfile=) disables file logging
            self.log_file_path = os.path.abspath(value) if value else None
        elif option == "--force":
            self.force_mode = True
        elif option in ("-v", "--verbose"):
            LOG.setLevel(logging.DEBUG)

    if not arguments:
        self.command = "help"
    elif len(arguments) == 1:
        self.command = arguments[0]
    else:
        LOG.error("More than one command given: %s", arguments)
        sys.exit(2)
|
|
|
|
def configure_file_logging(self) -> None:
    """
    Attaches a rotating file handler for self.log_file_path to the root logger.

    Does nothing when no log file path is set or when a handler has already been created.
    """
    if not self.log_file_path or self.file_log:
        return

    LOG.info("Logging to [%s]...", self.log_file_path)

    # keep up to 10 rotated files of 10 MiB each
    handler = RotatingFileHandler(
        filename = self.log_file_path,
        maxBytes = 10 * 1024 * 1024,
        backupCount = 10,
        encoding = "utf-8"
    )
    self.file_log = handler
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    LOG_ROOT.addHandler(handler)

    LOG.info("App version: %s", self.get_version())
|
|
|
|
def load_ads(self, *, exclude_inactive = True, exclude_undue = True) -> list[tuple[str, dict[str, Any], dict[str, Any]]]:
    """
    Loads and validates the ad files matching the "ad_files" glob patterns of the config.

    The patterns are resolved relative to the current working directory, image patterns
    relative to the directory of the respective ad file.

    :param exclude_inactive: skip ads whose "active" property is falsy
    :param exclude_undue: skip ads whose last publication is not older than their
        "republication_interval" (compared in whole days)
    :return: a list of (ad_file, ad_cfg, ad_cfg_orig) tuples, sorted by file path, where ad_cfg is
        the configuration with defaults applied and ad_cfg_orig the content as loaded from the file
    """
    LOG.info("Searching for ad files...")

    ad_files = set()
    for file_pattern in self.config["ad_files"]:
        for ad_file in glob.glob(file_pattern, root_dir = os.getcwd(), recursive = True):
            ad_files.add(os.path.abspath(ad_file))
    LOG.info(" -> found %s", pluralize("ad file", ad_files))
    if not ad_files:
        return []

    descr_prefix = self.config["ad_defaults"]["description"]["prefix"] or ""
    descr_suffix = self.config["ad_defaults"]["description"]["suffix"] or ""

    ad_fields = utils.load_dict_from_module(resources, "ad_fields.yaml")
    ads = []
    for ad_file in sorted(ad_files):

        # the untouched original is kept so that it can be written back after publishing
        ad_cfg_orig = utils.load_dict(ad_file, "ad file")
        ad_cfg = copy.deepcopy(ad_cfg_orig)
        apply_defaults(ad_cfg, self.config["ad_defaults"], ignore = lambda k, _: k == "description", override = lambda _, v: v == "")
        apply_defaults(ad_cfg, ad_fields)

        if exclude_inactive and not ad_cfg["active"]:
            LOG.info(" -> excluding inactive ad [%s]", ad_file)
            continue

        if exclude_undue:
            if ad_cfg["updated_on"]:
                last_updated_on = datetime.fromisoformat(ad_cfg["updated_on"])
            elif ad_cfg["created_on"]:
                last_updated_on = datetime.fromisoformat(ad_cfg["created_on"])
            else:
                last_updated_on = None

            if last_updated_on:
                ad_age = datetime.utcnow() - last_updated_on
                if ad_age.days <= ad_cfg["republication_interval"]:
                    LOG.info(" -> skipping. last published %d days ago. republication is only required every %s days",
                        ad_age.days,
                        ad_cfg["republication_interval"]
                    )
                    continue

        ad_cfg["description"] = descr_prefix + (ad_cfg["description"] or "") + descr_suffix

        # pylint: disable=cell-var-from-loop
        def assert_one_of(path:str, allowed:Iterable):
            ensure(safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]")

        def assert_min_len(path:str, minlen:int):
            # a missing value counts as empty so that it is reported as a validation
            # error instead of failing with "object of type 'NoneType' has no len()"
            value = safe_get(ad_cfg, *path.split(".")) or ""
            ensure(len(value) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]")

        def assert_has_value(path:str):
            ensure(safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]")
        # pylint: enable=cell-var-from-loop

        assert_one_of("type", ("OFFER", "WANTED"))
        assert_min_len("title", 10)
        assert_has_value("description")
        assert_has_value("price")
        assert_one_of("price_type", ("FIXED", "NEGOTIABLE", "GIVE_AWAY"))
        assert_one_of("shipping_type", ("PICKUP", "SHIPPING", "NOT_APPLICABLE"))
        assert_has_value("contact.name")
        assert_has_value("republication_interval")

        if ad_cfg["id"]:
            ad_cfg["id"] = int(ad_cfg["id"])

        if ad_cfg["category"]:
            # translate a category name into its mapped value, unknown values are kept as they are
            ad_cfg["category"] = self.categories.get(ad_cfg["category"], ad_cfg["category"])

        if ad_cfg["images"]:
            ad_dir = os.path.dirname(ad_file)
            images = set()
            for image_pattern in ad_cfg["images"]:
                for image_file in glob.glob(image_pattern, root_dir = ad_dir, recursive = True):
                    _, image_file_ext = os.path.splitext(image_file)
                    ensure(image_file_ext.lower() in {".gif", ".jpg", ".jpeg", ".png"}, f"Unsupported image file type [{image_file}]")
                    if os.path.isabs(image_file):
                        images.add(image_file)
                    else:
                        images.add(os.path.join(ad_dir, image_file))
            # report the directory that was actually searched (the ad file's), not the working directory
            ensure(images, f"No images found for given file patterns {ad_cfg['images']} at {ad_dir}")
            ad_cfg["images"] = sorted(images)

        ads.append((
            ad_file,
            ad_cfg,
            ad_cfg_orig
        ))

    LOG.info(" -> loaded %s", pluralize("ad", ads))
    return ads
|
|
|
|
def load_config(self) -> None:
    """
    Loads the config file, completes it with the bundled defaults and validates the login data.

    A missing config file is created from the defaults. Also loads the category mapping
    (bundled categories plus the ones from the config) and the browser settings.
    """
    config_defaults = utils.load_dict_from_module(resources, "config_defaults.yaml")
    config = utils.load_dict(self.config_file_path, "config", must_exist = False)

    if config is None:
        LOG.warning("Config file %s does not exist. Creating it with default values...", self.config_file_path)
        utils.save_dict(self.config_file_path, config_defaults)
        config = {}

    self.config = apply_defaults(config, config_defaults)

    # categories from the config extend/override the bundled ones
    self.categories = utils.load_dict_from_module(resources, "categories.yaml", "categories")
    custom_categories = self.config["categories"]
    if custom_categories:
        self.categories.update(custom_categories)
    LOG.info(" -> found %s", pluralize("category", self.categories))

    for login_property in ("username", "password"):
        ensure(self.config["login"][login_property], f"[login.{login_property}] not specified @ [{self.config_file_path}]")

    browser_cfg = self.config["browser"]
    self.browser_arguments = browser_cfg["arguments"]
    self.browser_binary_location = browser_cfg["binary_location"]
|
|
|
|
def login(self) -> None:
    """Opens the login page and signs in with the credentials from the config."""
    credentials = self.config["login"]
    username = credentials["username"]

    LOG.info("Logging in as [%s]...", username)
    self.web_open(f"{self.root_url}/m-einloggen.html")

    # accept privacy banner
    self.web_click(By.ID, "gdpr-banner-accept")

    self.web_input(By.ID, "login-email", username)
    self.web_input(By.ID, "login-password", credentials["password"])

    # the user has to solve a captcha manually, if one is shown
    self.handle_captcha_if_present("login-recaptcha", "but DON'T click 'Einloggen'.")

    self.web_click(By.ID, "login-submit")

    pause(800, 3000)
|
|
|
|
def handle_captcha_if_present(self, captcha_element_id:str, msg:str) -> None:
    """
    Waits up to 5 minutes for the user to solve a captcha, if one is present on the page.

    Returns immediately when clicking the captcha element raises NoSuchElementException.

    :param captcha_element_id: ID of the element that contains the captcha iframe
    :param msg: additional instruction appended to the warning shown to the user
    """
    try:
        self.web_click(By.XPATH, f"//*[@id='{captcha_element_id}']")
    except NoSuchElementException:
        return

    LOG.warning("############################################")
    LOG.warning("# Captcha present! Please solve and close the captcha, %s", msg)
    LOG.warning("############################################")
    self.webdriver.switch_to.frame(self.web_find(By.CSS_SELECTOR, f"#{captcha_element_id} iframe"))
    try:
        # the checkbox inside the iframe reports aria-checked="true" once the captcha is solved
        self.web_await(lambda _: self.webdriver.find_element(By.ID, "recaptcha-anchor").get_attribute("aria-checked") == "true", timeout = 5 * 60)
    finally:
        # always leave the iframe again, otherwise a failed wait would leave
        # the webdriver scoped to the captcha frame for every later lookup
        self.webdriver.switch_to.default_content()
|
|
|
|
def delete_ad(self, ad_cfg: dict[str, Any]) -> bool:
    """
    Deletes all published ads that have the same ID or the same title as the given ad.

    :param ad_cfg: the ad configuration; its "id" is reset to None afterwards
    :return: always True
    """
    LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"])

    self.web_open(f"{self.root_url}/m-meine-anzeigen.html")

    # the CSRF token of the page has to be sent along with the delete requests
    csrf_token = self.web_find(By.XPATH, "//meta[@name='_csrf']").get_attribute("content")

    response = self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT")
    published_ads = json.loads(response["content"])["ads"]

    for published_ad in published_ads:
        published_ad_id = int(published_ad.get("id", -1))
        published_ad_title = published_ad.get("title", "")
        is_same_ad = ad_cfg["id"] == published_ad_id or ad_cfg["title"] == published_ad_title
        if not is_same_ad:
            continue

        LOG.info(" -> deleting %s '%s'...", published_ad_id, published_ad_title)
        self.web_request(
            url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}",
            method = "POST",
            headers = {"x-csrf-token": csrf_token}
        )
        pause(1500, 3000)

    ad_cfg["id"] = None
    return True
|
|
|
|
def publish_ads(self, ad_cfgs:Iterable[dict[str, Any]]) -> None:
    """
    Publishes the given ads one after another, pausing after each of them.

    :param ad_cfgs: (ad_file, ad_cfg, ad_cfg_orig) tuples as returned by load_ads()
    """
    # stays 0 when there is nothing to publish
    count = 0

    for count, (ad_file, ad_cfg, ad_cfg_orig) in enumerate(ad_cfgs, start = 1):
        LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg["title"], ad_file)
        self.publish_ad(ad_file, ad_cfg, ad_cfg_orig)
        pause(3000, 5000)

    separator = "############################################"
    LOG.info(separator)
    LOG.info("(Re-)published %s", pluralize("ad", count))
    LOG.info(separator)
|
|
|
|
def publish_ad(self, ad_file, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any]) -> None:
    """
    Publishes a single ad through the web form and records the result in the ad file.

    An already published copy of the ad is removed first via delete_ad(). After the form
    was submitted, "updated_on", "created_on" (only if the ad had neither a creation date
    nor an ID yet) and the new "id" are stored in ad_cfg_orig, which is then saved to ad_file.

    :param ad_file: path of the ad file, used in error messages and as save target
    :param ad_cfg: the effective ad configuration used to fill the form
    :param ad_cfg_orig: the ad configuration that is updated and written back to ad_file
    """
    # `import urllib` alone does not guarantee that the `parse` submodule is loaded
    import urllib.parse  # pylint: disable=import-outside-toplevel

    self.delete_ad(ad_cfg)

    LOG.info("Publishing ad '%s'...", ad_cfg["title"])

    if LOG.isEnabledFor(logging.DEBUG):
        LOG.debug(" -> effective ad meta:")
        YAML().dump(ad_cfg, sys.stdout)

    self.web_open(f"{self.root_url}/p-anzeige-aufgeben-schritt2.html")

    if ad_cfg["type"] == "WANTED":
        self.web_click(By.ID, "adType2")

    #############################
    # set title
    #############################
    self.web_input(By.ID, "postad-title", ad_cfg["title"])

    #############################
    # set category
    #############################
    # trigger and wait for automatic category detection
    self.web_click(By.ID, "pstad-price")
    try:
        self.web_find(By.XPATH, "//*[@id='postad-category-path'][text()]")
        is_category_auto_selected = True
    except Exception:  # pylint: disable=broad-except
        # deliberately not BaseException: SystemExit/KeyboardInterrupt must abort the run
        is_category_auto_selected = False

    if ad_cfg["category"]:
        self.web_click(By.ID, "pstad-lnk-chngeCtgry")
        self.web_find(By.ID, "postad-step1-sbmt")

        category_url = f"{self.root_url}/p-kategorie-aendern.html#?path={ad_cfg['category']}"
        self.web_open(category_url)
        self.web_click(By.XPATH, "//*[@id='postad-step1-sbmt']/button")
    else:
        ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed")

    #############################
    # set price
    #############################
    self.web_select(By.XPATH, "//select[@id='priceType']", ad_cfg["price_type"])
    if ad_cfg["price_type"] != "GIVE_AWAY":
        self.web_input(By.ID, "pstad-price", ad_cfg["price"])

    #############################
    # set description
    #############################
    # json.dumps() yields a properly escaped JavaScript string literal, so quotes, backticks,
    # backslashes, line breaks and "${...}" sequences of the description are taken literally
    self.web_execute("document.querySelector('#pstad-descrptn').value = " + json.dumps(ad_cfg["description"]))

    #############################
    # set contact zipcode, street, name and phone
    #############################
    contact = ad_cfg["contact"]
    contact_inputs = (
        ("pstad-zip", "zipcode"),
        ("pstad-street", "street"),
        ("postad-contactname", "name"),
        ("postad-phonenumber", "phone")
    )
    for input_id, contact_property in contact_inputs:
        if contact[contact_property]:
            self.web_input(By.ID, input_id, contact[contact_property])

    #############################
    # upload images
    #############################
    images = ad_cfg["images"] or []  # tolerate ads without any images configured
    LOG.info(" -> found %s", pluralize("image", images))
    image_upload = self.web_find(By.XPATH, "//input[@type='file']")

    def count_uploaded_images():
        return len(self.webdriver.find_elements(By.CLASS_NAME, "imagebox-new-thumbnail"))

    for image in images:
        LOG.info(" -> uploading image [%s]", image)
        previous_uploaded_images_count = count_uploaded_images()
        image_upload.send_keys(image)

        # an additional thumbnail signals that the upload has finished
        start_at = time.time()
        while previous_uploaded_images_count == count_uploaded_images() and time.time() - start_at < 60:
            print(".", end = "", flush = True)
            time.sleep(1)
        print(flush = True)

        ensure(previous_uploaded_images_count < count_uploaded_images(), f"Couldn't upload image [{image}] within 60 seconds")
        LOG.debug(" => uploaded image within %i seconds", time.time() - start_at)

    #############################
    # submit
    #############################
    self.handle_captcha_if_present("postAd-recaptcha", "but DON'T click 'Anzeige aufgeben'.")
    self.web_click(By.ID, "pstad-submit")
    self.web_await(EC.url_contains("p-anzeige-aufgeben-bestaetigung.html?adId="), 20)

    ad_cfg_orig["updated_on"] = datetime.utcnow().isoformat()
    # ad_cfg_orig may not contain these keys at all, hence get() instead of []
    if not ad_cfg_orig.get("created_on") and not ad_cfg_orig.get("id"):
        ad_cfg_orig["created_on"] = ad_cfg_orig["updated_on"]

    # extract the ad id from the URL's query parameter
    confirmation_url = self.webdriver.current_url
    current_url_query_params = urllib.parse.parse_qs(urllib.parse.urlparse(confirmation_url).query)
    ad_id_values = current_url_query_params.get("adId")
    ensure(ad_id_values, f"Could not determine the ID of the published ad from URL [{confirmation_url}]")
    ad_id = int(ad_id_values[0])
    ad_cfg_orig["id"] = ad_id

    LOG.info(" -> SUCCESS: ad published with ID %s", ad_id)

    utils.save_dict(ad_file, ad_cfg_orig)
|
|
|
|
|
|
#############################
|
|
# main entry point
|
|
#############################
|
|
def main(args:Iterable[str]):
    """
    Entry point of the application.

    Prints the banner (unless "version" is among the arguments), sets up console logging,
    installs the SIGINT, uncaught-exception and exit handlers from utils and runs the bot.

    :param args: the command line arguments, including the program name at index 0
    """
    show_banner = "version" not in args
    if show_banner:
        banner = textwrap.dedent(r"""
        _ _ _ _ _ _
        | | _| | ___(_)_ __ __ _ _ __ _______(_) __ _ ___ _ __ | |__ ___ | |_
        | |/ / |/ _ \ | '_ \ / _` | '_ \|_ / _ \ |/ _` |/ _ \ '_ \ ____| '_ \ / _ \| __|
        | <| | __/ | | | | (_| | | | |/ / __/ | (_| | __/ | | |____| |_) | (_) | |_
        |_|\_\_|\___|_|_| |_|\__,_|_| |_/___\___|_|\__, |\___|_| |_| |_.__/ \___/ \__|
        |___/
        https://github.com/kleinanzeigen-bot
        """)
        print(banner, flush = True)

    utils.configure_console_logging()

    signal.signal(signal.SIGINT, utils.on_sigint) # capture CTRL+C
    sys.excepthook = utils.on_exception
    atexit.register(utils.on_exit)

    bot = KleinanzeigenBot()
    bot.run(args)
|
|
|
|
|
|
if __name__ == "__main__":
    # Running this file as a script is not supported: report it on the console
    # and terminate with a non-zero exit code.
    utils.configure_console_logging()
    LOG.error("Direct execution not supported. Use 'pdm run app'")
    sys.exit(1)
|