From 5321b7cd12e6de93984fce5922599d17d9e10896 Mon Sep 17 00:00:00 2001 From: PhilK-7 <44678164+PhilK-7@users.noreply.github.com> Date: Tue, 25 Oct 2022 22:27:10 +0200 Subject: [PATCH] Add download command. Fixes #32 (#114) --- README.md | 6 +- kleinanzeigen_bot/__init__.py | 208 +++++++++++++++++++++++++++++++++- kleinanzeigen_bot/extract.py | 144 +++++++++++++++++++++++ 3 files changed, 352 insertions(+), 6 deletions(-) create mode 100644 kleinanzeigen_bot/extract.py diff --git a/README.md b/README.md index ddd1b01..af18435 100644 --- a/README.md +++ b/README.md @@ -177,8 +177,9 @@ It is the spiritual successor to [Second-Hand-Friends/ebayKleinanzeigen](https:/ Usage: kleinanzeigen-bot COMMAND [OPTIONS] Commands: - publish - (re-)publishes ads - verify - verifies the configuration files + publish - (re-)publishes ads + verify - verifies the configuration files + download - downloads an ad -- help - displays this help (default command) version - displays the application version @@ -191,6 +192,7 @@ Options: * new: only publish new ads (i.e. ads that have no id in the config file) --force - alias for '--ads=all' --keep-old - don't delete old ads on republication + --ad - provide the ad ID after this option when using the download command --config= - path to the config YAML or JSON file (DEFAULT: ./config.yaml) --logfile= - path to the logfile (DEFAULT: ./kleinanzeigen-bot.log) -v, --verbose - enables verbose output - only useful when troubleshooting issues diff --git a/kleinanzeigen_bot/__init__.py b/kleinanzeigen_bot/__init__.py index acea637..d9d693d 100644 --- a/kleinanzeigen_bot/__init__.py +++ b/kleinanzeigen_bot/__init__.py @@ -3,10 +3,13 @@ Copyright (C) 2022 Sebastian Thomschke and contributors SPDX-License-Identifier: AGPL-3.0-or-later """ import atexit, copy, getopt, importlib.metadata, json, logging, os, signal, sys, textwrap, time, urllib +import shutil from collections.abc import Iterable from datetime import datetime from logging.handlers import RotatingFileHandler from typing import Any, Final +from urllib import request + from wcmatch import glob from overrides import overrides @@ -15,10 +18,12 @@ from selenium.common.exceptions import NoSuchElementException, TimeoutException, from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC -from . import utils, resources +from . import utils, resources, extract # pylint: disable=W0406 from .utils import abspath, apply_defaults, ensure, is_frozen, pause, pluralize, safe_get from .selenium_mixin import SeleniumMixin +# W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933 + LOG_ROOT:Final[logging.Logger] = logging.getLogger() LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot") LOG.setLevel(logging.INFO) @@ -47,6 +52,7 @@ class KleinanzeigenBot(SeleniumMixin): self.ads_selector = "due" self.delete_old_ads = True self.delete_ads_by_title = False + self.ad_id = None # attribute needed when downloading an ad def __del__(self) -> None: if self.file_log: @@ -91,6 +97,26 @@ class KleinanzeigenBot(SeleniumMixin): LOG.info("############################################") LOG.info("DONE: No ads to delete found.") LOG.info("############################################") + case "download": + self.configure_file_logging() + # ad ID passed as value to download command + if self.ad_id is None: + LOG.error('Provide the flag \'--ad\' with a valid ad ID to use the download command!') + sys.exit(2) + if self.ad_id < 1: + LOG.error('The given ad ID must be valid!') + sys.exit(2) + LOG.info('Start fetch task for ad with ID %s', str(self.ad_id)) + + self.load_config() + self.create_webdriver_session() + self.login() + # call download function + exists = self.navigate_to_ad_page() + if exists: + self.download_ad_page() + else: + sys.exit(2) case _: LOG.error("Unknown command: %s", self.command) sys.exit(2) @@ -107,9 +133,10 @@ class KleinanzeigenBot(SeleniumMixin): Usage: {exe} COMMAND [OPTIONS] Commands: - publish - (re-)publishes ads - verify - verifies the configuration files - delete - deletes ads + publish - (re-)publishes ads + verify - verifies the configuration files + delete - deletes ads + download - downloads an ad -- help - displays this help (default command) version - displays the application version @@ -122,6 +149,7 @@ class KleinanzeigenBot(SeleniumMixin): * new: only publish new ads (i.e. ads that have no id in the config file) --force - alias for '--ads=all' --keep-old - don't delete old ads on republication + --ad - provide the ad ID after this option when using the download command --config= - path to the config YAML or JSON file (DEFAULT: ./config.yaml) --logfile= - path to the logfile (DEFAULT: ./kleinanzeigen-bot.log) -v, --verbose - enables verbose output - only useful when troubleshooting issues @@ -135,6 +163,7 @@ class KleinanzeigenBot(SeleniumMixin): "force", "help", "keep-old", + "ad=", "logfile=", "verbose" ]) @@ -161,6 +190,12 @@ class KleinanzeigenBot(SeleniumMixin): self.ads_selector = "all" case "--keep-old": self.delete_old_ads = False + case "--ad": + try: + self.ad_id:int = int(value) + except ValueError: # given value cannot be parsed as integer + LOG.error('The given ad ID (\"%s\") is not a valid number!', value) + sys.exit(2) case "-v" | "--verbose": LOG.setLevel(logging.DEBUG) @@ -628,6 +663,171 @@ class KleinanzeigenBot(SeleniumMixin): else: raise TimeoutException("Loading page failed, it still shows fullscreen ad.") from ex + def navigate_to_ad_page(self) -> bool: + """ + Navigates to an ad page specified with an ad ID. + + :return: whether the navigation to the ad page was successful + """ + # enter the ad ID into the search bar + self.web_input(By.XPATH, '//*[@id="site-search-query"]', str(self.ad_id)) + # navigate to ad page and wait + self.web_click(By.XPATH, '//*[@id="site-search-submit"]') + pause(1000, 2000) + + # handle the case that invalid ad ID given + if self.webdriver.current_url.endswith('k0'): + LOG.error('There is no ad under the given ID.') + return False + try: # close (warning) popup, if given + self.webdriver.find_element(By.CSS_SELECTOR, '#vap-ovrly-secure') + LOG.warning('A popup appeared.') + close_button = self.webdriver.find_element(By.CLASS_NAME, 'mfp-close') + close_button.click() + time.sleep(1) + except NoSuchElementException: + print('(no popup given)') + return True + + def download_images_from_ad_page(self, directory:str, ad_id:int, logger:logging.Logger) -> list[str]: + """ + Downloads all images of an ad. + + :param directory: the path of the directory created for this ad + :param ad_id: the ID of the ad to download the images from + :param logger: an initialized logger + :return: the relative paths for all downloaded images + """ + + n_images:int + img_paths = [] + try: + image_box = self.webdriver.find_element(By.CSS_SELECTOR, '.galleryimage-large') + + # if gallery image box exists, proceed with image fetching + n_images = 1 + + # determine number of images (1 ... N) + next_button = None + try: # check if multiple images given + image_counter = image_box.find_element(By.CSS_SELECTOR, '.galleryimage--info') + n_images = int(image_counter.text[2:]) + logger.info('Found %d images.', n_images) + next_button = self.webdriver.find_element(By.CSS_SELECTOR, '.galleryimage--navigation--next') + except NoSuchElementException: + logger.info('Only one image found.') + + # download all images from box + img_element = image_box.find_element(By.XPATH, './/div[1]/img') + img_fn_prefix = 'ad_' + str(ad_id) + '__img' + + img_nr = 1 + dl_counter = 0 + while img_nr <= n_images: # scrolling + downloading + current_img_url = img_element.get_attribute('src') # URL of the image + file_ending = current_img_url.split('.')[-1].lower() + img_path = directory + '/' + img_fn_prefix + str(img_nr) + '.' + file_ending + if current_img_url.startswith('https'): # verify https (for Bandit linter) + request.urlretrieve(current_img_url, img_path) # nosec B310 + dl_counter += 1 + img_paths.append(img_path.split('/')[-1]) + + # scroll to next image (if exists) + if img_nr < n_images: + try: + # click next button, wait, and reestablish reference + next_button.click() + self.web_await(lambda _: EC.staleness_of(img_element)) + new_div = self.webdriver.find_element(By.CSS_SELECTOR,f'div.galleryimage-element:nth-child({img_nr + 1})') + img_element = new_div.find_element(By.XPATH, './/img') + except NoSuchElementException: + logger.error('NEXT button in image gallery somehow missing, abort image fetching.') + break + img_nr += 1 + logger.info('Downloaded %d image(s).', dl_counter) + + except NoSuchElementException: # some ads do not require images + logger.warning('No image area found. Continue without downloading images.') + + return img_paths + + def extract_ad_page_info(self, directory:str) -> dict: + """ + Extracts all necessary information from an ad´s page. + + :param directory: the path of the ad´s previously created directory + :return: a dictionary with the keys as given in an ad YAML, and their respective values + """ + info = {'active': True} + + # extract basic info + if 's-anzeige' in self.webdriver.current_url: + o_type = 'OFFER' + else: + o_type = 'WANTED' + info['type'] = o_type + title:str = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-title').text + LOG.info('Extracting information from ad with title \"%s\"', title) + info['title'] = title + descr:str = self.webdriver.find_element(By.XPATH, '//*[@id="viewad-description-text"]').text + info['description'] = descr + + extractor = extract.AdExtractor(self.webdriver) + + # extract category + info['category'] = extractor.extract_category_from_ad_page() + + # get special attributes + info['special_attributes'] = extractor.extract_special_attributes_from_ad_page() + + # process pricing + info['price'], info['price_type'] = extractor.extract_pricing_info_from_ad_page() + + # process shipping + info['shipping_type'], info['shipping_costs'] = extractor.extract_shipping_info_from_ad_page() + + # fetch images + info['images'] = self.download_images_from_ad_page(directory, self.ad_id, LOG) + + # process address + info['contact'] = extractor.extract_contact_from_ad_page() + + # process meta info + info['republication_interval'] = 7 # a default value for downloaded ads + info['id'] = self.ad_id + try: # try different locations known for creation date element + creation_date = self.webdriver.find_element(By.XPATH, '/html/body/div[1]/div[2]/div/section[2]/section/section/article/div[3]/div[2]/div[2]/' + 'div[1]/span').text + except NoSuchElementException: + creation_date = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-extra-info > div:nth-child(1) > span:nth-child(2)').text + + # convert creation date to ISO format + created_parts = creation_date.split('.') + creation_date = created_parts[2] + '-' + created_parts[1] + '-' + created_parts[0] + ' 00:00:00' + info['created_on'] = datetime.fromisoformat(creation_date) + info['updated_on'] = None # will be set later on + + return info + + def download_ad_page(self): + """ + Downloads an ad to a specific location, specified by config and ad_id. + """ + + # create sub-directory for ad to download + relative_directory = str(self.config['ad_files'][0]).split('**', maxsplit=1)[0] + new_base_dir = os.path.join(relative_directory, f'ad_{self.ad_id}') + if os.path.exists(new_base_dir): + LOG.info('Deleting current folder of ad...') + shutil.rmtree(new_base_dir) + os.mkdir(new_base_dir) + LOG.info('New directory for ad created at %s.', new_base_dir) + + # call extraction function + info = self.extract_ad_page_info(new_base_dir) + ad_file_path = new_base_dir + '/' + f'ad_{self.ad_id}.yaml' + utils.save_dict(ad_file_path, info) + ############################# # main entry point diff --git a/kleinanzeigen_bot/extract.py b/kleinanzeigen_bot/extract.py new file mode 100644 index 0000000..0d42538 --- /dev/null +++ b/kleinanzeigen_bot/extract.py @@ -0,0 +1,144 @@ +""" +Copyright (C) 2022 Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +""" +from decimal import DecimalException + +from selenium.common.exceptions import NoSuchElementException +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webdriver import WebDriver + +from .utils import parse_decimal + + +class AdExtractor: + """ + Wrapper class for ad extraction that uses an active bot´s web driver to extract specific elements from an ad page. + """ + + def __init__(self, driver:WebDriver): + self.driver = driver + + def extract_category_from_ad_page(self) -> str: + """ + Extracts a category of an ad in numerical form. + Assumes that the web driver currently shows an ad page. + + :return: a category string of form abc/def, where a-f are digits + """ + category_line = self.driver.find_element(By.XPATH, '//*[@id="vap-brdcrmb"]') + category_first_part = category_line.find_element(By.XPATH, './/a[2]') + category_second_part = category_line.find_element(By.XPATH, './/a[3]') + cat_num_first = category_first_part.get_attribute('href').split('/')[-1][1:] + cat_num_second = category_second_part.get_attribute('href').split('/')[-1][1:] + category:str = cat_num_first + '/' + cat_num_second + + return category + + def extract_special_attributes_from_ad_page(self) -> dict: + """ + Extracts the special attributes from an ad page. + + :return: a dictionary (possibly empty) where the keys are the attribute names, mapped to their values + """ + + try: + details_box = self.driver.find_element(By.CSS_SELECTOR, '#viewad-details') + details_list = details_box.find_element(By.XPATH, './/ul') + list_items = details_list.find_elements(By.TAG_NAME, 'li') + details = {} + for list_item in list_items: + detail_key = list_item.text.split('\n')[0] + detail_value = list_item.find_element(By.TAG_NAME, 'span').text + details[detail_key] = detail_value + + return details + except NoSuchElementException: + return {} + + def extract_pricing_info_from_ad_page(self) -> (float | None, str): + """ + Extracts the pricing information (price and pricing type) from an ad page. + + :return: the price of the offer (optional); and the pricing type + """ + try: + price_str:str = self.driver.find_element(By.CLASS_NAME, 'boxedarticle--price').text + price_type:str + price:float | None = -1 + match price_str.split()[-1]: + case '€': + price_type = 'FIXED' + price = float(parse_decimal(price_str.split()[0].replace('.', ''))) + case 'VB': # can be either 'X € VB', or just 'VB' + price_type = 'NEGOTIABLE' + try: + price = float(parse_decimal(price_str.split()[0].replace('.', ''))) + except DecimalException: + price = None + case 'verschenken': + price_type = 'GIVE_AWAY' + price = None + case _: + price_type = 'NOT_APPLICABLE' + return price, price_type + except NoSuchElementException: # no 'commercial' ad, has no pricing box etc. + return None, 'NOT_APPLICABLE' + + def extract_shipping_info_from_ad_page(self) -> (str, float | None): + """ + Extracts shipping information from an ad page. + + :return: the shipping type, and the shipping price (optional) + """ + ship_type, ship_costs = 'NOT_APPLICABLE', None + try: + shipping_text = self.driver.find_element(By.CSS_SELECTOR, '.boxedarticle--details--shipping') \ + .text.strip() + # e.g. '+ Versand ab 5,49 €' OR 'Nur Abholung' + if shipping_text == 'Nur Abholung': + ship_type = 'PICKUP' + elif shipping_text == 'Versand möglich': + ship_type = 'SHIPPING' + elif '€' in shipping_text: + shipping_price_parts = shipping_text.split(' ') + shipping_price = float(parse_decimal(shipping_price_parts[-2])) + ship_type = 'SHIPPING' + ship_costs = shipping_price + except NoSuchElementException: # no pricing box -> no shipping given + ship_type = 'NOT_APPLICABLE' + + return ship_type, ship_costs + + def extract_contact_from_ad_page(self) -> dict: + """ + Processes the address part involving street (optional), zip code + city, and phone number (optional). + + :return: a dictionary containing the address parts with their corresponding values + """ + contact = {} + address_element = self.driver.find_element(By.CSS_SELECTOR, '#viewad-locality') + address_text = address_element.text.strip() + # format: e.g. (Beispiel Allee 42,) 12345 Bundesland - Stadt + try: + street_element = self.driver.find_element(By.XPATH, '//*[@id="street-address"]') + street = street_element.text[:-2] # trailing comma and whitespace + contact['street'] = street + except NoSuchElementException: + print('No street given in the contact.') + # construct remaining address + address_halves = address_text.split(' - ') + address_left_parts = address_halves[0].split(' ') # zip code and region/city + contact['zipcode'] = address_left_parts[0] + contact['name'] = address_halves[1] + if 'street' not in contact: + contact['street'] = None + try: # phone number is unusual for non-professional sellers today + phone_element = self.driver.find_element(By.CSS_SELECTOR, '#viewad-contact-phone') + phone_number = phone_element.find_element(By.TAG_NAME, 'a').text + contact['phone'] = ''.join(phone_number.replace('-', ' ').split(' ')).replace('+49(0)', '0') + except NoSuchElementException: + contact['phone'] = None # phone seems to be a deprecated feature + # also see 'https://themen.ebay-kleinanzeigen.de/hilfe/deine-anzeigen/Telefon/ + + return contact