Improving type hints

This commit is contained in:
sebthom
2022-03-13 13:25:52 +01:00
parent 9d35d3c2ac
commit 414df7736c
4 changed files with 132 additions and 104 deletions

View File

@@ -26,7 +26,7 @@ LOG.setLevel(logging.INFO)
class KleinanzeigenBot(SeleniumMixin):
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.root_url = "https://www.ebay-kleinanzeigen.de"
@@ -36,25 +36,25 @@ class KleinanzeigenBot(SeleniumMixin):
self.categories:dict[str, str] = {}
self.file_log:logging.FileHandler = None
self.file_log:logging.FileHandler | None = None
if is_frozen():
log_file_basename = os.path.splitext(os.path.basename(sys.executable))[0]
else:
log_file_basename = self.__module__
self.log_file_path = abspath(f"{log_file_basename}.log")
self.log_file_path:str | None = abspath(f"{log_file_basename}.log")
self.command = "help"
self.ads_selector = "due"
self.delete_old_ads = True
def __del__(self):
def __del__(self) -> None:
if self.file_log:
LOG_ROOT.removeHandler(self.file_log)
def get_version(self) -> str:
return importlib.metadata.version(__package__)
def run(self, args:Iterable[str]) -> None:
def run(self, args:list[str]) -> None:
self.parse_args(args)
match self.command:
case "help":
@@ -115,7 +115,7 @@ class KleinanzeigenBot(SeleniumMixin):
-v, --verbose - enables verbose output - only useful when troubleshooting issues
"""))
def parse_args(self, args:Iterable[str]) -> None:
def parse_args(self, args:list[str]) -> None:
try:
options, arguments = getopt.gnu_getopt(args[1:], "hv", [
"ads=",
@@ -175,7 +175,7 @@ class KleinanzeigenBot(SeleniumMixin):
LOG.info("App version: %s", self.get_version())
def load_ads(self, *, ignore_inactive = True) -> Iterable[dict[str, Any]]:
def load_ads(self, *, ignore_inactive:bool = True) -> list[tuple[str, dict[str, Any], dict[str, Any]]]:
LOG.info("Searching for ad config files...")
ad_files = set()
@@ -228,13 +228,13 @@ class KleinanzeigenBot(SeleniumMixin):
ad_cfg["description"] = descr_prefix + (ad_cfg["description"] or "") + descr_suffix
# pylint: disable=cell-var-from-loop
def assert_one_of(path:str, allowed:Iterable):
def assert_one_of(path:str, allowed:Iterable[str]) -> None:
ensure(safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]")
def assert_min_len(path:str, minlen:int):
def assert_min_len(path:str, minlen:int) -> None:
ensure(len(safe_get(ad_cfg, *path.split("."))) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]")
def assert_has_value(path:str):
def assert_has_value(path:str) -> None:
ensure(safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]")
# pylint: enable=cell-var-from-loop
@@ -281,7 +281,7 @@ class KleinanzeigenBot(SeleniumMixin):
def load_config(self) -> None:
config_defaults = utils.load_dict_from_module(resources, "config_defaults.yaml")
config = utils.load_dict(self.config_file_path, "config", must_exist = False)
config = utils.load_dict_if_exists(self.config_file_path, "config")
if config is None:
LOG.warning("Config file %s does not exist. Creating it with default values...", self.config_file_path)
@@ -359,7 +359,7 @@ class KleinanzeigenBot(SeleniumMixin):
ad_cfg["id"] = None
return True
def publish_ads(self, ad_cfgs:Iterable[dict[str, Any]]) -> None:
def publish_ads(self, ad_cfgs:list[tuple[str, dict[str, Any], dict[str, Any]]]) -> None:
count = 0
for (ad_file, ad_cfg, ad_cfg_orig) in ad_cfgs:
@@ -372,7 +372,7 @@ class KleinanzeigenBot(SeleniumMixin):
LOG.info("DONE: (Re-)published %s", pluralize("ad", count))
LOG.info("############################################")
def publish_ad(self, ad_file, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any]) -> None:
def publish_ad(self, ad_file:str, ad_cfg: dict[str, Any], ad_cfg_orig: dict[str, Any]) -> None:
if self.delete_old_ads:
self.delete_ad(ad_cfg)
@@ -489,7 +489,7 @@ class KleinanzeigenBot(SeleniumMixin):
LOG.info(" -> found %s", pluralize("image", ad_cfg["images"]))
image_upload = self.web_find(By.XPATH, "//input[@type='file']")
def count_uploaded_images():
def count_uploaded_images() -> int:
return len(self.webdriver.find_elements(By.CLASS_NAME, "imagebox-new-thumbnail"))
for image in ad_cfg["images"]:
@@ -527,7 +527,7 @@ class KleinanzeigenBot(SeleniumMixin):
utils.save_dict(ad_file, ad_cfg_orig)
@overrides
def web_open(self, url:str, timeout:float = 15, reload_if_already_open = False) -> None:
def web_open(self, url:str, timeout:float = 15, reload_if_already_open:bool = False) -> None:
start_at = time.time()
super().web_open(url, timeout, reload_if_already_open)
pause(2000)
@@ -548,7 +548,7 @@ class KleinanzeigenBot(SeleniumMixin):
#############################
# main entry point
#############################
def main(args:Iterable[str]):
def main(args:list[str]) -> None:
if "version" not in args:
print(textwrap.dedent(r"""
_ _ _ _ _ _

