feat: add grouped selector timeout fallback for login detection (#843)

This commit is contained in:
Jens
2026-02-27 19:11:49 +01:00
committed by GitHub
parent fc456f4abd
commit 38e0f97578
5 changed files with 335 additions and 52 deletions

View File

@@ -1246,23 +1246,27 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
effective_timeout, effective_timeout,
) )
# Try to find the standard element first login_selectors = [
try: (By.CLASS_NAME, "mr-medium"),
user_info = await self.web_text(By.CLASS_NAME, "mr-medium", timeout = login_check_timeout) (By.ID, "user-email"),
if username in user_info.lower(): ]
LOG.debug("Login detected via .mr-medium element") primary_selector_index = 0
return True
except TimeoutError:
LOG.debug("Timeout waiting for .mr-medium element after %.1fs", effective_timeout)
# If standard element not found or didn't contain username, try the alternative
try: try:
user_info = await self.web_text(By.ID, "user-email", timeout = login_check_timeout) user_info, matched_selector = await self.web_text_first_available(
login_selectors,
timeout = login_check_timeout,
key = "login_detection",
description = "login_detection(selector_group)",
)
if username in user_info.lower(): if username in user_info.lower():
if matched_selector == primary_selector_index:
LOG.debug("Login detected via .mr-medium element")
else:
LOG.debug("Login detected via #user-email element") LOG.debug("Login detected via #user-email element")
return True return True
except TimeoutError: except TimeoutError:
LOG.debug("Timeout waiting for #user-email element after %.1fs", effective_timeout) LOG.debug("Timeout waiting for login detection selector group after %.1fs", effective_timeout)
if not include_probe: if not include_probe:
LOG.debug("No login detected - neither .mr-medium nor #user-email found with username") LOG.debug("No login detected - neither .mr-medium nor #user-email found with username")

View File

@@ -98,9 +98,8 @@ kleinanzeigen_bot/__init__.py:
is_logged_in: is_logged_in:
"Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)": "Starte Login-Erkennung (Timeout: %.1fs Basis, %.1fs effektiv mit Multiplikator/Backoff)" "Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)": "Starte Login-Erkennung (Timeout: %.1fs Basis, %.1fs effektiv mit Multiplikator/Backoff)"
"Login detected via .mr-medium element": "Login erkannt über .mr-medium Element" "Login detected via .mr-medium element": "Login erkannt über .mr-medium Element"
"Timeout waiting for .mr-medium element after %.1fs": "Timeout beim Warten auf .mr-medium Element nach %.1fs"
"Login detected via #user-email element": "Login erkannt über #user-email Element" "Login detected via #user-email element": "Login erkannt über #user-email Element"
"Timeout waiting for #user-email element after %.1fs": "Timeout beim Warten auf #user-email Element nach %.1fs" "Timeout waiting for login detection selector group after %.1fs": "Timeout beim Warten auf die Login-Erkennungs-Selektorgruppe nach %.1fs"
"No login detected - neither .mr-medium nor #user-email found with username": "Kein Login erkannt - weder .mr-medium noch #user-email mit Benutzername gefunden" "No login detected - neither .mr-medium nor #user-email found with username": "Kein Login erkannt - weder .mr-medium noch #user-email mit Benutzername gefunden"
"No login detected - DOM elements not found and server probe returned %s": "Kein Login erkannt - DOM-Elemente nicht gefunden und Server-Probe ergab %s" "No login detected - DOM elements not found and server probe returned %s": "Kein Login erkannt - DOM-Elemente nicht gefunden und Server-Probe ergab %s"
@@ -533,6 +532,17 @@ kleinanzeigen_bot/utils/web_scraping_mixin.py:
_record_timing: _record_timing:
"Timing collector failed for key=%s operation=%s: %s": "Zeitmessung fehlgeschlagen für key=%s operation=%s: %s" "Timing collector failed for key=%s operation=%s: %s": "Zeitmessung fehlgeschlagen für key=%s operation=%s: %s"
_allocate_selector_group_budgets:
"selector_count must be > 0": "selector_count muss > 0 sein"
web_find_first_available:
"selectors must contain at least one selector": "selectors muss mindestens einen Selektor enthalten"
attempt:
"No selector candidates executed.": "Keine Selektor-Kandidaten ausgeführt."
? "No HTML element found using selector group after trying %(count)d alternatives within %(timeout)s seconds. Last error: %(error)s"
: "Kein HTML-Element über Selektorgruppe gefunden, nachdem %(count)d Alternativen innerhalb von %(timeout)s Sekunden versucht wurden. Letzter Fehler: %(error)s"
close_browser_session: close_browser_session:
"Closing Browser session...": "Schließe Browser-Sitzung..." "Closing Browser session...": "Schließe Browser-Sitzung..."

