fix: detect payment form and wait or user input (#520)

Co-authored-by: Jens Bergmann <1742418+1cu@users.noreply.github.com>
This commit is contained in:
Heavenfighter
2025-06-10 15:51:59 +02:00
committed by GitHub
parent a5603e742f
commit 4d48427234
4 changed files with 91 additions and 37 deletions

View File

@@ -656,7 +656,6 @@ class KleinanzeigenBot(WebScrapingMixin):
@param ad_cfg_orig: the ad config as present in the YAML file @param ad_cfg_orig: the ad config as present in the YAML file
@param published_ads: json list of published ads @param published_ads: json list of published ads
""" """
await self.assert_free_ad_limit_not_reached()
if self.config.publishing.delete_old_ads == "BEFORE_PUBLISH" and not self.keep_old_ads: if self.config.publishing.delete_old_ads == "BEFORE_PUBLISH" and not self.keep_old_ads:
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title) await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title)
@@ -811,6 +810,20 @@ class KleinanzeigenBot(WebScrapingMixin):
except TimeoutError: except TimeoutError:
pass # nosec pass # nosec
#############################
# wait for payment form if commercial account is used
#############################
try:
await self.web_find(By.ID, "myftr-shppngcrt-frm", timeout = 2)
LOG.warning("############################################")
LOG.warning("# Payment form detected! Please proceed with payment.")
LOG.warning("############################################")
await self.web_scroll_page_down()
input(_("Press a key to continue..."))
except TimeoutError:
pass
await self.web_await(lambda: "p-anzeige-aufgeben-bestaetigung.html?adId=" in self.page.url, timeout = 20) await self.web_await(lambda: "p-anzeige-aufgeben-bestaetigung.html?adId=" in self.page.url, timeout = 20)
# extract the ad id from the URL's query parameter # extract the ad id from the URL's query parameter
@@ -1033,13 +1046,6 @@ class KleinanzeigenBot(WebScrapingMixin):
await image_upload.send_file(image) await image_upload.send_file(image)
await self.web_sleep() await self.web_sleep()
async def assert_free_ad_limit_not_reached(self) -> None:
try:
await self.web_find(By.XPATH, "/html/body/div[1]/form/fieldset[6]/div[1]/header", timeout = 2)
raise AssertionError(f"Cannot publish more ads. The monthly limit of free ads of account {self.config.login.username} is reached.")
except TimeoutError:
pass
async def download_ads(self) -> None: async def download_ads(self) -> None:
""" """
Determines which download mode was chosen with the arguments, and calls the specified download routine. Determines which download mode was chosen with the arguments, and calls the specified download routine.

View File

@@ -112,6 +112,7 @@ class AdExtractor(WebScrapingMixin):
:param url: the URL to the ad page :param url: the URL to the ad page
:return: the ad ID, a (ten-digit) integer number :return: the ad ID, a (ten-digit) integer number
""" """
num_part = url.rsplit("/", maxsplit = 1)[-1] # suffix num_part = url.rsplit("/", maxsplit = 1)[-1] # suffix
id_part = num_part.split("-", maxsplit = 1)[0] id_part = num_part.split("-", maxsplit = 1)[0]

View File

@@ -87,9 +87,11 @@ kleinanzeigen_bot/__init__.py:
publish_ad: publish_ad:
"Publishing ad '%s'...": "Veröffentliche Anzeige '%s'..." "Publishing ad '%s'...": "Veröffentliche Anzeige '%s'..."
"Failed to set shipping attribute for type '%s'!": "Fehler beim setzen des Versandattributs für den Typ '%s'!" "Failed to set shipping attribute for type '%s'!": "Fehler beim setzen des Versandattributs für den Typ '%s'!"
"# Payment form detected! Please proceed with payment.": "# Bestellformular gefunden! Bitte mit der Bezahlung fortfahren."
" -> SUCCESS: ad published with ID %s": " -> ERFOLG: Anzeige mit ID %s veröffentlicht" " -> SUCCESS: ad published with ID %s": " -> ERFOLG: Anzeige mit ID %s veröffentlicht"
" -> effective ad meta:": " -> effektive Anzeigen-Metadaten:" " -> effective ad meta:": " -> effektive Anzeigen-Metadaten:"
"Could not set city from location": "Stadt konnte nicht aus dem Standort gesetzt werden" "Could not set city from location": "Stadt konnte nicht aus dem Standort gesetzt werden"
"Press a key to continue...": "Eine Taste drücken, um fortzufahren..."
__set_condition: __set_condition:
"Unable to close condition dialog!": "Kann den Dialog für Artikelzustand nicht schließen!" "Unable to close condition dialog!": "Kann den Dialog für Artikelzustand nicht schließen!"