View File

@@ -30,9 +30,9 @@ LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.selenium_mixin"
class BrowserConfig:
def __init__(self):
def __init__(self) -> None:
self.arguments:Iterable[str] = []
self.binary_location:str = None
self.binary_location:str | None = None
self.extensions:Iterable[str] = []
self.use_private_window:bool = True
self.user_data_dir:str = ""
@@ -41,74 +41,77 @@ class BrowserConfig:
class SeleniumMixin:
def __init__(self):
def __init__(self) -> None:
self.browser_config:Final[BrowserConfig] = BrowserConfig()
self.webdriver:WebDriver = None
def _init_browser_options(self, browser_options:ChromiumOptions) -> ChromiumOptions:
if self.browser_config.use_private_window:
if isinstance(browser_options, webdriver.EdgeOptions):
browser_options.add_argument("-inprivate")
else:
browser_options.add_argument("--incognito")
if self.browser_config.user_data_dir:
LOG.info(" -> Browser User Data Dir: %s", self.browser_config.user_data_dir)
browser_options.add_argument(f"--user-data-dir={self.browser_config.user_data_dir}")
if self.browser_config.profile_name:
LOG.info(" -> Browser Profile Name: %s", self.browser_config.profile_name)
browser_options.add_argument(f"--profile-directory={self.browser_config.profile_name}")
browser_options.add_argument("--disable-crash-reporter")
browser_options.add_argument("--no-first-run")
browser_options.add_argument("--no-service-autorun")
for chrome_option in self.browser_config.arguments:
LOG.info(" -> Custom chrome argument: %s", chrome_option)
browser_options.add_argument(chrome_option)
LOG.debug("Effective browser arguments: %s", browser_options.arguments)
for crx_extension in self.browser_config.extensions:
ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
browser_options.add_extension(crx_extension)
LOG.debug("Effective browser extensions: %s", browser_options.extensions)
browser_options.add_experimental_option("excludeSwitches", ["enable-automation"])
browser_options.add_experimental_option("useAutomationExtension", False)
browser_options.add_experimental_option("prefs", {
"credentials_enable_service": False,
"profile.password_manager_enabled": False,
"profile.default_content_setting_values.notifications": 2, # 1 = allow, 2 = block browser notifications
"devtools.preferences.currentDockState": "\"bottom\""
})
if not LOG.isEnabledFor(logging.DEBUG):
browser_options.add_argument("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3
LOG.debug("Effective experimental options: %s", browser_options.experimental_options)
if self.browser_config.binary_location:
browser_options.binary_location = self.browser_config.binary_location
LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location)
return browser_options
def create_webdriver_session(self) -> None:
LOG.info("Creating WebDriver session...")
def init_browser_options(browser_options:ChromiumOptions):
if self.browser_config.use_private_window:
if isinstance(browser_options, webdriver.EdgeOptions):
browser_options.add_argument("-inprivate")
else:
browser_options.add_argument("--incognito")
if self.browser_config.user_data_dir:
LOG.info(" -> Browser User Data Dir: %s", self.browser_config.user_data_dir)
browser_options.add_argument(f"--user-data-dir={self.browser_config.user_data_dir}")
if self.browser_config.profile_name:
LOG.info(" -> Browser Profile Name: %s", self.browser_config.profile_name)
browser_options.add_argument(f"--profile-directory={self.browser_config.profile_name}")
browser_options.add_argument("--disable-crash-reporter")
browser_options.add_argument("--no-first-run")
browser_options.add_argument("--no-service-autorun")
for chrome_option in self.browser_config.arguments:
LOG.info(" -> Custom chrome argument: %s", chrome_option)
browser_options.add_argument(chrome_option)
LOG.debug("Effective browser arguments: %s", browser_options.arguments)
for crx_extension in self.browser_config.extensions:
ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
browser_options.add_extension(crx_extension)
LOG.debug("Effective browser extensions: %s", browser_options.extensions)
browser_options.add_experimental_option("excludeSwitches", ["enable-automation"])
browser_options.add_experimental_option("useAutomationExtension", False)
browser_options.add_experimental_option("prefs", {
"credentials_enable_service": False,
"profile.password_manager_enabled": False,
"profile.default_content_setting_values.notifications": 2, # 1 = allow, 2 = block browser notifications
"devtools.preferences.currentDockState": "\"bottom\""
})
if not LOG.isEnabledFor(logging.DEBUG):
browser_options.add_argument("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3
LOG.debug("Effective experimental options: %s", browser_options.experimental_options)
if self.browser_config.binary_location:
browser_options.binary_location = self.browser_config.binary_location
LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location)
return browser_options
if not LOG.isEnabledFor(logging.DEBUG):
os.environ['WDM_LOG_LEVEL'] = '0' # silence the web driver manager
# check if a chrome driver is present already
if shutil.which(DEFAULT_CHROMEDRIVER_PATH):
self.webdriver = webdriver.Chrome(options = init_browser_options(webdriver.ChromeOptions()))
self.webdriver = webdriver.Chrome(options = self._init_browser_options(webdriver.ChromeOptions()))
elif shutil.which(DEFAULT_EDGEDRIVER_PATH):
self.webdriver = webdriver.ChromiumEdge(options = init_browser_options(webdriver.EdgeOptions()))
self.webdriver = webdriver.ChromiumEdge(options = self._init_browser_options(webdriver.EdgeOptions()))
else:
# determine browser major version
if self.browser_config.binary_location:
chrome_type, chrome_version = self.get_browser_version(self.browser_config.binary_location)
else:
chrome_type, chrome_version = self.get_browser_version_from_os()
browser_info = self.get_browser_version_from_os()
if browser_info is None:
raise AssertionError("No supported browser found!")
chrome_type, chrome_version = browser_info
chrome_major_version = chrome_version.split(".", 1)[0]
# download and install matching chrome driver
@@ -120,13 +123,13 @@ class SeleniumMixin:
env["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver
self.webdriver = webdriver.ChromiumEdge(
service = EdgeService(webdriver_path, env = env),
options = init_browser_options(webdriver.EdgeOptions())
options = self._init_browser_options(webdriver.EdgeOptions())
)
else:
webdriver_mgr = ChromeDriverManager(chrome_type = chrome_type, cache_valid_range = 14)
webdriver_mgr.driver.browser_version = chrome_major_version
webdriver_path = webdriver_mgr.install()
self.webdriver = webdriver.Chrome(service = ChromeService(webdriver_path), options = init_browser_options(webdriver.ChromeOptions()))
self.webdriver = webdriver.Chrome(service = ChromeService(webdriver_path), options = self._init_browser_options(webdriver.ChromeOptions()))
# workaround to support Edge, see https://github.com/diprajpatra/selenium-stealth/pull/25
selenium_stealth.Driver = ChromiumDriver
@@ -168,7 +171,7 @@ class SeleniumMixin:
return (ChromeType.MSEDGE, version)
return (ChromeType.GOOGLE, version)
def get_browser_version_from_os(self) -> tuple[ChromeType, str]:
def get_browser_version_from_os(self) -> tuple[ChromeType, str] | None:
version = ChromeDriverManagerUtils.get_browser_version_from_os(ChromeType.CHROMIUM)
if version != "UNKNOWN":
return (ChromeType.CHROMIUM, version)
@@ -184,9 +187,9 @@ class SeleniumMixin:
return (ChromeType.MSEDGE, version)
LOG.debug("Microsoft Edge not found")
return (None, None)
return None
def web_await(self, condition: Callable[[WebDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] = None) -> T:
def web_await(self, condition: Callable[[WebDriver], T], timeout:float = 5, exception_on_timeout: Callable[[], Exception] | None = None) -> T:
"""
Blocks/waits until the given condition is met.
@@ -194,7 +197,7 @@ class SeleniumMixin:
:raises TimeoutException: if element could not be found within time
"""
try:
return WebDriverWait(self.webdriver, timeout).until(condition)
return WebDriverWait(self.webdriver, timeout).until(condition) # type: ignore[no-any-return]
except TimeoutException as ex:
if exception_on_timeout:
raise exception_on_timeout() from ex
@@ -247,7 +250,7 @@ class SeleniumMixin:
input_field.send_keys(text)
pause()
def web_open(self, url:str, timeout:float = 15, reload_if_already_open = False) -> None:
def web_open(self, url:str, timeout:float = 15, reload_if_already_open:bool = False) -> None:
"""
:param url: url to open in browser
:param timeout: timespan in seconds within the page needs to be loaded
@@ -262,10 +265,10 @@ class SeleniumMixin:
WebDriverWait(self.webdriver, timeout).until(lambda _: self.web_execute("return document.readyState") == "complete")
# pylint: disable=dangerous-default-value
def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable[int] = [200], headers:dict[str, str] = None) -> dict[str, Any]:
def web_request(self, url:str, method:str = "GET", valid_response_codes:Iterable[int] = [200], headers:dict[str, str] | None = None) -> dict[str, Any]:
method = method.upper()
LOG.debug(" -> HTTP %s [%s]...", method, url)
response = self.webdriver.execute_async_script(f"""
response:dict[str, Any] = self.webdriver.execute_async_script(f"""
var callback = arguments[arguments.length - 1];
fetch("{url}", {{
method: "{method}",

View File

@@ -4,8 +4,8 @@ SPDX-License-Identifier: AGPL-3.0-or-later
"""
import copy, decimal, json, logging, os, re, secrets, sys, traceback, time
from importlib.resources import read_text as get_resource_as_string
from collections.abc import Callable, Iterable
from types import ModuleType
from collections.abc import Callable, Sized
from types import FrameType, ModuleType, TracebackType
from typing import Any, Final, TypeVar
import coloredlogs, inflect
@@ -14,10 +14,11 @@ from ruamel.yaml import YAML
LOG_ROOT:Final[logging.Logger] = logging.getLogger()
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.utils")
T:Final[TypeVar] = TypeVar('T')
# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions
T = TypeVar('T')
def abspath(relative_path:str, relative_to:str = None):
def abspath(relative_path:str, relative_to:str | None = None) -> str:
"""
Makes a given relative path absolute based on another file/folder
"""
@@ -33,13 +34,13 @@ def abspath(relative_path:str, relative_to:str = None):
return os.path.normpath(os.path.join(relative_to, relative_path))
def ensure(condition:bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None:
def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None:
"""
:param timeout: timespan in seconds until when the condition must become `True`, default is 5 seconds
:param poll_requency: sleep interval between calls in seconds, default is 0.5 seconds
:raises AssertionError: if condition did not come `True` within given timespan
"""
if not isinstance(condition, Callable):
if not isinstance(condition, Callable): # type: ignore[arg-type] # https://github.com/python/mypy/issues/6864
if condition:
return
raise AssertionError(error_message)
@@ -50,7 +51,7 @@ def ensure(condition:bool | Callable[[], bool], error_message:str, timeout:float
raise AssertionError("[poll_requency] must be >= 0")
start_at = time.time()
while not condition():
while not condition(): # type: ignore[operator]
elapsed = time.time() - start_at
if elapsed >= timeout:
raise AssertionError(error_message)
@@ -65,7 +66,12 @@ def is_frozen() -> bool:
return getattr(sys, "frozen", False)
def apply_defaults(target:dict[Any, Any], defaults:dict[Any, Any], ignore = lambda _k, _v: False, override = lambda _k, _v: False) -> dict[Any, Any]:
def apply_defaults(
target:dict[Any, Any],
defaults:dict[Any, Any],
ignore:Callable[[Any, Any], bool] = lambda _k, _v: False,
override:Callable[[Any, Any], bool] = lambda _k, _v: False
) -> dict[Any, Any]:
"""
>>> apply_defaults({}, {"foo": "bar"})
{'foo': 'bar'}
@@ -122,7 +128,7 @@ def configure_console_logging() -> None:
LOG_ROOT.addHandler(stderr_log)
def on_exception(ex_type, ex_value, ex_traceback) -> None:
def on_exception(ex_type:type[BaseException], ex_value:Any, ex_traceback:TracebackType | None) -> None:
if issubclass(ex_type, KeyboardInterrupt):
sys.__excepthook__(ex_type, ex_value, ex_traceback)
elif LOG.isEnabledFor(logging.DEBUG) or isinstance(ex_value, (AttributeError, ImportError, NameError, TypeError)):
@@ -138,7 +144,7 @@ def on_exit() -> None:
handler.flush()
def on_sigint(_sig:int, _frame) -> None:
def on_sigint(_sig:int, _frame:FrameType | None) -> None:
LOG.warning("Aborted on user request.")
sys.exit(0)
@@ -152,7 +158,7 @@ def pause(min_ms:int = 200, max_ms:int = 2000) -> None:
time.sleep(duration / 1000)
def pluralize(word:str, count:int | Iterable, prefix = True):
def pluralize(word:str, count:int | Sized, prefix:bool = True) -> str:
"""
>>> pluralize("field", 1)
'1 field'
@@ -163,15 +169,25 @@ def pluralize(word:str, count:int | Iterable, prefix = True):
"""
if not hasattr(pluralize, "inflect"):
pluralize.inflect = inflect.engine()
if isinstance(count, Iterable):
if isinstance(count, Sized):
count = len(count)
plural = pluralize.inflect.plural_noun(word, count)
plural:str = pluralize.inflect.plural_noun(word, count)
if prefix:
return f"{count} {plural}"
return plural
def load_dict(filepath:str, content_label:str = "", must_exist = True) -> dict[str, Any] | None:
def load_dict(filepath:str, content_label:str = "") -> dict[str, Any]:
"""
:raises FileNotFoundError
"""
data = load_dict_if_exists(filepath, content_label)
if data is None:
raise FileNotFoundError(filepath)
return data
def load_dict_if_exists(filepath:str, content_label:str = "") -> dict[str, Any] | None:
filepath = os.path.abspath(filepath)
LOG.info("Loading %s[%s]...", content_label and content_label + " from " or "", filepath)
@@ -180,28 +196,23 @@ def load_dict(filepath:str, content_label:str = "", must_exist = True) -> dict[s
raise ValueError(f'Unsupported file type. The file name "{filepath}" must end with *.json, *.yaml, or *.yml')
if not os.path.exists(filepath):
if must_exist:
raise FileNotFoundError(filepath)
return None
with open(filepath, encoding = "utf-8") as file:
return json.load(file) if filepath.endswith(".json") else YAML().load(file)
def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "", must_exist = True) -> dict[str, Any] | None:
def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "") -> dict[str, Any]:
"""
:raises FileNotFoundError
"""
LOG.debug("Loading %s[%s.%s]...", content_label and content_label + " from " or "", module.__name__, filename)
_, file_ext = os.path.splitext(filename)
if file_ext not in [".json", ".yaml", ".yml"]:
if file_ext not in (".json", ".yaml", ".yml"):
raise ValueError(f'Unsupported file type. The file name "{filename}" must end with *.json, *.yaml, or *.yml')
try:
content = get_resource_as_string(module, filename)
except FileNotFoundError as ex:
if must_exist:
raise ex
return None
content = get_resource_as_string(module, filename)
return json.loads(content) if filename.endswith(".json") else YAML().load(content)

View File

@@ -103,6 +103,20 @@ aggressive = 3
[tool.bandit]
#####################
# mypy
# https://github.com/python/mypy
#####################
[tool.mypy]
python_version = "3.10"
strict = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
ignore_missing_imports = true
show_error_codes = true
warn_unused_ignores = true
#####################
# pylint
# https://pypi.org/project/pylint/
@@ -128,7 +142,7 @@ load-plugins = [
]
[tool.pylint.basic]
good-names = ["i", "j", "k", "v", "by", "ex", "fd", "_"]
good-names = ["i", "j", "k", "v", "by", "ex", "fd", "_", "T"]
[tool.pylint.format]
# https://pylint.pycqa.org/en/latest/technical_reference/features.html#format-checker