View File

@@ -2,7 +2,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404 import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404
from collections.abc import Awaitable, Callable, Coroutine, Iterable from collections.abc import Awaitable, Callable, Coroutine, Iterable, Sequence
from gettext import gettext as _ from gettext import gettext as _
from pathlib import Path, PureWindowsPath from pathlib import Path, PureWindowsPath
from typing import Any, Final, Optional, cast from typing import Any, Final, Optional, cast
@@ -39,6 +39,9 @@ if TYPE_CHECKING:
# Constants for RemoteObject conversion # Constants for RemoteObject conversion
_KEY_VALUE_PAIR_SIZE = 2 _KEY_VALUE_PAIR_SIZE = 2
_PRIMARY_SELECTOR_BUDGET_RATIO:Final[float] = 0.70
_BACKUP_SELECTOR_BUDGET_CAP_SECONDS:Final[float] = 0.75
_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS:Final[float] = 0.25
def _resolve_user_data_dir_paths(arg_value:str, config_value:str) -> tuple[Any, Any]: def _resolve_user_data_dir_paths(arg_value:str, config_value:str) -> tuple[Any, Any]:
@@ -254,6 +257,153 @@ class WebScrapingMixin:
raise TimeoutError(f"{description} failed without executing operation") raise TimeoutError(f"{description} failed without executing operation")
@staticmethod
def _allocate_selector_group_budgets(total_timeout:float, selector_count:int) -> list[float]:
    """Split one shared timeout budget across an ordered group of selector alternatives.

    Allocation rules:
    - The first (primary) selector receives the preferred share `_PRIMARY_SELECTOR_BUDGET_RATIO`.
    - Every backup slice is kept at or above `_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS`.
    - Backup slices never exceed `_BACKUP_SELECTOR_BUDGET_CAP_SECONDS`.
    - Whatever the final backup cannot absorb (due to its cap) flows back into the
      primary slice, so the slices always sum to exactly `total_timeout`.

    :param total_timeout: overall budget in seconds shared by the whole group.
    :param selector_count: number of selector alternatives (must be positive).
    :return: per-selector budget slices, primary first.
    :raises ValueError: if `selector_count` is not positive.
    """
    if selector_count <= 0:
        raise ValueError(_("selector_count must be > 0"))
    if selector_count == 1:
        # Sole selector owns the entire budget; negative budgets are clamped to zero.
        return [max(total_timeout, 0.0)]
    if total_timeout <= 0:
        return [0.0] * selector_count
    # When the budget cannot cover the per-slot floor, fall back to an even split
    # so the total budget is still fully distributed.
    if total_timeout < _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * selector_count:
        return [total_timeout / selector_count] * selector_count
    backup_count = selector_count - 1
    # Reserve the floor for every backup before sizing the primary slice, so the
    # primary's preferred ratio can never starve the backups.
    reserved_floor = _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * backup_count
    primary = min(total_timeout * _PRIMARY_SELECTOR_BUDGET_RATIO, total_timeout - reserved_floor)
    primary = max(primary, _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS)
    budgets = [primary]
    remaining = total_timeout - primary
    for backup_index in range(backup_count):
        if backup_index == backup_count - 1:
            # Final backup: cap its slice and fold any surplus back into the
            # primary so sum(budgets) == total_timeout holds exactly.
            final_slice = min(remaining, _BACKUP_SELECTOR_BUDGET_CAP_SECONDS)
            budgets.append(final_slice)
            surplus = remaining - final_slice
            if surplus > 0:
                budgets[0] += surplus
        else:
            # Keep enough in reserve for the floors of the backups still to come,
            # then clamp this slice to the [floor, cap] band.
            slots_left = backup_count - backup_index - 1
            candidate = remaining - _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * slots_left
            candidate = max(candidate, _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS)
            candidate = min(candidate, _BACKUP_SELECTOR_BUDGET_CAP_SECONDS)
            budgets.append(candidate)
            remaining -= candidate
    return budgets
