diff --git a/src/kleinanzeigen_bot/__init__.py b/src/kleinanzeigen_bot/__init__.py index ff20ead..91419ae 100644 --- a/src/kleinanzeigen_bot/__init__.py +++ b/src/kleinanzeigen_bot/__init__.py @@ -1246,23 +1246,27 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 effective_timeout, ) - # Try to find the standard element first - try: - user_info = await self.web_text(By.CLASS_NAME, "mr-medium", timeout = login_check_timeout) - if username in user_info.lower(): - LOG.debug("Login detected via .mr-medium element") - return True - except TimeoutError: - LOG.debug("Timeout waiting for .mr-medium element after %.1fs", effective_timeout) + login_selectors = [ + (By.CLASS_NAME, "mr-medium"), + (By.ID, "user-email"), + ] + primary_selector_index = 0 - # If standard element not found or didn't contain username, try the alternative try: - user_info = await self.web_text(By.ID, "user-email", timeout = login_check_timeout) + user_info, matched_selector = await self.web_text_first_available( + login_selectors, + timeout = login_check_timeout, + key = "login_detection", + description = "login_detection(selector_group)", + ) if username in user_info.lower(): - LOG.debug("Login detected via #user-email element") + if matched_selector == primary_selector_index: + LOG.debug("Login detected via .mr-medium element") + else: + LOG.debug("Login detected via #user-email element") return True except TimeoutError: - LOG.debug("Timeout waiting for #user-email element after %.1fs", effective_timeout) + LOG.debug("Timeout waiting for login detection selector group after %.1fs", effective_timeout) if not include_probe: LOG.debug("No login detected - neither .mr-medium nor #user-email found with username") diff --git a/src/kleinanzeigen_bot/resources/translations.de.yaml b/src/kleinanzeigen_bot/resources/translations.de.yaml index c93ed75..6c0618c 100644 --- a/src/kleinanzeigen_bot/resources/translations.de.yaml +++ b/src/kleinanzeigen_bot/resources/translations.de.yaml @@ -98,9 +98,8 @@ kleinanzeigen_bot/__init__.py: is_logged_in: "Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)": "Starte Login-Erkennung (Timeout: %.1fs Basis, %.1fs effektiv mit Multiplikator/Backoff)" "Login detected via .mr-medium element": "Login erkannt über .mr-medium Element" - "Timeout waiting for .mr-medium element after %.1fs": "Timeout beim Warten auf .mr-medium Element nach %.1fs" "Login detected via #user-email element": "Login erkannt über #user-email Element" - "Timeout waiting for #user-email element after %.1fs": "Timeout beim Warten auf #user-email Element nach %.1fs" + "Timeout waiting for login detection selector group after %.1fs": "Timeout beim Warten auf die Login-Erkennungs-Selektorgruppe nach %.1fs" "No login detected - neither .mr-medium nor #user-email found with username": "Kein Login erkannt - weder .mr-medium noch #user-email mit Benutzername gefunden" "No login detected - DOM elements not found and server probe returned %s": "Kein Login erkannt - DOM-Elemente nicht gefunden und Server-Probe ergab %s" @@ -533,6 +532,17 @@ kleinanzeigen_bot/utils/web_scraping_mixin.py: _record_timing: "Timing collector failed for key=%s operation=%s: %s": "Zeitmessung fehlgeschlagen für key=%s operation=%s: %s" + _allocate_selector_group_budgets: + "selector_count must be > 0": "selector_count muss > 0 sein" + + web_find_first_available: + "selectors must contain at least one selector": "selectors muss mindestens einen Selektor enthalten" + + attempt: + "No selector candidates executed.": "Keine Selektor-Kandidaten ausgeführt." + ? "No HTML element found using selector group after trying %(count)d alternatives within %(timeout)s seconds. Last error: %(error)s" + : "Kein HTML-Element über Selektorgruppe gefunden, nachdem %(count)d Alternativen innerhalb von %(timeout)s Sekunden versucht wurden. Letzter Fehler: %(error)s" + close_browser_session: "Closing Browser session...": "Schließe Browser-Sitzung..." diff --git a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py index d354866..a4d00ad 100644 --- a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py +++ b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404 -from collections.abc import Awaitable, Callable, Coroutine, Iterable +from collections.abc import Awaitable, Callable, Coroutine, Iterable, Sequence from gettext import gettext as _ from pathlib import Path, PureWindowsPath from typing import Any, Final, Optional, cast @@ -39,6 +39,9 @@ if TYPE_CHECKING: # Constants for RemoteObject conversion _KEY_VALUE_PAIR_SIZE = 2 +_PRIMARY_SELECTOR_BUDGET_RATIO:Final[float] = 0.70 +_BACKUP_SELECTOR_BUDGET_CAP_SECONDS:Final[float] = 0.75 +_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS:Final[float] = 0.25 def _resolve_user_data_dir_paths(arg_value:str, config_value:str) -> tuple[Any, Any]: @@ -254,6 +257,153 @@ class WebScrapingMixin: raise TimeoutError(f"{description} failed without executing operation") + @staticmethod + def _allocate_selector_group_budgets(total_timeout:float, selector_count:int) -> list[float]: + """Allocate a shared timeout budget across selector alternatives. + + Strategy: + - Give the first selector a preferred share via `_PRIMARY_SELECTOR_BUDGET_RATIO`. + - Keep a minimum floor `_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS` per selector. + - Cap backup slices with `_BACKUP_SELECTOR_BUDGET_CAP_SECONDS`. + - Reassign final-backup surplus to the primary slot to preserve total timeout. + """ + if selector_count <= 0: + raise ValueError(_("selector_count must be > 0")) + if selector_count == 1: + return [max(total_timeout, 0.0)] + if total_timeout <= 0: + return [0.0 for _ in range(selector_count)] + + # If total_timeout cannot satisfy per-slot floor, split equally to preserve total budget. + floor_total = _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * selector_count + if total_timeout < floor_total: + equal_share = total_timeout / selector_count + return [equal_share for _ in range(selector_count)] + + # Reserve minimum floor for backups before sizing the primary slice. + reserve_for_backups = _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * (selector_count - 1) + # Primary gets preferred ratio, but never steals the reserved backup floors. + primary = min(total_timeout * _PRIMARY_SELECTOR_BUDGET_RATIO, total_timeout - reserve_for_backups) + primary = max(primary, _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS) + budgets = [primary] + remaining = total_timeout - primary + + for index in range(selector_count - 1): + is_last_backup = index == selector_count - 2 + if is_last_backup: + # Last backup is capped; any surplus is folded back into primary to keep sum == total_timeout. + alloc = min(remaining, _BACKUP_SELECTOR_BUDGET_CAP_SECONDS) + budgets.append(alloc) + surplus = remaining - alloc + if surplus > 0: + budgets[0] += surplus + continue + + remaining_slots_after_this = selector_count - len(budgets) - 1 + # Keep floor reserve for remaining backups, then clamp this slice to floor/cap bounds. + min_reserve = _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * remaining_slots_after_this + alloc = remaining - min_reserve + alloc = max(_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS, alloc) + alloc = min(_BACKUP_SELECTOR_BUDGET_CAP_SECONDS, alloc) + budgets.append(alloc) + remaining -= alloc + + return budgets + + async def web_find_first_available( + self, + selectors:Sequence[tuple[By, str]], + *, + parent:Element | None = None, + timeout:int | float | None = None, + key:str = "default", + description:str | None = None, + ) -> tuple[Element, int]: + """ + Find the first matching selector from an ordered group using a shared timeout budget. + """ + if not selectors: + raise ValueError(_("selectors must contain at least one selector")) + + async def attempt(effective_timeout:float) -> tuple[Element, int]: + budgets = self._allocate_selector_group_budgets(effective_timeout, len(selectors)) + failures:list[str] = [] + for index, ((selector_type, selector_value), candidate_timeout) in enumerate(zip(selectors, budgets, strict = True)): + try: + element = await self._web_find_once(selector_type, selector_value, candidate_timeout, parent = parent) + LOG.debug( + "Selector group matched candidate %d/%d (%s=%s) within %.2fs (group budget %.2fs)", + index + 1, + len(selectors), + selector_type.name, + selector_value, + candidate_timeout, + effective_timeout, + ) + return element, index + except TimeoutError as exc: + failures.append(str(exc)) + LOG.debug( + "Selector group candidate %d/%d timed out (%s=%s) after %.2fs (group budget %.2fs)", + index + 1, + len(selectors), + selector_type.name, + selector_value, + candidate_timeout, + effective_timeout, + ) + + failure_summary = failures[-1] if failures else _("No selector candidates executed.") + raise TimeoutError( + _( + "No HTML element found using selector group after trying %(count)d alternatives within %(timeout)s seconds." + " Last error: %(error)s" + ) + % {"count": len(selectors), "timeout": effective_timeout, "error": failure_summary} + ) + + attempt_description = description or f"web_find_first_available({len(selectors)} selectors)" + return await self._run_with_timeout_retries(attempt, description = attempt_description, key = key, override = timeout) + + async def web_text_first_available( + self, + selectors:Sequence[tuple[By, str]], + *, + parent:Element | None = None, + timeout:int | float | None = None, + key:str = "default", + description:str | None = None, + ) -> tuple[str, int]: + """ + Return visible text from the first selector that resolves from a selector group. + """ + element, matched_index = await self.web_find_first_available( + selectors, + parent = parent, + timeout = timeout, + key = key, + description = description, + ) + text = await self._extract_visible_text(element) + return text, matched_index + + async def _extract_visible_text(self, element:Element) -> str: + """Return visible text for a DOM element using user-selection extraction.""" + return str( + await element.apply(""" + function (elem) { + let sel = window.getSelection() + sel.removeAllRanges() + let range = document.createRange() + range.selectNode(elem) + sel.addRange(range) + let visibleText = sel.toString().trim() + sel.removeAllRanges() + return visibleText + } + """) + ) + async def create_browser_session(self) -> None: LOG.info("Creating Browser session...") @@ -699,11 +849,13 @@ class WebScrapingMixin: return result except Exception as ex1: ex = ex1 - if loop.time() - start_at > effective_timeout: + elapsed = loop.time() - start_at + if elapsed >= effective_timeout: if ex: raise ex raise TimeoutError(timeout_error_message or f"Condition not met within {effective_timeout} seconds") - await self.page.sleep(0.5) + remaining_timeout = max(effective_timeout - elapsed, 0.0) + await self.page.sleep(min(0.5, remaining_timeout)) async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float | None = None) -> bool: """ @@ -1013,20 +1165,8 @@ class WebScrapingMixin: ) async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str: - return str( - await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply(""" - function (elem) { - let sel = window.getSelection() - sel.removeAllRanges() - let range = document.createRange() - range.selectNode(elem) - sel.addRange(range) - let visibleText = sel.toString().trim() - sel.removeAllRanges() - return visibleText - } - """) - ) + element = await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout) + return await self._extract_visible_text(element) async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None: duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms diff --git a/tests/unit/test_init.py b/tests/unit/test_init.py index 44afdbb..4367b05 100644 --- a/tests/unit/test_init.py +++ b/tests/unit/test_init.py @@ -442,7 +442,7 @@ class TestKleinanzeigenBotAuthentication: @pytest.mark.asyncio async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None: """Verify that login check returns true when logged in.""" - with patch.object(test_bot, "web_text", return_value = "Welcome dummy_user"): + with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)): assert await test_bot.is_logged_in() is True @pytest.mark.asyncio @@ -450,11 +450,9 @@ class TestKleinanzeigenBotAuthentication: """Verify that login check returns true when logged in with alternative element.""" with patch.object( test_bot, - "web_text", - side_effect = [ - TimeoutError(), # First try with mr-medium fails - "angemeldet als: dummy_user", # Second try with user-email succeeds - ], + "web_text_first_available", + new_callable = AsyncMock, + return_value = ("angemeldet als: dummy_user", 1), ): assert await test_bot.is_logged_in() is True @@ -462,7 +460,7 @@ class TestKleinanzeigenBotAuthentication: async def test_is_logged_in_returns_false_when_not_logged_in(self, test_bot:KleinanzeigenBot) -> None: """Verify that login check returns false when not logged in.""" with ( - patch.object(test_bot, "web_text", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), patch.object( test_bot, "web_request", @@ -472,10 +470,23 @@ class TestKleinanzeigenBotAuthentication: ): assert await test_bot.is_logged_in() is False + @pytest.mark.asyncio + async def test_is_logged_in_uses_selector_group_timeout_key(self, test_bot:KleinanzeigenBot) -> None: + """Verify login detection uses selector-group lookup with login_detection timeout key.""" + with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as group_text: + assert await test_bot.is_logged_in(include_probe = False) is True + + group_text.assert_awaited_once() + call_args = group_text.await_args + assert call_args is not None + assert call_args.args[0] == [(By.CLASS_NAME, "mr-medium"), (By.ID, "user-email")] + assert call_args.kwargs["key"] == "login_detection" + assert call_args.kwargs["timeout"] == test_bot._timeout("login_detection") + @pytest.mark.asyncio async def test_get_login_state_prefers_dom_over_auth_probe(self, test_bot:KleinanzeigenBot) -> None: with ( - patch.object(test_bot, "web_text", new_callable = AsyncMock, return_value = "Welcome dummy_user") as web_text, + patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as web_text, patch.object( test_bot, "_auth_probe_login_state", new_callable = AsyncMock, side_effect = AssertionError("Probe must not run when DOM is deterministic") ) as probe, @@ -487,32 +498,32 @@ class TestKleinanzeigenBotAuthentication: @pytest.mark.asyncio async def test_get_login_state_falls_back_to_auth_probe_when_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None: with ( - patch.object(test_bot, "web_text", side_effect = TimeoutError) as web_text, + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text, patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_IN) as probe, ): assert await test_bot.get_login_state() == LoginState.LOGGED_IN - assert web_text.call_count == 2 + web_text.assert_awaited_once() probe.assert_awaited_once() @pytest.mark.asyncio async def test_get_login_state_falls_back_to_auth_probe_when_dom_logged_out(self, test_bot:KleinanzeigenBot) -> None: with ( - patch.object(test_bot, "web_text", side_effect = TimeoutError) as web_text, + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text, patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT) as probe, ): assert await test_bot.get_login_state() == LoginState.LOGGED_OUT - assert web_text.call_count == 2 + web_text.assert_awaited_once() probe.assert_awaited_once() @pytest.mark.asyncio async def test_get_login_state_returns_unknown_when_probe_unknown_and_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None: with ( patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN) as probe, - patch.object(test_bot, "web_text", side_effect = TimeoutError) as web_text, + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text, ): assert await test_bot.get_login_state() == LoginState.UNKNOWN probe.assert_awaited_once() - assert web_text.call_count == 2 + web_text.assert_awaited_once() @pytest.mark.asyncio async def test_get_login_state_unknown_captures_diagnostics_when_enabled(self, test_bot:KleinanzeigenBot, tmp_path:Path) -> None: @@ -525,7 +536,7 @@ class TestKleinanzeigenBotAuthentication: with ( patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), ): assert await test_bot.get_login_state() == LoginState.UNKNOWN @@ -543,7 +554,7 @@ class TestKleinanzeigenBotAuthentication: with ( patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), ): assert await test_bot.get_login_state() == LoginState.UNKNOWN @@ -566,7 +577,7 @@ class TestKleinanzeigenBotAuthentication: with ( patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), patch("kleinanzeigen_bot.sys.stdin", stdin_mock), patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, ): @@ -594,7 +605,7 @@ class TestKleinanzeigenBotAuthentication: with ( patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), patch("kleinanzeigen_bot.sys.stdin", stdin_mock), patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, ): diff --git a/tests/unit/test_web_scraping_mixin.py b/tests/unit/test_web_scraping_mixin.py index f3a835f..b6fbf17 100644 --- a/tests/unit/test_web_scraping_mixin.py +++ b/tests/unit/test_web_scraping_mixin.py @@ -2,9 +2,6 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """Unit tests for web_scraping_mixin.py focusing on error handling scenarios. - -Copyright (c) 2024, kleinanzeigen-bot contributors. -All rights reserved. """ import json @@ -536,6 +533,112 @@ class TestTimeoutAndRetryHelpers: ): await web_scraper._run_with_timeout_retries(never_called, description = "guarded-op") + def test_allocate_selector_group_budgets_distributes_total(self, web_scraper:WebScrapingMixin) -> None: + """Selector group budgets should consume the full timeout budget.""" + budgets = web_scraper._allocate_selector_group_budgets(2.0, 2) + assert len(budgets) == 2 + assert budgets[0] + budgets[1] == pytest.approx(2.0) + + def test_allocate_selector_group_budgets_rejects_zero_selector_count(self, web_scraper:WebScrapingMixin) -> None: + """Selector budget helper should reject empty selector groups.""" + with pytest.raises(ValueError, match = "selector_count must be > 0"): + web_scraper._allocate_selector_group_budgets(1.0, 0) + + def test_allocate_selector_group_budgets_single_selector_clamps_negative_timeout(self, web_scraper:WebScrapingMixin) -> None: + """Single-selector budgets should never be negative.""" + budgets = web_scraper._allocate_selector_group_budgets(-1.0, 1) + assert budgets == [0.0] + + def test_allocate_selector_group_budgets_non_positive_timeout_returns_zeroes(self, web_scraper:WebScrapingMixin) -> None: + """Multi-selector groups with non-positive timeout should return zero budgets.""" + budgets = web_scraper._allocate_selector_group_budgets(0.0, 3) + assert budgets == [0.0, 0.0, 0.0] + + def test_allocate_selector_group_budgets_tiny_timeout_splits_equally(self, web_scraper:WebScrapingMixin) -> None: + """When timeout is too small for floors, budgets should split equally.""" + # 0.2s is below floor_total for two selectors (2 * 0.25s), so equal split applies. + budgets = web_scraper._allocate_selector_group_budgets(0.2, 2) + assert budgets == pytest.approx([0.1, 0.1]) + + def test_allocate_selector_group_budgets_redistributes_surplus_to_primary(self, web_scraper:WebScrapingMixin) -> None: + """Last-backup cap overflow should be redistributed back to primary budget.""" + budgets = web_scraper._allocate_selector_group_budgets(5.0, 2) + # Derivation with current constants: + # primary=min(5.0*0.70, 5.0-0.25)=3.5; last backup cap=0.75; surplus=1.5 -> primary+surplus=5.0-0.75=4.25. + assert budgets == pytest.approx([4.25, 0.75]) + + def test_allocate_selector_group_budgets_multiple_backups_apply_reserve_logic(self, web_scraper:WebScrapingMixin) -> None: + """Multi-backup groups should apply reserve/floor logic before final backup cap.""" + budgets = web_scraper._allocate_selector_group_budgets(3.0, 4) + # Derivation with current constants: + # reserve_for_backups=0.25*3=0.75; primary=min(3.0*0.70, 2.25)=2.1. + # remaining=0.9 -> backup1=max(0.25, min(0.75, 0.9-0.5))=0.4. + # remaining=0.5 -> backup2=max(0.25, min(0.75, 0.5-0.25))=0.25. + # final backup=min(0.25, 0.75)=0.25. + assert budgets == pytest.approx([2.1, 0.4, 0.25, 0.25]) + assert sum(budgets) == pytest.approx(3.0) + + @pytest.mark.asyncio + async def test_web_find_first_available_uses_shared_budget(self, web_scraper:WebScrapingMixin) -> None: + """web_find_first_available should try alternatives in order with shared budget slices.""" + first_timeout:float | None = None + second_timeout:float | None = None + found = AsyncMock(spec = Element) + + async def fake_find_once( + selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None + ) -> Element: + nonlocal first_timeout, second_timeout + if selector_value == "first": + first_timeout = timeout + raise TimeoutError("first timeout") + second_timeout = timeout + return found + + with patch.object(web_scraper, "_web_find_once", side_effect = fake_find_once): + result, index = await web_scraper.web_find_first_available( + [(By.ID, "first"), (By.ID, "second")], + timeout = 2.0, + key = "login_detection", + ) + + assert result is found + assert index == 1 + assert first_timeout is not None + assert second_timeout is not None + assert first_timeout + second_timeout == pytest.approx(2.0) + + @pytest.mark.asyncio + async def test_web_find_first_available_exhausts_candidates_once_when_retry_disabled(self, web_scraper:WebScrapingMixin) -> None: + """Candidate exhaustion should not multiply attempts when retry is disabled.""" + web_scraper.config.timeouts.retry_enabled = False + + with ( + patch.object(web_scraper, "_web_find_once", side_effect = TimeoutError("not found")) as find_once, + pytest.raises(TimeoutError, match = "No HTML element found using selector group"), + ): + await web_scraper.web_find_first_available([(By.ID, "first"), (By.ID, "second")], timeout = 1.0) + + assert find_once.await_count == 2 + + @pytest.mark.asyncio + async def test_web_find_first_available_rejects_empty_selectors(self, web_scraper:WebScrapingMixin) -> None: + """Selector-group lookup should fail fast when no selectors are configured.""" + with pytest.raises(ValueError, match = "selectors must contain at least one selector"): + await web_scraper.web_find_first_available([]) + + @pytest.mark.asyncio + async def test_web_text_first_available_returns_text_and_index(self, web_scraper:WebScrapingMixin) -> None: + """Text-group helper should return extracted text and the matched selector index.""" + mock_element = AsyncMock(spec = Element) + mock_element.apply = AsyncMock(return_value = "dummy-user") + + with patch.object(web_scraper, "web_find_first_available", new_callable = AsyncMock, return_value = (mock_element, 1)): + text, index = await web_scraper.web_text_first_available([(By.ID, "a"), (By.ID, "b")], key = "login_detection") + + assert text == "dummy-user" + assert index == 1 + class TestSelectorTimeoutMessages: """Ensure selector helpers provide informative timeout messages.""" @@ -815,6 +918,21 @@ class TestWebScrolling: with pytest.raises(TimeoutError): await web_scraper.web_await(condition, timeout = 0.05) + @pytest.mark.asyncio + async def test_web_await_caps_sleep_to_remaining_timeout(self, web_scraper:WebScrapingMixin, mock_page:TrulyAwaitableMockPage) -> None: + """web_await should not sleep longer than the remaining timeout budget.""" + + async def condition() -> bool: + return False + + with pytest.raises(TimeoutError): + await web_scraper.web_await(condition, timeout = 0.2, apply_multiplier = False) + + sleep_mock = cast(AsyncMock, mock_page.sleep) + sleep_mock.assert_awaited() + slept_seconds = sleep_mock.await_args_list[0].args[0] + assert slept_seconds <= 0.2 + @pytest.mark.asyncio async def test_web_find_retry_mechanism(self, web_scraper:WebScrapingMixin, mock_page:TrulyAwaitableMockPage) -> None: """Test web_find retries until element is found within timeout."""