mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-12 18:41:50 +01:00
Move smooth_scroll_page() from utils to selenium_mixin
This commit is contained in:
@@ -9,18 +9,19 @@ import selenium.webdriver.support.expected_conditions as EC
|
|||||||
from selenium.common.exceptions import NoSuchElementException
|
from selenium.common.exceptions import NoSuchElementException
|
||||||
from selenium.webdriver.common.by import By
|
from selenium.webdriver.common.by import By
|
||||||
from selenium.webdriver.remote.webdriver import WebDriver
|
from selenium.webdriver.remote.webdriver import WebDriver
|
||||||
from selenium.webdriver.support.wait import WebDriverWait
|
|
||||||
|
|
||||||
from .utils import parse_decimal, pause, smooth_scroll_page
|
from .selenium_mixin import SeleniumMixin
|
||||||
|
from .utils import parse_decimal, pause
|
||||||
|
|
||||||
|
|
||||||
class AdExtractor:
|
class AdExtractor(SeleniumMixin):
|
||||||
"""
|
"""
|
||||||
Wrapper class for ad extraction that uses an active bot´s web driver to extract specific elements from an ad page.
|
Wrapper class for ad extraction that uses an active bot´s web driver to extract specific elements from an ad page.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, driver:WebDriver):
|
def __init__(self, driver:WebDriver):
|
||||||
self.driver = driver
|
super().__init__()
|
||||||
|
self.webdriver = driver
|
||||||
|
|
||||||
def extract_category_from_ad_page(self) -> str:
|
def extract_category_from_ad_page(self) -> str:
|
||||||
"""
|
"""
|
||||||
@@ -29,7 +30,7 @@ class AdExtractor:
|
|||||||
|
|
||||||
:return: a category string of form abc/def, where a-f are digits
|
:return: a category string of form abc/def, where a-f are digits
|
||||||
"""
|
"""
|
||||||
category_line = self.driver.find_element(By.XPATH, '//*[@id="vap-brdcrmb"]')
|
category_line = self.webdriver.find_element(By.XPATH, '//*[@id="vap-brdcrmb"]')
|
||||||
category_first_part = category_line.find_element(By.XPATH, './/a[2]')
|
category_first_part = category_line.find_element(By.XPATH, './/a[2]')
|
||||||
category_second_part = category_line.find_element(By.XPATH, './/a[3]')
|
category_second_part = category_line.find_element(By.XPATH, './/a[3]')
|
||||||
cat_num_first = category_first_part.get_attribute('href').split('/')[-1][1:]
|
cat_num_first = category_first_part.get_attribute('href').split('/')[-1][1:]
|
||||||
@@ -44,7 +45,7 @@ class AdExtractor:
|
|||||||
|
|
||||||
:return: a dictionary (possibly empty) where the keys are the attribute names, mapped to their values
|
:return: a dictionary (possibly empty) where the keys are the attribute names, mapped to their values
|
||||||
"""
|
"""
|
||||||
belen_conf = self.driver.execute_script("return window.BelenConf")
|
belen_conf = self.webdriver.execute_script("return window.BelenConf")
|
||||||
special_attributes_str = belen_conf["universalAnalyticsOpts"]["dimensions"]["dimension108"]
|
special_attributes_str = belen_conf["universalAnalyticsOpts"]["dimensions"]["dimension108"]
|
||||||
special_attributes = json.loads(special_attributes_str)
|
special_attributes = json.loads(special_attributes_str)
|
||||||
if not isinstance(special_attributes, dict):
|
if not isinstance(special_attributes, dict):
|
||||||
@@ -62,7 +63,7 @@ class AdExtractor:
|
|||||||
:return: the price of the offer (optional); and the pricing type
|
:return: the price of the offer (optional); and the pricing type
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
price_str:str = self.driver.find_element(By.CLASS_NAME, 'boxedarticle--price').text
|
price_str:str = self.webdriver.find_element(By.CLASS_NAME, 'boxedarticle--price').text
|
||||||
price_type:str
|
price_type:str
|
||||||
price:float | None = -1
|
price:float | None = -1
|
||||||
match price_str.split()[-1]:
|
match price_str.split()[-1]:
|
||||||
@@ -92,7 +93,7 @@ class AdExtractor:
|
|||||||
"""
|
"""
|
||||||
ship_type, ship_costs = 'NOT_APPLICABLE', None
|
ship_type, ship_costs = 'NOT_APPLICABLE', None
|
||||||
try:
|
try:
|
||||||
shipping_text = self.driver.find_element(By.CSS_SELECTOR, '.boxedarticle--details--shipping') \
|
shipping_text = self.webdriver.find_element(By.CSS_SELECTOR, '.boxedarticle--details--shipping') \
|
||||||
.text.strip()
|
.text.strip()
|
||||||
# e.g. '+ Versand ab 5,49 €' OR 'Nur Abholung'
|
# e.g. '+ Versand ab 5,49 €' OR 'Nur Abholung'
|
||||||
if shipping_text == 'Nur Abholung':
|
if shipping_text == 'Nur Abholung':
|
||||||
@@ -116,11 +117,11 @@ class AdExtractor:
|
|||||||
:return: a dictionary containing the address parts with their corresponding values
|
:return: a dictionary containing the address parts with their corresponding values
|
||||||
"""
|
"""
|
||||||
contact = {}
|
contact = {}
|
||||||
address_element = self.driver.find_element(By.CSS_SELECTOR, '#viewad-locality')
|
address_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-locality')
|
||||||
address_text = address_element.text.strip()
|
address_text = address_element.text.strip()
|
||||||
# format: e.g. (Beispiel Allee 42,) 12345 Bundesland - Stadt
|
# format: e.g. (Beispiel Allee 42,) 12345 Bundesland - Stadt
|
||||||
try:
|
try:
|
||||||
street_element = self.driver.find_element(By.XPATH, '//*[@id="street-address"]')
|
street_element = self.webdriver.find_element(By.XPATH, '//*[@id="street-address"]')
|
||||||
street = street_element.text[:-2] # trailing comma and whitespace
|
street = street_element.text[:-2] # trailing comma and whitespace
|
||||||
contact['street'] = street
|
contact['street'] = street
|
||||||
except NoSuchElementException:
|
except NoSuchElementException:
|
||||||
@@ -130,7 +131,7 @@ class AdExtractor:
|
|||||||
address_left_parts = address_halves[0].split(' ') # zip code and region/city
|
address_left_parts = address_halves[0].split(' ') # zip code and region/city
|
||||||
contact['zipcode'] = address_left_parts[0]
|
contact['zipcode'] = address_left_parts[0]
|
||||||
|
|
||||||
contact_person_element = self.driver.find_element(By.CSS_SELECTOR, '#viewad-contact')
|
contact_person_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-contact')
|
||||||
name_element = contact_person_element.find_element(By.CLASS_NAME, 'iconlist-text')
|
name_element = contact_person_element.find_element(By.CLASS_NAME, 'iconlist-text')
|
||||||
try:
|
try:
|
||||||
name = name_element.find_element(By.TAG_NAME, 'a').text
|
name = name_element.find_element(By.TAG_NAME, 'a').text
|
||||||
@@ -141,7 +142,7 @@ class AdExtractor:
|
|||||||
if 'street' not in contact:
|
if 'street' not in contact:
|
||||||
contact['street'] = None
|
contact['street'] = None
|
||||||
try: # phone number is unusual for non-professional sellers today
|
try: # phone number is unusual for non-professional sellers today
|
||||||
phone_element = self.driver.find_element(By.CSS_SELECTOR, '#viewad-contact-phone')
|
phone_element = self.webdriver.find_element(By.CSS_SELECTOR, '#viewad-contact-phone')
|
||||||
phone_number = phone_element.find_element(By.TAG_NAME, 'a').text
|
phone_number = phone_element.find_element(By.TAG_NAME, 'a').text
|
||||||
contact['phone'] = ''.join(phone_number.replace('-', ' ').split(' ')).replace('+49(0)', '0')
|
contact['phone'] = ''.join(phone_number.replace('-', ' ').split(' ')).replace('+49(0)', '0')
|
||||||
except NoSuchElementException:
|
except NoSuchElementException:
|
||||||
@@ -157,15 +158,15 @@ class AdExtractor:
|
|||||||
:return: the links to your ad pages
|
:return: the links to your ad pages
|
||||||
"""
|
"""
|
||||||
# navigate to your ads page
|
# navigate to your ads page
|
||||||
self.driver.get('https://www.ebay-kleinanzeigen.de/m-meine-anzeigen.html')
|
self.webdriver.get('https://www.ebay-kleinanzeigen.de/m-meine-anzeigen.html')
|
||||||
WebDriverWait(self.driver, 15).until(EC.url_contains('meine-anzeigen'))
|
self.web_await(EC.url_contains('meine-anzeigen'), 15)
|
||||||
pause(2000, 3000)
|
pause(2000, 3000)
|
||||||
|
|
||||||
# collect ad references:
|
# collect ad references:
|
||||||
|
|
||||||
pagination_section = self.driver.find_element(By.CSS_SELECTOR, 'section.jsx-1105488430:nth-child(4)')
|
pagination_section = self.webdriver.find_element(By.CSS_SELECTOR, 'section.jsx-1105488430:nth-child(4)')
|
||||||
# scroll down to load dynamically
|
# scroll down to load dynamically
|
||||||
smooth_scroll_page(self.driver)
|
self.web_scroll_page_down()
|
||||||
pause(2000, 3000)
|
pause(2000, 3000)
|
||||||
# detect multi-page
|
# detect multi-page
|
||||||
try:
|
try:
|
||||||
@@ -186,20 +187,20 @@ class AdExtractor:
|
|||||||
refs:list[str] = []
|
refs:list[str] = []
|
||||||
while True: # loop reference extraction until no more forward page
|
while True: # loop reference extraction until no more forward page
|
||||||
# extract references
|
# extract references
|
||||||
list_section = self.driver.find_element(By.XPATH, '//*[@id="my-manageads-adlist"]')
|
list_section = self.webdriver.find_element(By.XPATH, '//*[@id="my-manageads-adlist"]')
|
||||||
list_items = list_section.find_elements(By.CLASS_NAME, 'cardbox')
|
list_items = list_section.find_elements(By.CLASS_NAME, 'cardbox')
|
||||||
refs += [li.find_element(By.XPATH, 'article/section/section[2]/h2/div/a').get_attribute('href') for li in list_items]
|
refs += [li.find_element(By.XPATH, 'article/section/section[2]/h2/div/a').get_attribute('href') for li in list_items]
|
||||||
|
|
||||||
if not multi_page: # only one iteration for single-page overview
|
if not multi_page: # only one iteration for single-page overview
|
||||||
break
|
break
|
||||||
# check if last page
|
# check if last page
|
||||||
nav_button = self.driver.find_elements(By.CSS_SELECTOR, 'button.jsx-2828608826')[-1]
|
nav_button = self.webdriver.find_elements(By.CSS_SELECTOR, 'button.jsx-2828608826')[-1]
|
||||||
if nav_button.get_attribute('title') != 'Nächste':
|
if nav_button.get_attribute('title') != 'Nächste':
|
||||||
print('Last ad overview page explored.')
|
print('Last ad overview page explored.')
|
||||||
break
|
break
|
||||||
# navigate to next overview page
|
# navigate to next overview page
|
||||||
nav_button.click()
|
nav_button.click()
|
||||||
pause(2000, 3000)
|
pause(2000, 3000)
|
||||||
smooth_scroll_page(self.driver)
|
self.web_scroll_page_down()
|
||||||
|
|
||||||
return refs
|
return refs
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
Copyright (C) 2022 Sebastian Thomschke and contributors
|
Copyright (C) 2022 Sebastian Thomschke and contributors
|
||||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
"""
|
"""
|
||||||
import logging, os, shutil
|
import logging, os, shutil, time
|
||||||
from collections.abc import Callable, Iterable
|
from collections.abc import Callable, Iterable
|
||||||
from typing import Any, Final
|
from typing import Any, Final
|
||||||
|
|
||||||
@@ -349,6 +349,27 @@ class SeleniumMixin:
|
|||||||
return response
|
return response
|
||||||
# pylint: enable=dangerous-default-value
|
# pylint: enable=dangerous-default-value
|
||||||
|
|
||||||
|
def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False):
|
||||||
|
"""
|
||||||
|
Smoothly scrolls the current web page down.
|
||||||
|
|
||||||
|
:param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
|
||||||
|
:param scroll_speed: the speed of scrolling, higher is faster
|
||||||
|
:param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
|
||||||
|
"""
|
||||||
|
current_y_pos = 0
|
||||||
|
bottom_y_pos: int = self.webdriver.execute_script('return document.body.scrollHeight;') # get bottom position by JS
|
||||||
|
while current_y_pos < bottom_y_pos: # scroll in steps until bottom reached
|
||||||
|
current_y_pos += scroll_length
|
||||||
|
self.webdriver.execute_script(f'window.scrollTo(0, {current_y_pos});') # scroll one step
|
||||||
|
time.sleep(scroll_length / scroll_speed)
|
||||||
|
|
||||||
|
if scroll_back_top: # scroll back to top in same style
|
||||||
|
while current_y_pos > 0:
|
||||||
|
current_y_pos -= scroll_length
|
||||||
|
self.webdriver.execute_script(f'window.scrollTo(0, {current_y_pos});')
|
||||||
|
time.sleep(scroll_length / scroll_speed / 2) # double speed
|
||||||
|
|
||||||
def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:float = 5) -> WebElement:
|
def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:float = 5) -> WebElement:
|
||||||
"""
|
"""
|
||||||
Selects an <option/> of a <select/> HTML element.
|
Selects an <option/> of a <select/> HTML element.
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ from typing import Any, Final, TypeVar
|
|||||||
|
|
||||||
import coloredlogs, inflect
|
import coloredlogs, inflect
|
||||||
from ruamel.yaml import YAML
|
from ruamel.yaml import YAML
|
||||||
from selenium.webdriver.chrome.webdriver import WebDriver
|
|
||||||
|
|
||||||
LOG_ROOT:Final[logging.Logger] = logging.getLogger()
|
LOG_ROOT:Final[logging.Logger] = logging.getLogger()
|
||||||
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.utils")
|
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.utils")
|
||||||
@@ -273,28 +272,6 @@ def parse_datetime(date:datetime | str | None) -> datetime | None:
|
|||||||
return datetime.fromisoformat(date)
|
return datetime.fromisoformat(date)
|
||||||
|
|
||||||
|
|
||||||
def smooth_scroll_page(driver: WebDriver, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False):
|
|
||||||
"""
|
|
||||||
Scrolls the current page of a web driver session.
|
|
||||||
:param driver: the web driver session
|
|
||||||
:param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
|
|
||||||
:param scroll_speed: the speed of scrolling, higher is faster
|
|
||||||
:param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
|
|
||||||
"""
|
|
||||||
current_y_pos = 0
|
|
||||||
bottom_y_pos: int = driver.execute_script('return document.body.scrollHeight;') # get bottom position by JS
|
|
||||||
while current_y_pos < bottom_y_pos: # scroll in steps until bottom reached
|
|
||||||
current_y_pos += scroll_length
|
|
||||||
driver.execute_script(f'window.scrollTo(0, {current_y_pos});') # scroll one step
|
|
||||||
time.sleep(scroll_length / scroll_speed)
|
|
||||||
|
|
||||||
if scroll_back_top: # scroll back to top in same style
|
|
||||||
while current_y_pos > 0:
|
|
||||||
current_y_pos -= scroll_length
|
|
||||||
driver.execute_script(f'window.scrollTo(0, {current_y_pos});')
|
|
||||||
time.sleep(scroll_length / scroll_speed / 2) # double speed
|
|
||||||
|
|
||||||
|
|
||||||
def extract_ad_id_from_ad_link(url: str) -> int:
|
def extract_ad_id_from_ad_link(url: str) -> int:
|
||||||
"""
|
"""
|
||||||
Extracts the ID of an ad, given by its reference link.
|
Extracts the ID of an ad, given by its reference link.
|
||||||
|
|||||||
Reference in New Issue
Block a user