async def web_find_first_available(
    self,
    selectors:Sequence[tuple[By, str]],
    *,
    parent:Element | None = None,
    timeout:int | float | None = None,
    key:str = "default",
    description:str | None = None,
) -> tuple[Element, int]:
    """
    Find the first matching selector from an ordered group using a shared timeout budget.

    Each retry attempt splits its effective timeout across the alternatives via
    `_allocate_selector_group_budgets` and probes them in order, one `_web_find_once`
    call per candidate, so the group never exceeds the attempt's overall budget.

    :param selectors: ordered (By, value) alternatives; earlier entries are preferred.
    :param parent: optional parent element to scope the lookup; forwarded to `_web_find_once`.
    :param timeout: explicit override for the group budget; otherwise resolved via `key`.
    :param key: timeout-configuration key handed to `_run_with_timeout_retries`.
    :param description: label for retry diagnostics; a default is generated when omitted.
    :return: tuple of (matched element, zero-based index of the selector that matched).
    :raises ValueError: if `selectors` is empty.
    :raises TimeoutError: if no alternative matched within the shared budget.
    """
    if not selectors:
        raise ValueError(_("selectors must contain at least one selector"))

    async def attempt(effective_timeout:float) -> tuple[Element, int]:
        # Split this attempt's budget across the candidates so the group as a
        # whole stays within effective_timeout.
        budgets = self._allocate_selector_group_budgets(effective_timeout, len(selectors))
        failures:list[str] = []
        for index, ((selector_type, selector_value), candidate_timeout) in enumerate(zip(selectors, budgets, strict = True)):
            try:
                element = await self._web_find_once(selector_type, selector_value, candidate_timeout, parent = parent)
                LOG.debug(
                    "Selector group matched candidate %d/%d (%s=%s) within %.2fs (group budget %.2fs)",
                    index + 1,
                    len(selectors),
                    selector_type.name,
                    selector_value,
                    candidate_timeout,
                    effective_timeout,
                )
                return element, index
            except TimeoutError as exc:
                # Record the failure and fall through to the next alternative.
                failures.append(str(exc))
                LOG.debug(
                    "Selector group candidate %d/%d timed out (%s=%s) after %.2fs (group budget %.2fs)",
                    index + 1,
                    len(selectors),
                    selector_type.name,
                    selector_value,
                    candidate_timeout,
                    effective_timeout,
                )
        # All candidates exhausted: surface the most recent failure for diagnostics.
        failure_summary = failures[-1] if failures else _("No selector candidates executed.")
        raise TimeoutError(
            _(
                "No HTML element found using selector group after trying %(count)d alternatives within %(timeout)s seconds."
                " Last error: %(error)s"
            )
            % {"count": len(selectors), "timeout": effective_timeout, "error": failure_summary}
        )

    attempt_description = description or f"web_find_first_available({len(selectors)} selectors)"
    # Retry/backoff semantics (multiplier, retry_enabled, overrides) are owned by
    # the shared timeout helper; `timeout` acts as its override parameter.
    return await self._run_with_timeout_retries(attempt, description = attempt_description, key = key, override = timeout)
async def web_text_first_available(
    self,
    selectors:Sequence[tuple[By, str]],
    *,
    parent:Element | None = None,
    timeout:int | float | None = None,
    key:str = "default",
    description:str | None = None,
) -> tuple[str, int]:
    """
    Resolve the first selector from *selectors* and return its visible text.

    Thin convenience wrapper: delegates the lookup to `web_find_first_available`
    (same budget/retry semantics) and extracts text via `_extract_visible_text`.

    :return: tuple of (visible text, index of the selector that matched).
    """
    matched_element, matched_index = await self.web_find_first_available(
        selectors,
        parent = parent,
        timeout = timeout,
        key = key,
        description = description,
    )
    visible_text = await self._extract_visible_text(matched_element)
    return visible_text, matched_index