View File

@@ -17,6 +17,7 @@ from kleinanzeigen_bot._version import __version__
from kleinanzeigen_bot.model.ad_model import Ad from kleinanzeigen_bot.model.ad_model import Ad
from kleinanzeigen_bot.model.config_model import AdDefaults, Config, PublishingConfig from kleinanzeigen_bot.model.config_model import AdDefaults, Config, PublishingConfig
from kleinanzeigen_bot.utils import dicts, loggers from kleinanzeigen_bot.utils import dicts, loggers
from kleinanzeigen_bot.utils.web_scraping_mixin import By, Element
@pytest.fixture @pytest.fixture
@@ -38,6 +39,26 @@ def mock_page() -> MagicMock:
return mock return mock
@pytest.mark.asyncio
async def test_mock_page_fixture(mock_page:MagicMock) -> None:
"""Test that the mock_page fixture is properly configured."""
# Test that all required async methods are present
assert hasattr(mock_page, "sleep")
assert hasattr(mock_page, "evaluate")
assert hasattr(mock_page, "click")
assert hasattr(mock_page, "type")
assert hasattr(mock_page, "select")
assert hasattr(mock_page, "wait_for_selector")
assert hasattr(mock_page, "wait_for_navigation")
assert hasattr(mock_page, "wait_for_load_state")
assert hasattr(mock_page, "content")
assert hasattr(mock_page, "goto")
assert hasattr(mock_page, "close")
# Test that content returns expected value
assert await mock_page.content() == "<html></html>"
@pytest.fixture @pytest.fixture
def base_ad_config() -> dict[str, Any]: def base_ad_config() -> dict[str, Any]:
"""Provide a base ad configuration that can be used across tests.""" """Provide a base ad configuration that can be used across tests."""
@@ -81,7 +102,7 @@ def remove_fields(config:dict[str, Any], *fields:str) -> dict[str, Any]:
for field in fields: for field in fields:
if "." in field: if "." in field:
# Handle nested fields (e.g., "contact.phone") # Handle nested fields (e.g., "contact.phone")
parts = field.split(".") parts = field.split(".", maxsplit = 1)
current = result current = result
for part in parts[:-1]: for part in parts[:-1]:
if part in current: if part in current:
@@ -93,6 +114,30 @@ def remove_fields(config:dict[str, Any], *fields:str) -> dict[str, Any]:
return result return result
def test_remove_fields() -> None:
"""Test the remove_fields helper function."""
test_config = {
"field1": "value1",
"field2": "value2",
"nested": {
"field3": "value3"
}
}
# Test removing top-level field
result = remove_fields(test_config, "field1")
assert "field1" not in result
assert "field2" in result
# Test removing nested field
result = remove_fields(test_config, "nested.field3")
assert "field3" not in result["nested"]
# Test removing non-existent field
result = remove_fields(test_config, "nonexistent")
assert result == test_config
@pytest.fixture @pytest.fixture
def minimal_ad_config(base_ad_config:dict[str, Any]) -> dict[str, Any]: def minimal_ad_config(base_ad_config:dict[str, Any]) -> dict[str, Any]:
"""Provide a minimal ad configuration with only required fields.""" """Provide a minimal ad configuration with only required fields."""
@@ -281,20 +326,6 @@ login:
class TestKleinanzeigenBotAuthentication: class TestKleinanzeigenBotAuthentication:
"""Tests for login and authentication functionality.""" """Tests for login and authentication functionality."""
@pytest.mark.asyncio
async def test_assert_free_ad_limit_not_reached_success(self, test_bot:KleinanzeigenBot) -> None:
"""Verify that free ad limit check succeeds when limit not reached."""
with patch.object(test_bot, "web_find", side_effect = TimeoutError):
await test_bot.assert_free_ad_limit_not_reached()
@pytest.mark.asyncio
async def test_assert_free_ad_limit_not_reached_limit_reached(self, test_bot:KleinanzeigenBot) -> None:
"""Verify that free ad limit check fails when limit is reached."""
with patch.object(test_bot, "web_find", return_value = AsyncMock()):
with pytest.raises(AssertionError) as exc_info:
await test_bot.assert_free_ad_limit_not_reached()
assert "Cannot publish more ads" in str(exc_info.value)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None: async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None:
"""Verify that login check returns true when logged in.""" """Verify that login check returns true when logged in."""
@@ -988,6 +1019,19 @@ class TestKleinanzeigenBotShippingOptions:
return 0 # Return integer to prevent scrolling loop return 0 # Return integer to prevent scrolling loop
return None return None
# Create mock elements
csrf_token_elem = MagicMock()
csrf_token_elem.attrs = {"content": "csrf-token-123"}
shipping_form_elem = MagicMock()
shipping_form_elem.attrs = {}
shipping_size_radio = MagicMock()
shipping_size_radio.attrs = {"checked": False}
category_path_elem = MagicMock()
category_path_elem.apply = AsyncMock(return_value = "Test Category")
# Mock the necessary web interaction methods # Mock the necessary web interaction methods
with patch.object(test_bot, "web_execute", side_effect = mock_web_execute), \ with patch.object(test_bot, "web_execute", side_effect = mock_web_execute), \
patch.object(test_bot, "web_click", new_callable = AsyncMock), \ patch.object(test_bot, "web_click", new_callable = AsyncMock), \
@@ -998,23 +1042,24 @@ class TestKleinanzeigenBotShippingOptions:
patch.object(test_bot, "web_sleep", new_callable = AsyncMock), \ patch.object(test_bot, "web_sleep", new_callable = AsyncMock), \
patch.object(test_bot, "web_check", new_callable = AsyncMock, return_value = True), \ patch.object(test_bot, "web_check", new_callable = AsyncMock, return_value = True), \
patch.object(test_bot, "web_request", new_callable = AsyncMock), \ patch.object(test_bot, "web_request", new_callable = AsyncMock), \
patch.object(test_bot, "web_find_all", new_callable = AsyncMock) as mock_find_all, \ patch.object(test_bot, "web_find_all", new_callable = AsyncMock), \
patch.object(test_bot, "web_await", new_callable = AsyncMock), \ patch.object(test_bot, "web_await", new_callable = AsyncMock), \
patch("builtins.input", return_value = ""): # Mock the input function patch("builtins.input", return_value = ""), \
patch.object(test_bot, "web_scroll_page_down", new_callable = AsyncMock):
# Mock the shipping options form elements # Mock web_find to simulate element detection
mock_find.side_effect = [ async def mock_find_side_effect(selector_type:By, selector_value:str, **_:Any) -> Element | None:
TimeoutError(), # First call in assert_free_ad_limit_not_reached if selector_value == "meta[name=_csrf]":
AsyncMock(attrs = {"content": "csrf-token-123"}), # CSRF token return csrf_token_elem
AsyncMock(attrs = {"checked": True}), # Size radio button check if selector_value == "myftr-shppngcrt-frm":
AsyncMock(attrs = {"value": "Klein"}), # Size dropdown return shipping_form_elem
AsyncMock(attrs = {"value": "Paket 2 kg"}), # Package type dropdown if selector_value == '.SingleSelectionItem--Main input[type=radio][data-testid="Klein"]':
AsyncMock(attrs = {"value": "Päckchen"}), # Second package type dropdown return shipping_size_radio
TimeoutError(), # Captcha check if selector_value == "postad-category-path":
] return category_path_elem
return None
# Mock web_find_all to return empty list for city options mock_find.side_effect = mock_find_side_effect
mock_find_all.return_value = []
# Mock web_check to return True for radio button checked state # Mock web_check to return True for radio button checked state
with patch.object(test_bot, "web_check", new_callable = AsyncMock) as mock_check: with patch.object(test_bot, "web_check", new_callable = AsyncMock) as mock_check: