Prefer double-quoted strings

This commit is contained in:
sebthom
2022-01-31 07:47:03 +01:00
parent e68a02b76b
commit 8c2d340b56
3 changed files with 43 additions and 43 deletions

View File

@@ -140,7 +140,7 @@ class KleinanzeigenBot(SeleniumMixin):
LOG.info("Logging to [%s]...", self.log_file_path) LOG.info("Logging to [%s]...", self.log_file_path)
self.file_log = RotatingFileHandler(filename = self.log_file_path, maxBytes = 10 * 1024 * 1024, backupCount = 10, encoding = "utf-8") self.file_log = RotatingFileHandler(filename = self.log_file_path, maxBytes = 10 * 1024 * 1024, backupCount = 10, encoding = "utf-8")
self.file_log.setLevel(logging.DEBUG) self.file_log.setLevel(logging.DEBUG)
self.file_log.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) self.file_log.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
LOG_ROOT.addHandler(self.file_log) LOG_ROOT.addHandler(self.file_log)
LOG.info("App version: %s", self.get_version()) LOG.info("App version: %s", self.get_version())
@@ -193,13 +193,13 @@ class KleinanzeigenBot(SeleniumMixin):
# pylint: disable=cell-var-from-loop # pylint: disable=cell-var-from-loop
def assert_one_of(path:str, allowed:Iterable): def assert_one_of(path:str, allowed:Iterable):
ensure(safe_get(ad_cfg, *path.split(".")) in allowed, f'-> property [{path}] must be one of: {allowed} @ [{ad_file}]') ensure(safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]")
def assert_min_len(path:str, minlen:int): def assert_min_len(path:str, minlen:int):
ensure(len(safe_get(ad_cfg, *path.split("."))) >= minlen, f'-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]') ensure(len(safe_get(ad_cfg, *path.split("."))) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]")
def assert_has_value(path:str): def assert_has_value(path:str):
ensure(safe_get(ad_cfg, *path.split(".")), f'-> property [{path}] not specified @ [{ad_file}]') ensure(safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]")
# pylint: enable=cell-var-from-loop # pylint: enable=cell-var-from-loop
assert_one_of("type", ("OFFER", "WANTED")) assert_one_of("type", ("OFFER", "WANTED"))
@@ -222,12 +222,12 @@ class KleinanzeigenBot(SeleniumMixin):
for image_pattern in ad_cfg["images"]: for image_pattern in ad_cfg["images"]:
for image_file in glob.glob(image_pattern, root_dir = os.path.dirname(ad_file), recursive = True): for image_file in glob.glob(image_pattern, root_dir = os.path.dirname(ad_file), recursive = True):
_, image_file_ext = os.path.splitext(image_file) _, image_file_ext = os.path.splitext(image_file)
ensure(image_file_ext.lower() in (".gif", ".jpg", ".jpeg", ".png"), f'Unsupported image file type [{image_file}]') ensure(image_file_ext.lower() in (".gif", ".jpg", ".jpeg", ".png"), f"Unsupported image file type [{image_file}]")
if os.path.isabs(image_file): if os.path.isabs(image_file):
images.add(image_file) images.add(image_file)
else: else:
images.add(os.path.join(os.path.dirname(ad_file), image_file)) images.add(os.path.join(os.path.dirname(ad_file), image_file))
ensure(images or not ad_cfg["images"], f'No images found for given file patterns {ad_cfg["images"]} at {os.getcwd()}') ensure(images or not ad_cfg["images"], f"No images found for given file patterns {ad_cfg['images']} at {os.getcwd()}")
ad_cfg["images"] = sorted(images) ad_cfg["images"] = sorted(images)
ads.append(( ads.append((
@@ -255,46 +255,46 @@ class KleinanzeigenBot(SeleniumMixin):
self.categories.update(self.config["categories"]) self.categories.update(self.config["categories"])
LOG.info(" -> found %s", pluralize("category", self.categories)) LOG.info(" -> found %s", pluralize("category", self.categories))
ensure(self.config["login"]["username"], f'[login.username] not specified @ [{self.config_file_path}]') ensure(self.config["login"]["username"], f"[login.username] not specified @ [{self.config_file_path}]")
ensure(self.config["login"]["password"], f'[login.password] not specified @ [{self.config_file_path}]') ensure(self.config["login"]["password"], f"[login.password] not specified @ [{self.config_file_path}]")
self.browser_arguments = self.config["browser"]["arguments"] self.browser_arguments = self.config["browser"]["arguments"]
self.browser_binary_location = self.config["browser"]["binary_location"] self.browser_binary_location = self.config["browser"]["binary_location"]
def login(self) -> None: def login(self) -> None:
LOG.info("Logging in as [%s]...", self.config["login"]["username"]) LOG.info("Logging in as [%s]...", self.config["login"]["username"])
self.web_open(f'{self.root_url}/m-einloggen.html') self.web_open(f"{self.root_url}/m-einloggen.html")
# accept privacy banner # accept privacy banner
self.web_click(By.ID, 'gdpr-banner-accept') self.web_click(By.ID, "gdpr-banner-accept")
self.web_input(By.ID, 'login-email', self.config["login"]["username"]) self.web_input(By.ID, "login-email", self.config["login"]["username"])
self.web_input(By.ID, 'login-password', self.config["login"]["password"]) self.web_input(By.ID, "login-password", self.config["login"]["password"])
self.handle_captcha_if_present("login-recaptcha", "but DON'T click 'Einloggen'.") self.handle_captcha_if_present("login-recaptcha", "but DON'T click 'Einloggen'.")
self.web_click(By.ID, 'login-submit') self.web_click(By.ID, "login-submit")
pause(800, 3000) pause(800, 3000)
def handle_captcha_if_present(self, captcha_element_id:str, msg:str) -> None: def handle_captcha_if_present(self, captcha_element_id:str, msg:str) -> None:
try: try:
self.web_click(By.XPATH, f'//*[@id="{captcha_element_id}"]') self.web_click(By.XPATH, f"//*[@id='{captcha_element_id}']")
except NoSuchElementException: except NoSuchElementException:
return return
LOG.warning("############################################") LOG.warning("############################################")
LOG.warning("# Captcha present! Please solve and close the captcha, %s", msg) LOG.warning("# Captcha present! Please solve and close the captcha, %s", msg)
LOG.warning("############################################") LOG.warning("############################################")
self.webdriver.switch_to.frame(self.web_find(By.CSS_SELECTOR, f'#{captcha_element_id} iframe')) self.webdriver.switch_to.frame(self.web_find(By.CSS_SELECTOR, f"#{captcha_element_id} iframe"))
self.web_await(lambda _: self.webdriver.find_element(By.ID, 'recaptcha-anchor').get_attribute('aria-checked') == "true", timeout = 5 * 60) self.web_await(lambda _: self.webdriver.find_element(By.ID, "recaptcha-anchor").get_attribute("aria-checked") == "true", timeout = 5 * 60)
self.webdriver.switch_to.default_content() self.webdriver.switch_to.default_content()
def delete_ad(self, ad_cfg: Dict[str, Any]) -> bool: def delete_ad(self, ad_cfg: Dict[str, Any]) -> bool:
LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"]) LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"])
self.web_open(f"{self.root_url}/m-meine-anzeigen.html") self.web_open(f"{self.root_url}/m-meine-anzeigen.html")
csrf_token_elem = self.web_find(By.XPATH, '//meta[@name="_csrf"]') csrf_token_elem = self.web_find(By.XPATH, "//meta[@name='_csrf']")
csrf_token = csrf_token_elem.get_attribute("content") csrf_token = csrf_token_elem.get_attribute("content")
published_ads = json.loads(self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT")["content"])["ads"] published_ads = json.loads(self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT")["content"])["ads"]
@@ -307,7 +307,7 @@ class KleinanzeigenBot(SeleniumMixin):
self.web_request( self.web_request(
url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}", url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}",
method = "POST", method = "POST",
headers = {'x-csrf-token': csrf_token} headers = {"x-csrf-token": csrf_token}
) )
pause(1500, 3000) pause(1500, 3000)
@@ -336,21 +336,21 @@ class KleinanzeigenBot(SeleniumMixin):
LOG.debug(" -> effective ad meta:") LOG.debug(" -> effective ad meta:")
YAML().dump(ad_cfg, sys.stdout) YAML().dump(ad_cfg, sys.stdout)
self.web_open(f'{self.root_url}/p-anzeige-aufgeben-schritt2.html') self.web_open(f"{self.root_url}/p-anzeige-aufgeben-schritt2.html")
if ad_cfg["type"] == "WANTED": if ad_cfg["type"] == "WANTED":
self.web_click(By.ID, 'adType2') self.web_click(By.ID, "adType2")
############################# #############################
# set title # set title
############################# #############################
self.web_input(By.ID, 'postad-title', ad_cfg["title"]) self.web_input(By.ID, "postad-title", ad_cfg["title"])
############################# #############################
# set category # set category
############################# #############################
# trigger and wait for automatic category detection # trigger and wait for automatic category detection
self.web_click(By.ID, 'pstad-price') self.web_click(By.ID, "pstad-price")
try: try:
self.web_find(By.XPATH, "//*[@id='postad-category-path'][text()]") self.web_find(By.XPATH, "//*[@id='postad-category-path'][text()]")
is_category_auto_selected = True is_category_auto_selected = True
@@ -358,21 +358,21 @@ class KleinanzeigenBot(SeleniumMixin):
is_category_auto_selected = False is_category_auto_selected = False
if ad_cfg["category"]: if ad_cfg["category"]:
self.web_click(By.ID, 'pstad-lnk-chngeCtgry') self.web_click(By.ID, "pstad-lnk-chngeCtgry")
self.web_find(By.ID, 'postad-step1-sbmt') self.web_find(By.ID, "postad-step1-sbmt")
category_url = f'{self.root_url}/p-kategorie-aendern.html#?path={ad_cfg["category"]}' category_url = f"{self.root_url}/p-kategorie-aendern.html#?path={ad_cfg['category']}"
self.web_open(category_url) self.web_open(category_url)
self.web_click(By.XPATH, "//*[@id='postad-step1-sbmt']/button") self.web_click(By.XPATH, "//*[@id='postad-step1-sbmt']/button")
else: else:
ensure(is_category_auto_selected, f'No category specified in [{ad_file}] and automatic category detection failed') ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed")
############################# #############################
# set price # set price
############################# #############################
self.web_select(By.XPATH, "//select[@id='priceType']", ad_cfg["price_type"]) self.web_select(By.XPATH, "//select[@id='priceType']", ad_cfg["price_type"])
if ad_cfg["price_type"] != 'GIVE_AWAY': if ad_cfg["price_type"] != "GIVE_AWAY":
self.web_input(By.ID, 'pstad-price', ad_cfg["price"]) self.web_input(By.ID, "pstad-price", ad_cfg["price"])
############################# #############################
# set description # set description
@@ -383,25 +383,25 @@ class KleinanzeigenBot(SeleniumMixin):
# set contact zipcode # set contact zipcode
############################# #############################
if ad_cfg["contact"]["zipcode"]: if ad_cfg["contact"]["zipcode"]:
self.web_input(By.ID, 'pstad-zip', ad_cfg["contact"]["zipcode"]) self.web_input(By.ID, "pstad-zip", ad_cfg["contact"]["zipcode"])
############################# #############################
# set contact street # set contact street
############################# #############################
if ad_cfg["contact"]["street"]: if ad_cfg["contact"]["street"]:
self.web_input(By.ID, 'pstad-street', ad_cfg["contact"]["street"]) self.web_input(By.ID, "pstad-street", ad_cfg["contact"]["street"])
############################# #############################
# set contact name # set contact name
############################# #############################
if ad_cfg["contact"]["name"]: if ad_cfg["contact"]["name"]:
self.web_input(By.ID, 'postad-contactname', ad_cfg["contact"]["name"]) self.web_input(By.ID, "postad-contactname", ad_cfg["contact"]["name"])
############################# #############################
# set contact phone # set contact phone
############################# #############################
if ad_cfg["contact"]["phone"]: if ad_cfg["contact"]["phone"]:
self.web_input(By.ID, 'postad-phonenumber', ad_cfg["contact"]["phone"]) self.web_input(By.ID, "postad-phonenumber", ad_cfg["contact"]["phone"])
############################# #############################
# upload images # upload images
@@ -418,7 +418,7 @@ class KleinanzeigenBot(SeleniumMixin):
image_upload.send_keys(image) image_upload.send_keys(image)
start_at = time.time() start_at = time.time()
while previous_uploaded_images_count == count_uploaded_images() and time.time() - start_at < 60: while previous_uploaded_images_count == count_uploaded_images() and time.time() - start_at < 60:
print(".", end = '', flush = True) print(".", end = "", flush = True)
time.sleep(1) time.sleep(1)
print(flush = True) print(flush = True)
@@ -429,7 +429,7 @@ class KleinanzeigenBot(SeleniumMixin):
# submit # submit
############################# #############################
self.handle_captcha_if_present("postAd-recaptcha", "but DON'T click 'Anzeige aufgeben'.") self.handle_captcha_if_present("postAd-recaptcha", "but DON'T click 'Anzeige aufgeben'.")
self.web_click(By.ID, 'pstad-submit') self.web_click(By.ID, "pstad-submit")
self.web_await(EC.url_contains("p-anzeige-aufgeben-bestaetigung.html?adId="), 20) self.web_await(EC.url_contains("p-anzeige-aufgeben-bestaetigung.html?adId="), 20)
ad_cfg_orig["updated_on"] = datetime.utcnow().isoformat() ad_cfg_orig["updated_on"] = datetime.utcnow().isoformat()
@@ -438,7 +438,7 @@ class KleinanzeigenBot(SeleniumMixin):
# extract the ad id from the URL's query parameter # extract the ad id from the URL's query parameter
current_url_query_params = urllib.parse.parse_qs(urllib.parse.urlparse(self.webdriver.current_url).query) current_url_query_params = urllib.parse.parse_qs(urllib.parse.urlparse(self.webdriver.current_url).query)
ad_id = int(current_url_query_params.get('adId', None)[0]) ad_id = int(current_url_query_params.get("adId", None)[0])
ad_cfg_orig["id"] = ad_id ad_cfg_orig["id"] = ad_id
LOG.info(" -> SUCCESS: ad published with ID %s", ad_id) LOG.info(" -> SUCCESS: ad published with ID %s", ad_id)
@@ -470,7 +470,7 @@ def main(args:Iterable[str]):
KleinanzeigenBot().run(args) KleinanzeigenBot().run(args)
if __name__ == '__main__': if __name__ == "__main__":
utils.configure_console_logging() utils.configure_console_logging()
LOG.error("Direct execution not supported. Use 'python -m kleinanzeigen_bot'") LOG.error("Direct execution not supported. Use 'python -m kleinanzeigen_bot'")
sys.exit(1) sys.exit(1)

View File

@@ -49,8 +49,8 @@ class SeleniumMixin:
LOG.info(" -> Custom chrome argument: %s", chrome_option) LOG.info(" -> Custom chrome argument: %s", chrome_option)
browser_options.add_argument(chrome_option) browser_options.add_argument(chrome_option)
browser_options.add_experimental_option('excludeSwitches', ['enable-automation']) browser_options.add_experimental_option("excludeSwitches", ["enable-automation"])
browser_options.add_experimental_option('useAutomationExtension', False) browser_options.add_experimental_option("useAutomationExtension", False)
browser_options.add_experimental_option("prefs", { browser_options.add_experimental_option("prefs", {
"credentials_enable_service": False, "credentials_enable_service": False,
"profile.password_manager_enabled": False, "profile.password_manager_enabled": False,

View File

@@ -27,7 +27,7 @@ def is_frozen() -> bool:
>>> is_frozen() >>> is_frozen()
False False
""" """
return getattr(sys, 'frozen', False) return getattr(sys, "frozen", False)
def apply_defaults(target:Dict[Any, Any], defaults:Dict[Any, Any], ignore = lambda _k, _v: False, override = lambda _k, _v: False) -> Dict[Any, Any]: def apply_defaults(target:Dict[Any, Any], defaults:Dict[Any, Any], ignore = lambda _k, _v: False, override = lambda _k, _v: False) -> Dict[Any, Any]:
@@ -76,7 +76,7 @@ def safe_get(a_map:Dict[Any, Any], *keys:str) -> Any:
def configure_console_logging() -> None: def configure_console_logging() -> None:
stdout_log = logging.StreamHandler(sys.stderr) stdout_log = logging.StreamHandler(sys.stderr)
stdout_log.setLevel(logging.DEBUG) stdout_log.setLevel(logging.DEBUG)
stdout_log.setFormatter(coloredlogs.ColoredFormatter('[%(levelname)s] %(message)s')) stdout_log.setFormatter(coloredlogs.ColoredFormatter("[%(levelname)s] %(message)s"))
stdout_log.addFilter(type("", (logging.Filter,), { stdout_log.addFilter(type("", (logging.Filter,), {
"filter": lambda rec: rec.levelno <= logging.INFO "filter": lambda rec: rec.levelno <= logging.INFO
})) }))
@@ -84,7 +84,7 @@ def configure_console_logging() -> None:
stderr_log = logging.StreamHandler(sys.stderr) stderr_log = logging.StreamHandler(sys.stderr)
stderr_log.setLevel(logging.WARNING) stderr_log.setLevel(logging.WARNING)
stderr_log.setFormatter(coloredlogs.ColoredFormatter('[%(levelname)s] %(message)s')) stderr_log.setFormatter(coloredlogs.ColoredFormatter("[%(levelname)s] %(message)s"))
LOG_ROOT.addHandler(stderr_log) LOG_ROOT.addHandler(stderr_log)
@@ -106,7 +106,7 @@ def on_exit() -> None:
def on_sigint(_sig:int, _frame) -> None: def on_sigint(_sig:int, _frame) -> None:
LOG.warning('Aborted on user request.') LOG.warning("Aborted on user request.")
sys.exit(0) sys.exit(0)
@@ -134,7 +134,7 @@ def pluralize(word:str, count:Union[int, Iterable], prefix = True):
count = len(count) count = len(count)
plural = pluralize.inflect.plural_noun(word, count) plural = pluralize.inflect.plural_noun(word, count)
if prefix: if prefix:
return f'{count} {plural}' return f"{count} {plural}"
return plural return plural