async def _extract_visible_text(self, element:Element) -> str:
    """Return visible text for a DOM element using user-selection extraction.

    Runs in-page JavaScript that selects the element's node range and reads the
    selection back, which mirrors what a user would see/copy (hidden text is
    excluded by the browser's selection mechanics).

    NOTE: side effect — the script calls `sel.removeAllRanges()`, so any existing
    user selection on the page is cleared.

    :param element: the DOM element to read.
    :return: the trimmed, selection-visible text (stringified result of `apply`).
    """
    return str(
        await element.apply("""
            function (elem) {
                let sel = window.getSelection()
                sel.removeAllRanges()
                let range = document.createRange()
                range.selectNode(elem)
                sel.addRange(range)
                let visibleText = sel.toString().trim()
                sel.removeAllRanges()
                return visibleText
            }
        """)
    )
async def create_browser_session(self) -> None: async def create_browser_session(self) -> None:
LOG.info("Creating Browser session...") LOG.info("Creating Browser session...")
@@ -699,11 +849,13 @@ class WebScrapingMixin:
return result return result
except Exception as ex1: except Exception as ex1:
ex = ex1 ex = ex1
if loop.time() - start_at > effective_timeout: elapsed = loop.time() - start_at
if elapsed >= effective_timeout:
if ex: if ex:
raise ex raise ex
raise TimeoutError(timeout_error_message or f"Condition not met within {effective_timeout} seconds") raise TimeoutError(timeout_error_message or f"Condition not met within {effective_timeout} seconds")
await self.page.sleep(0.5) remaining_timeout = max(effective_timeout - elapsed, 0.0)
await self.page.sleep(min(0.5, remaining_timeout))
async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float | None = None) -> bool: async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float | None = None) -> bool:
""" """
@@ -1013,20 +1165,8 @@ class WebScrapingMixin:
) )
async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str: async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str:
return str( element = await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)
await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply(""" return await self._extract_visible_text(element)
function (elem) {
let sel = window.getSelection()
sel.removeAllRanges()
let range = document.createRange()
range.selectNode(elem)
sel.addRange(range)
let visibleText = sel.toString().trim()
sel.removeAllRanges()
return visibleText
}
""")
)
async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None: async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None:
duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms

View File

