From 414df7736c048fc0b08c2f015d15d24baaa04ca6 Mon Sep 17 00:00:00 2001 From: sebthom Date: Sun, 13 Mar 2022 13:25:52 +0100 Subject: [PATCH] Improving type hints --- kleinanzeigen_bot/__init__.py | 32 +++---- kleinanzeigen_bot/selenium_mixin.py | 127 ++++++++++++++-------------- kleinanzeigen_bot/utils.py | 61 +++++++------ pyproject.toml | 16 +++- 4 files changed, 132 insertions(+), 104 deletions(-) diff --git a/kleinanzeigen_bot/__init__.py b/kleinanzeigen_bot/__init__.py index 3974a03..389c60b 100644 --- a/kleinanzeigen_bot/__init__.py +++ b/kleinanzeigen_bot/__init__.py @@ -26,7 +26,7 @@ LOG.setLevel(logging.INFO) class KleinanzeigenBot(SeleniumMixin): - def __init__(self): + def __init__(self) -> None: super().__init__() self.root_url = "https://www.ebay-kleinanzeigen.de" @@ -36,25 +36,25 @@ class KleinanzeigenBot(SeleniumMixin): self.categories:dict[str, str] = {} - self.file_log:logging.FileHandler = None + self.file_log:logging.FileHandler | None = None if is_frozen(): log_file_basename = os.path.splitext(os.path.basename(sys.executable))[0] else: log_file_basename = self.__module__ - self.log_file_path = abspath(f"{log_file_basename}.log") + self.log_file_path:str | None = abspath(f"{log_file_basename}.log") self.command = "help" self.ads_selector = "due" self.delete_old_ads = True - def __del__(self): + def __del__(self) -> None: if self.file_log: LOG_ROOT.removeHandler(self.file_log) def get_version(self) -> str: return importlib.metadata.version(__package__) - def run(self, args:Iterable[str]) -> None: + def run(self, args:list[str]) -> None: self.parse_args(args) match self.command: case "help": @@ -115,7 +115,7 @@ class KleinanzeigenBot(SeleniumMixin): -v, --verbose - enables verbose output - only useful when troubleshooting issues """)) - def parse_args(self, args:Iterable[str]) -> None: + def parse_args(self, args:list[str]) -> None: try: options, arguments = getopt.gnu_getopt(args[1:], "hv", [ "ads=", @@ -175,7 +175,7 @@ class KleinanzeigenBot(SeleniumMixin): LOG.info("App version: %s", self.get_version()) - def load_ads(self, *, ignore_inactive = True) -> Iterable[dict[str, Any]]: + def load_ads(self, *, ignore_inactive:bool = True) -> list[tuple[str, dict[str, Any], dict[str, Any]]]: LOG.info("Searching for ad config files...") ad_files = set() @@ -228,13 +228,13 @@ class KleinanzeigenBot(SeleniumMixin): ad_cfg["description"] = descr_prefix + (ad_cfg["description"] or "") + descr_suffix # pylint: disable=cell-var-from-loop - def assert_one_of(path:str, allowed:Iterable): + def assert_one_of(path:str, allowed:Iterable[str]) -> None: ensure(safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]") - def assert_min_len(path:str, minlen:int): + def assert_min_len(path:str, minlen:int) -> None: ensure(len(safe_get(ad_cfg, *path.split("."))) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]") - def assert_has_value(path:str): + def assert_has_value(path:str) -> None: ensure(safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]") # pylint: enable=cell-var-from-loop @@ -281,7 +281,7 @@ class KleinanzeigenBot(SeleniumMixin): def load_config(self) -> None: config_defaults = utils.load_dict_from_module(resources, "config_defaults.yaml") - config = utils.load_dict(self.config_file_path, "config", must_exist = False) + config = utils.load_dict_if_exists(self.config_file_path, "config") if config is None: LOG.warning("Config file %s does not exist. Creating it with default values...", self.config_file_path) @@ -359,7 +359,7 @@ class KleinanzeigenBot(SeleniumMixin): ad_cfg["id"] = None return True - def publish_ads(self, ad_cfgs:Iterable[dict[str, Any]]) -> None: + def publish_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None: count = 0 for (ad_file, ad_cfg, ad_cfg_orig) in ad_cfgs: @@ -372,7 +372,7 @@ class KleinanzeigenBot(SeleniumMixin): LOG.info("DONE: (Re-)published %s", pluralize("ad", count)) LOG.info("############################################") - def publish_ad(self, ad_file, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any]) -> None: + def publish_ad(self, ad_file:str, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any]) -> None: if self.delete_old_ads: self.delete_ad(ad_cfg) @@ -489,7 +489,7 @@ class KleinanzeigenBot(SeleniumMixin): LOG.info(" -> found %s", pluralize("image", ad_cfg["images"])) image_upload = self.web_find(By.XPATH, "//input[@type='file']") - def count_uploaded_images(): + def count_uploaded_images() -> int: return len(self.webdriver.find_elements(By.CLASS_NAME, "imagebox-new-thumbnail")) for image in ad_cfg["images"]: @@ -527,7 +527,7 @@ class KleinanzeigenBot(SeleniumMixin): utils.save_dict(ad_file, ad_cfg_orig) @overrides - def web_open(self, url:str, timeout:float = 15, reload_if_already_open = False) -> None: + def web_open(self, url:str, timeout:float = 15, reload_if_already_open:bool = False) -> None: start_at = time.time() super().web_open(url, timeout, reload_if_already_open) pause(2000) @@ -548,7 +548,7 @@ class KleinanzeigenBot(SeleniumMixin): ############################# # main entry point ############################# -def main(args:Iterable[str]): +def main(args:list[str]) -> None: if "version" not in args: print(textwrap.dedent(r""" _ _ _ _ _ _ diff --git a/kleinanzeigen_bot/selenium_mixin.py b/kleinanzeigen_bot/selenium_mixin.py index 5f7b31f..0ef8774 100644 --- a/kleinanzeigen_bot/selenium_mixin.py +++ b/kleinanzeigen_bot/selenium_mixin.py @@ -30,9 +30,9 @@ LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.selenium_mixin" class BrowserConfig: - def __init__(self): + def __init__(self) -> None: self.arguments:Iterable[str] = [] - self.binary_location:str = None + self.binary_location:str | None = None self.extensions:Iterable[str] = [] self.use_private_window:bool = True self.user_data_dir:str = "" @@ -41,74 +41,77 @@ class BrowserConfig: class SeleniumMixin: - def __init__(self): + def __init__(self) -> None: self.browser_config:Final[BrowserConfig] = BrowserConfig() self.webdriver:WebDriver = None + def _init_browser_options(self, browser_options:ChromiumOptions) -> ChromiumOptions: + if self.browser_config.use_private_window: + if isinstance(browser_options, webdriver.EdgeOptions): + browser_options.add_argument("-inprivate") + else: + browser_options.add_argument("--incognito") + + if self.browser_config.user_data_dir: + LOG.info(" -> Browser User Data Dir: %s", self.browser_config.user_data_dir) + browser_options.add_argument(f"--user-data-dir={self.browser_config.user_data_dir}") + + if self.browser_config.profile_name: + LOG.info(" -> Browser Profile Name: %s", self.browser_config.profile_name) + browser_options.add_argument(f"--profile-directory={self.browser_config.profile_name}") + + browser_options.add_argument("--disable-crash-reporter") + browser_options.add_argument("--no-first-run") + browser_options.add_argument("--no-service-autorun") + for chrome_option in self.browser_config.arguments: + LOG.info(" -> Custom chrome argument: %s", chrome_option) + browser_options.add_argument(chrome_option) + LOG.debug("Effective browser arguments: %s", browser_options.arguments) + + for crx_extension in self.browser_config.extensions: + ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.") + browser_options.add_extension(crx_extension) + LOG.debug("Effective browser extensions: %s", browser_options.extensions) + + browser_options.add_experimental_option("excludeSwitches", ["enable-automation"]) + browser_options.add_experimental_option("useAutomationExtension", False) + browser_options.add_experimental_option("prefs", { + "credentials_enable_service": False, + "profile.password_manager_enabled": False, + "profile.default_content_setting_values.notifications": 2, # 1 = allow, 2 = block browser notifications + "devtools.preferences.currentDockState": "\"bottom\"" + }) + + if not LOG.isEnabledFor(logging.DEBUG): + browser_options.add_argument("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3 + + LOG.debug("Effective experimental options: %s", browser_options.experimental_options) + + if self.browser_config.binary_location: + browser_options.binary_location = self.browser_config.binary_location + LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location) + return browser_options + def create_webdriver_session(self) -> None: LOG.info("Creating WebDriver session...") - def init_browser_options(browser_options:ChromiumOptions): - if self.browser_config.use_private_window: - if isinstance(browser_options, webdriver.EdgeOptions): - browser_options.add_argument("-inprivate") - else: - browser_options.add_argument("--incognito") - - if self.browser_config.user_data_dir: - LOG.info(" -> Browser User Data Dir: %s", self.browser_config.user_data_dir) - browser_options.add_argument(f"--user-data-dir={self.browser_config.user_data_dir}") - - if self.browser_config.profile_name: - LOG.info(" -> Browser Profile Name: %s", self.browser_config.profile_name) - browser_options.add_argument(f"--profile-directory={self.browser_config.profile_name}") - - browser_options.add_argument("--disable-crash-reporter") - browser_options.add_argument("--no-first-run") - browser_options.add_argument("--no-service-autorun") - for chrome_option in self.browser_config.arguments: - LOG.info(" -> Custom chrome argument: %s", chrome_option) - browser_options.add_argument(chrome_option) - LOG.debug("Effective browser arguments: %s", browser_options.arguments) - - for crx_extension in self.browser_config.extensions: - ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.") - browser_options.add_extension(crx_extension) - LOG.debug("Effective browser extensions: %s", browser_options.extensions) - - browser_options.add_experimental_option("excludeSwitches", ["enable-automation"]) - browser_options.add_experimental_option("useAutomationExtension", False) - browser_options.add_experimental_option("prefs", { - "credentials_enable_service": False, - "profile.password_manager_enabled": False, - "profile.default_content_setting_values.notifications": 2, # 1 = allow, 2 = block browser notifications - "devtools.preferences.currentDockState": "\"bottom\"" - }) - - if not LOG.isEnabledFor(logging.DEBUG): - browser_options.add_argument("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3 - - LOG.debug("Effective experimental options: %s", browser_options.experimental_options) - - if self.browser_config.binary_location: - browser_options.binary_location = self.browser_config.binary_location - LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location) - return browser_options - if not LOG.isEnabledFor(logging.DEBUG): os.environ['WDM_LOG_LEVEL'] = '0' # silence the web driver manager # check if a chrome driver is present already if shutil.which(DEFAULT_CHROMEDRIVER_PATH): - self.webdriver = webdriver.Chrome(options = init_browser_options(webdriver.ChromeOptions())) + self.webdriver = webdriver.Chrome(options = self._init_browser_options(webdriver.ChromeOptions())) elif shutil.which(DEFAULT_EDGEDRIVER_PATH): - self.webdriver = webdriver.ChromiumEdge(options = init_browser_options(webdriver.EdgeOptions())) + self.webdriver = webdriver.ChromiumEdge(options = self._init_browser_options(webdriver.EdgeOptions())) else: # determine browser major version if self.browser_config.binary_location: chrome_type, chrome_version = self.get_browser_version(self.browser_config.binary_location) else: - chrome_type, chrome_version = self.get_browser_version_from_os() + browser_info = self.get_browser_version_from_os() + if browser_info is None: + raise AssertionError("No supported browser found!") + chrome_type, chrome_version = browser_info chrome_major_version = chrome_version.split(".", 1)[0] # download and install matching chrome driver @@ -120,13 +123,13 @@ class SeleniumMixin: env["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver self.webdriver = webdriver.ChromiumEdge( service = EdgeService(webdriver_path, env = env), - options = init_browser_options(webdriver.EdgeOptions()) + options = self._init_browser_options(webdriver.EdgeOptions()) ) else: webdriver_mgr = ChromeDriverManager(chrome_type = chrome_type, cache_valid_range = 14) webdriver_mgr.driver.browser_version = chrome_major_version webdriver_path = webdriver_mgr.install() - self.webdriver = webdriver.Chrome(service = ChromeService(webdriver_path), options = init_browser_options(webdriver.ChromeOptions())) + self.webdriver = webdriver.Chrome(service = ChromeService(webdriver_path), options = self._init_browser_options(webdriver.ChromeOptions())) # workaround to support Edge, see https://github.com/diprajpatra/selenium-stealth/pull/25 selenium_stealth.Driver = ChromiumDriver @@ -168,7 +171,7 @@ class SeleniumMixin: return (ChromeType.MSEDGE, version) return (ChromeType.GOOGLE, version) - def get_browser_version_from_os(self) -> tuple[ChromeType, str]: + def get_browser_version_from_os(self) -> tuple[ChromeType, str] | None: version = ChromeDriverManagerUtils.get_browser_version_from_os(ChromeType.CHROMIUM) if version != "UNKNOWN": return (ChromeType.CHROMIUM, version) @@ -184,9 +187,9 @@ class SeleniumMixin: return (ChromeType.MSEDGE, version) LOG.debug("Microsoft Edge not found") - return (None, None) + return None - def web_await(self, condition: Callable[[WebDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] = None) -> T: + def web_await(self, condition: Callable[[WebDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] | None = None) -> T: """ Blocks/waits until the given condition is met. @@ -194,7 +197,7 @@ class SeleniumMixin: :raises TimeoutException: if element could not be found within time """ try: - return WebDriverWait(self.webdriver, timeout).until(condition) + return WebDriverWait(self.webdriver, timeout).until(condition) # type: ignore[no-any-return] except TimeoutException as ex: if exception_on_timeout: raise exception_on_timeout() from ex @@ -247,7 +250,7 @@ class SeleniumMixin: input_field.send_keys(text) pause() - def web_open(self, url:str, timeout:float = 15, reload_if_already_open = False) -> None: + def web_open(self, url:str, timeout:float = 15, reload_if_already_open:bool = False) -> None: """ :param url: url to open in browser :param timeout: timespan in seconds within the page needs to be loaded @@ -262,10 +265,10 @@ class SeleniumMixin: WebDriverWait(self.webdriver, timeout).until(lambda _: self.web_execute("return document.readyState") == "complete") # pylint: disable=dangerous-default-value - def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable[int] = [200], headers:dict[str, str] = None) -> dict[str, Any]: + def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable[int] = [200], headers:dict[str, str] | None = None) -> dict[str, Any]: method = method.upper() LOG.debug(" -> HTTP %s [%s]...", method, url) - response = self.webdriver.execute_async_script(f""" + response:dict[str, Any] = self.webdriver.execute_async_script(f""" var callback = arguments[arguments.length - 1]; fetch("{url}", {{ method: "{method}", diff --git a/kleinanzeigen_bot/utils.py b/kleinanzeigen_bot/utils.py index 84d4858..07ca892 100644 --- a/kleinanzeigen_bot/utils.py +++ b/kleinanzeigen_bot/utils.py @@ -4,8 +4,8 @@ SPDX-License-Identifier: AGPL-3.0-or-later """ import copy, decimal, json, logging, os, re, secrets, sys, traceback, time from importlib.resources import read_text as get_resource_as_string -from collections.abc import Callable, Iterable -from types import ModuleType +from collections.abc import Callable, Sized +from types import FrameType, ModuleType, TracebackType from typing import Any, Final, TypeVar import coloredlogs, inflect @@ -14,10 +14,11 @@ from ruamel.yaml import YAML LOG_ROOT:Final[logging.Logger] = logging.getLogger() LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.utils") -T:Final[TypeVar] = TypeVar('T') +# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions +T = TypeVar('T') -def abspath(relative_path:str, relative_to:str = None): +def abspath(relative_path:str, relative_to:str | None = None) -> str: """ Makes a given relative path absolute based on another file/folder """ @@ -33,13 +34,13 @@ def abspath(relative_path:str, relative_to:str = None): return os.path.normpath(os.path.join(relative_to, relative_path)) -def ensure(condition:bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None: +def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None: """ :param timeout: timespan in seconds until when the condition must become `True`, default is 5 seconds :param poll_requency: sleep interval between calls in seconds, default is 0.5 seconds :raises AssertionError: if condition did not come `True` within given timespan """ - if not isinstance(condition, Callable): + if not isinstance(condition, Callable): # type: ignore[arg-type] # https://github.com/python/mypy/issues/6864 if condition: return raise AssertionError(error_message) @@ -50,7 +51,7 @@ def ensure(condition:bool | Callable[[], bool], error_message:str, timeout:float raise AssertionError("[poll_requency] must be >= 0") start_at = time.time() - while not condition(): + while not condition(): # type: ignore[operator] elapsed = time.time() - start_at if elapsed >= timeout: raise AssertionError(error_message) @@ -65,7 +66,12 @@ def is_frozen() -> bool: return getattr(sys, "frozen", False) -def apply_defaults(target:dict[Any, Any], defaults:dict[Any, Any], ignore = lambda _k, _v: False, override = lambda _k, _v: False) -> dict[Any, Any]: +def apply_defaults( + target:dict[Any, Any], + defaults:dict[Any, Any], + ignore:Callable[[Any, Any], bool] = lambda _k, _v: False, + override:Callable[[Any, Any], bool] = lambda _k, _v: False +) -> dict[Any, Any]: """ >>> apply_defaults({}, {"foo": "bar"}) {'foo': 'bar'} @@ -122,7 +128,7 @@ def configure_console_logging() -> None: LOG_ROOT.addHandler(stderr_log) -def on_exception(ex_type, ex_value, ex_traceback) -> None: +def on_exception(ex_type:type[BaseException], ex_value:Any, ex_traceback:TracebackType | None) -> None: if issubclass(ex_type, KeyboardInterrupt): sys.__excepthook__(ex_type, ex_value, ex_traceback) elif LOG.isEnabledFor(logging.DEBUG) or isinstance(ex_value, (AttributeError, ImportError, NameError, TypeError)): @@ -138,7 +144,7 @@ def on_exit() -> None: handler.flush() -def on_sigint(_sig:int, _frame) -> None: +def on_sigint(_sig:int, _frame:FrameType | None) -> None: LOG.warning("Aborted on user request.") sys.exit(0) @@ -152,7 +158,7 @@ def pause(min_ms:int = 200, max_ms:int = 2000) -> None: time.sleep(duration / 1000) -def pluralize(word:str, count:int | Iterable, prefix = True): +def pluralize(word:str, count:int | Sized, prefix:bool = True) -> str: """ >>> pluralize("field", 1) '1 field' @@ -163,15 +169,25 @@ def pluralize(word:str, count:int | Iterable, prefix = True): """ if not hasattr(pluralize, "inflect"): pluralize.inflect = inflect.engine() - if isinstance(count, Iterable): + if isinstance(count, Sized): count = len(count) - plural = pluralize.inflect.plural_noun(word, count) + plural:str = pluralize.inflect.plural_noun(word, count) if prefix: return f"{count} {plural}" return plural -def load_dict(filepath:str, content_label:str = "", must_exist = True) -> dict[str, Any] | None: +def load_dict(filepath:str, content_label:str = "") -> dict[str, Any]: + """ + :raises FileNotFoundError + """ + data = load_dict_if_exists(filepath, content_label) + if data is None: + raise FileNotFoundError(filepath) + return data + + +def load_dict_if_exists(filepath:str, content_label:str = "") -> dict[str, Any] | None: filepath = os.path.abspath(filepath) LOG.info("Loading %s[%s]...", content_label and content_label + " from " or "", filepath) @@ -180,28 +196,23 @@ def load_dict(filepath:str, content_label:str = "", must_exist = True) -> dict[s raise ValueError(f'Unsupported file type. The file name "{filepath}" must end with *.json, *.yaml, or *.yml') if not os.path.exists(filepath): - if must_exist: - raise FileNotFoundError(filepath) return None with open(filepath, encoding = "utf-8") as file: return json.load(file) if filepath.endswith(".json") else YAML().load(file) -def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "", must_exist = True) -> dict[str, Any] | None: +def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "") -> dict[str, Any]: + """ + :raises FileNotFoundError + """ LOG.debug("Loading %s[%s.%s]...", content_label and content_label + " from " or "", module.__name__, filename) _, file_ext = os.path.splitext(filename) - if file_ext not in [".json", ".yaml", ".yml"]: + if file_ext not in (".json", ".yaml", ".yml"): raise ValueError(f'Unsupported file type. The file name "{filename}" must end with *.json, *.yaml, or *.yml') - try: - content = get_resource_as_string(module, filename) - except FileNotFoundError as ex: - if must_exist: - raise ex - return None - + content = get_resource_as_string(module, filename) return json.loads(content) if filename.endswith(".json") else YAML().load(content) diff --git a/pyproject.toml b/pyproject.toml index baa402a..3bcfe2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,20 @@ aggressive = 3 [tool.bandit] +##################### +# mypy +# https://github.com/python/mypy +##################### +[tool.mypy] +python_version = "3.10" +strict = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +ignore_missing_imports = true +show_error_codes = true +warn_unused_ignores = true + + ##################### # pylint # https://pypi.org/project/pylint/ @@ -128,7 +142,7 @@ load-plugins = [ ] [tool.pylint.basic] -good-names = ["i", "j", "k", "v", "by", "ex", "fd", "_"] +good-names = ["i", "j", "k", "v", "by", "ex", "fd", "_", "T"] [tool.pylint.format] # https://pylint.pycqa.org/en/latest/technical_reference/features.html#format-checker