@@ -442,7 +442,7 @@ class TestKleinanzeigenBotAuthentication:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None: async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None:
"""Verify that login check returns true when logged in.""" """Verify that login check returns true when logged in."""
with patch.object(test_bot, "web_text", return_value = "Welcome dummy_user"): with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)):
assert await test_bot.is_logged_in() is True assert await test_bot.is_logged_in() is True
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -450,11 +450,9 @@ class TestKleinanzeigenBotAuthentication:
"""Verify that login check returns true when logged in with alternative element.""" """Verify that login check returns true when logged in with alternative element."""
with patch.object( with patch.object(
test_bot, test_bot,
"web_text", "web_text_first_available",
side_effect = [ new_callable = AsyncMock,
TimeoutError(), # First try with mr-medium fails return_value = ("angemeldet als: dummy_user", 1),
"angemeldet als: dummy_user", # Second try with user-email succeeds
],
): ):
assert await test_bot.is_logged_in() is True assert await test_bot.is_logged_in() is True
@@ -462,7 +460,7 @@ class TestKleinanzeigenBotAuthentication:
async def test_is_logged_in_returns_false_when_not_logged_in(self, test_bot:KleinanzeigenBot) -> None: async def test_is_logged_in_returns_false_when_not_logged_in(self, test_bot:KleinanzeigenBot) -> None:
"""Verify that login check returns false when not logged in.""" """Verify that login check returns false when not logged in."""
with ( with (
patch.object(test_bot, "web_text", side_effect = TimeoutError), patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
patch.object( patch.object(
test_bot, test_bot,
"web_request", "web_request",
@@ -472,10 +470,23 @@ class TestKleinanzeigenBotAuthentication:
): ):
assert await test_bot.is_logged_in() is False assert await test_bot.is_logged_in() is False
@pytest.mark.asyncio
async def test_is_logged_in_uses_selector_group_timeout_key(self, test_bot:KleinanzeigenBot) -> None:
"""Verify login detection uses selector-group lookup with login_detection timeout key."""
with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as group_text:
assert await test_bot.is_logged_in(include_probe = False) is True
group_text.assert_awaited_once()
call_args = group_text.await_args
assert call_args is not None
assert call_args.args[0] == [(By.CLASS_NAME, "mr-medium"), (By.ID, "user-email")]
assert call_args.kwargs["key"] == "login_detection"
assert call_args.kwargs["timeout"] == test_bot._timeout("login_detection")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_login_state_prefers_dom_over_auth_probe(self, test_bot:KleinanzeigenBot) -> None: async def test_get_login_state_prefers_dom_over_auth_probe(self, test_bot:KleinanzeigenBot) -> None:
with ( with (
patch.object(test_bot, "web_text", new_callable = AsyncMock, return_value = "Welcome dummy_user") as web_text, patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as web_text,
patch.object( patch.object(
test_bot, "_auth_probe_login_state", new_callable = AsyncMock, side_effect = AssertionError("Probe must not run when DOM is deterministic") test_bot, "_auth_probe_login_state", new_callable = AsyncMock, side_effect = AssertionError("Probe must not run when DOM is deterministic")
) as probe, ) as probe,
@@ -487,32 +498,32 @@ class TestKleinanzeigenBotAuthentication:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_login_state_falls_back_to_auth_probe_when_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None: async def test_get_login_state_falls_back_to_auth_probe_when_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None:
with ( with (
patch.object(test_bot, "web_text", side_effect = TimeoutError) as web_text, patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text,
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_IN) as probe, patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_IN) as probe,
): ):
assert await test_bot.get_login_state() == LoginState.LOGGED_IN assert await test_bot.get_login_state() == LoginState.LOGGED_IN
assert web_text.call_count == 2 web_text.assert_awaited_once()
probe.assert_awaited_once() probe.assert_awaited_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_login_state_falls_back_to_auth_probe_when_dom_logged_out(self, test_bot:KleinanzeigenBot) -> None: async def test_get_login_state_falls_back_to_auth_probe_when_dom_logged_out(self, test_bot:KleinanzeigenBot) -> None:
with ( with (
patch.object(test_bot, "web_text", side_effect = TimeoutError) as web_text, patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text,
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT) as probe, patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT) as probe,
): ):
assert await test_bot.get_login_state() == LoginState.LOGGED_OUT assert await test_bot.get_login_state() == LoginState.LOGGED_OUT
assert web_text.call_count == 2 web_text.assert_awaited_once()
probe.assert_awaited_once() probe.assert_awaited_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_login_state_returns_unknown_when_probe_unknown_and_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None: async def test_get_login_state_returns_unknown_when_probe_unknown_and_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None:
with ( with (
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN) as probe, patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN) as probe,
patch.object(test_bot, "web_text", side_effect = TimeoutError) as web_text, patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text,
): ):
assert await test_bot.get_login_state() == LoginState.UNKNOWN assert await test_bot.get_login_state() == LoginState.UNKNOWN
probe.assert_awaited_once() probe.assert_awaited_once()
assert web_text.call_count == 2 web_text.assert_awaited_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_login_state_unknown_captures_diagnostics_when_enabled(self, test_bot:KleinanzeigenBot, tmp_path:Path) -> None: async def test_get_login_state_unknown_captures_diagnostics_when_enabled(self, test_bot:KleinanzeigenBot, tmp_path:Path) -> None:
@@ -525,7 +536,7 @@ class TestKleinanzeigenBotAuthentication:
with ( with (
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
patch.object(test_bot, "web_text", side_effect = TimeoutError), patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
): ):
assert await test_bot.get_login_state() == LoginState.UNKNOWN assert await test_bot.get_login_state() == LoginState.UNKNOWN
@@ -543,7 +554,7 @@ class TestKleinanzeigenBotAuthentication:
with ( with (
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
patch.object(test_bot, "web_text", side_effect = TimeoutError), patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
): ):
assert await test_bot.get_login_state() == LoginState.UNKNOWN assert await test_bot.get_login_state() == LoginState.UNKNOWN
@@ -566,7 +577,7 @@ class TestKleinanzeigenBotAuthentication:
with ( with (
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
patch.object(test_bot, "web_text", side_effect = TimeoutError), patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
patch("kleinanzeigen_bot.sys.stdin", stdin_mock), patch("kleinanzeigen_bot.sys.stdin", stdin_mock),
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
): ):
@@ -594,7 +605,7 @@ class TestKleinanzeigenBotAuthentication:
with ( with (
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
patch.object(test_bot, "web_text", side_effect = TimeoutError), patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
patch("kleinanzeigen_bot.sys.stdin", stdin_mock), patch("kleinanzeigen_bot.sys.stdin", stdin_mock),
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
): ):

View File

@@ -2,9 +2,6 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""Unit tests for web_scraping_mixin.py focusing on error handling scenarios. """Unit tests for web_scraping_mixin.py focusing on error handling scenarios.
Copyright (c) 2024, kleinanzeigen-bot contributors.
All rights reserved.
""" """
import json import json
@@ -536,6 +533,112 @@ class TestTimeoutAndRetryHelpers:
): ):
await web_scraper._run_with_timeout_retries(never_called, description = "guarded-op") await web_scraper._run_with_timeout_retries(never_called, description = "guarded-op")
def test_allocate_selector_group_budgets_distributes_total(self, web_scraper:WebScrapingMixin) -> None:
    """The allocated slices must add up to exactly the requested group timeout."""
    allocation = web_scraper._allocate_selector_group_budgets(2.0, 2)
    assert len(allocation) == 2
    # No budget may be lost or invented by the split.
    assert sum(allocation) == pytest.approx(2.0)
def test_allocate_selector_group_budgets_rejects_zero_selector_count(self, web_scraper:WebScrapingMixin) -> None:
    """An empty selector group is a programming error and must raise ValueError."""
    allocate = web_scraper._allocate_selector_group_budgets
    with pytest.raises(ValueError, match = "selector_count must be > 0"):
        allocate(1.0, 0)
def test_allocate_selector_group_budgets_single_selector_clamps_negative_timeout(self, web_scraper:WebScrapingMixin) -> None:
    """A single-selector group must clamp a negative timeout to zero."""
    assert web_scraper._allocate_selector_group_budgets(-1.0, 1) == [0.0]
def test_allocate_selector_group_budgets_non_positive_timeout_returns_zeroes(self, web_scraper:WebScrapingMixin) -> None:
    """A zero total timeout must produce an all-zero budget for every selector."""
    assert web_scraper._allocate_selector_group_budgets(0.0, 3) == [0.0] * 3
def test_allocate_selector_group_budgets_tiny_timeout_splits_equally(self, web_scraper:WebScrapingMixin) -> None:
    """Timeouts below the per-selector floor total are split evenly instead."""
    # 0.2s < 2 * 0.25s (floor per selector), so the equal-split fallback applies.
    assert web_scraper._allocate_selector_group_budgets(0.2, 2) == pytest.approx([0.1, 0.1])
def test_allocate_selector_group_budgets_redistributes_surplus_to_primary(self, web_scraper:WebScrapingMixin) -> None:
    """Surplus above the final-backup cap must flow back into the primary slice."""
    # With current constants: primary = min(5.0 * 0.70, 5.0 - 0.25) = 3.5,
    # last backup capped at 0.75 => surplus 1.5 returns to primary: 5.0 - 0.75 = 4.25.
    assert web_scraper._allocate_selector_group_budgets(5.0, 2) == pytest.approx([4.25, 0.75])
def test_allocate_selector_group_budgets_multiple_backups_apply_reserve_logic(self, web_scraper:WebScrapingMixin) -> None:
    """Multi-backup groups should apply reserve/floor logic before final backup cap."""
    budgets = web_scraper._allocate_selector_group_budgets(3.0, 4)
    # Derivation with current constants (ratio 0.70, floor 0.25, cap 0.75):
    # reserve_for_backups=0.25*3=0.75; primary=min(3.0*0.70, 2.25)=2.1.
    # remaining=0.9 -> backup1=max(0.25, min(0.75, 0.9-0.5))=0.4.
    # remaining=0.5 -> backup2=max(0.25, min(0.75, 0.5-0.25))=0.25.
    # final backup=min(0.25, 0.75)=0.25.
    assert budgets == pytest.approx([2.1, 0.4, 0.25, 0.25])
    # Invariant: the split must always preserve the full budget.
    assert sum(budgets) == pytest.approx(3.0)
@pytest.mark.asyncio
async def test_web_find_first_available_uses_shared_budget(self, web_scraper:WebScrapingMixin) -> None:
    """web_find_first_available should try alternatives in order with shared budget slices."""
    first_timeout:float | None = None
    second_timeout:float | None = None
    found = AsyncMock(spec = Element)

    async def fake_find_once(
        selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None
    ) -> Element:
        # Capture each candidate's budget slice; fail the first selector so the
        # group lookup is forced to fall through to the second.
        nonlocal first_timeout, second_timeout
        if selector_value == "first":
            first_timeout = timeout
            raise TimeoutError("first timeout")
        second_timeout = timeout
        return found

    with patch.object(web_scraper, "_web_find_once", side_effect = fake_find_once):
        result, index = await web_scraper.web_find_first_available(
            [(By.ID, "first"), (By.ID, "second")],
            timeout = 2.0,
            key = "login_detection",
        )
    assert result is found
    assert index == 1
    assert first_timeout is not None
    assert second_timeout is not None
    # The two slices together must consume exactly the 2.0s group budget.
    assert first_timeout + second_timeout == pytest.approx(2.0)
@pytest.mark.asyncio
async def test_web_find_first_available_exhausts_candidates_once_when_retry_disabled(self, web_scraper:WebScrapingMixin) -> None:
    """Candidate exhaustion should not multiply attempts when retry is disabled."""
    # With retries off, each of the two selector alternatives must be probed
    # exactly once before the group-level TimeoutError is raised.
    web_scraper.config.timeouts.retry_enabled = False
    with (
        patch.object(web_scraper, "_web_find_once", side_effect = TimeoutError("not found")) as find_once,
        pytest.raises(TimeoutError, match = "No HTML element found using selector group"),
    ):
        await web_scraper.web_find_first_available([(By.ID, "first"), (By.ID, "second")], timeout = 1.0)
    assert find_once.await_count == 2
@pytest.mark.asyncio
async def test_web_find_first_available_rejects_empty_selectors(self, web_scraper:WebScrapingMixin) -> None:
    """Selector-group lookup should fail fast when no selectors are configured."""
    # ValueError (not TimeoutError) signals a caller bug rather than a page state.
    with pytest.raises(ValueError, match = "selectors must contain at least one selector"):
        await web_scraper.web_find_first_available([])
@pytest.mark.asyncio
async def test_web_text_first_available_returns_text_and_index(self, web_scraper:WebScrapingMixin) -> None:
    """Text-group helper should return extracted text and the matched selector index."""
    mock_element = AsyncMock(spec = Element)
    # `element.apply(...)` is what `_extract_visible_text` awaits; stub its result.
    mock_element.apply = AsyncMock(return_value = "dummy-user")
    with patch.object(web_scraper, "web_find_first_available", new_callable = AsyncMock, return_value = (mock_element, 1)):
        text, index = await web_scraper.web_text_first_available([(By.ID, "a"), (By.ID, "b")], key = "login_detection")
        assert text == "dummy-user"
        assert index == 1
class TestSelectorTimeoutMessages: class TestSelectorTimeoutMessages:
"""Ensure selector helpers provide informative timeout messages.""" """Ensure selector helpers provide informative timeout messages."""
@@ -815,6 +918,21 @@ class TestWebScrolling:
with pytest.raises(TimeoutError): with pytest.raises(TimeoutError):
await web_scraper.web_await(condition, timeout = 0.05) await web_scraper.web_await(condition, timeout = 0.05)
@pytest.mark.asyncio
async def test_web_await_caps_sleep_to_remaining_timeout(self, web_scraper:WebScrapingMixin, mock_page:TrulyAwaitableMockPage) -> None:
    """web_await should not sleep longer than the remaining timeout budget."""

    async def condition() -> bool:
        # Never satisfied, forcing web_await to poll until the timeout expires.
        return False

    with pytest.raises(TimeoutError):
        await web_scraper.web_await(condition, timeout = 0.2, apply_multiplier = False)
    sleep_mock = cast(AsyncMock, mock_page.sleep)
    sleep_mock.assert_awaited()
    # Validate EVERY poll sleep, not just the first: checking only
    # await_args_list[0] would let a later over-long sleep go unnoticed.
    for sleep_call in sleep_mock.await_args_list:
        slept_seconds = sleep_call.args[0]
        assert 0.0 <= slept_seconds <= 0.2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_web_find_retry_mechanism(self, web_scraper:WebScrapingMixin, mock_page:TrulyAwaitableMockPage) -> None: async def test_web_find_retry_mechanism(self, web_scraper:WebScrapingMixin, mock_page:TrulyAwaitableMockPage) -> None:
"""Test web_find retries until element is found within timeout.""" """Test web_find retries until element is found within timeout."""