feat: collect timeout timing sessions for diagnostics (#814)

This commit is contained in:
Jens
2026-02-13 16:45:52 +01:00
committed by GitHub
parent 81c55316db
commit 50fc8781a9
12 changed files with 902 additions and 350 deletions

View File

@@ -0,0 +1,168 @@
# SPDX-FileCopyrightText: © Jens Bergmann and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""Collect per-operation timeout timings and persist per-run JSON sessions.
`TimingCollector` records operation durations in seconds, grouped by a single bot run
(`session_id`). Call `record(...)` during runtime and `flush()` once at command end to
append the current session to `timing_data.json` with automatic 30-day retention.
The collector is best-effort and designed for troubleshooting, not strict telemetry.
"""
from __future__ import annotations
import json, uuid # isort: skip
import os
from dataclasses import asdict, dataclass
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Final
if TYPE_CHECKING:
from pathlib import Path
from kleinanzeigen_bot.utils import loggers, misc
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
RETENTION_DAYS:Final[int] = 30
TIMING_FILE:Final[str] = "timing_data.json"
@dataclass
class TimingRecord:
    """One captured timing sample for a single timed operation attempt.

    All timeout and duration fields are expressed in seconds; `timestamp`
    is an ISO-8601 string taken when the sample was recorded.
    """
    timestamp:str
    operation_key:str
    operation_type:str
    description:str
    configured_timeout_sec:float
    effective_timeout_sec:float
    actual_duration_sec:float
    attempt_index:int
    success:bool

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable copy of this record's fields."""
        # Every field is a flat primitive, so a shallow copy of the instance
        # dict is equivalent to dataclasses.asdict() and preserves field order.
        return dict(vars(self))
class TimingCollector:
    """Collect per-operation timeout timings for a single bot run.

    Samples are accumulated in memory via `record(...)` and persisted once via
    `flush()`, which appends the current session to `timing_data.json` and
    prunes sessions older than `RETENTION_DAYS`. Persistence is best-effort:
    failures are logged as warnings and never raised to the caller.
    """

    def __init__(self, output_dir:Path, command:str) -> None:
        """
        :param output_dir: directory where the timing JSON file is stored
        :param command: name of the bot command executed in this run
        """
        self.output_dir = output_dir.resolve()
        self.command = command
        # short random ID to correlate this run's log lines and temp files
        self.session_id = uuid.uuid4().hex[:8]
        self.started_at = misc.now().isoformat()
        self.records:list[TimingRecord] = []
        # guards against writing the same session twice in one run
        self._flushed = False
        LOG.debug("Timing collection initialized (session=%s, output_dir=%s, command=%s)", self.session_id, self.output_dir, command)

    def record(
        self,
        *,
        key:str,
        operation_type:str,
        description:str,
        configured_timeout:float,
        effective_timeout:float,
        actual_duration:float,
        attempt_index:int,
        success:bool,
    ) -> None:
        """Append one timing sample (all time values in seconds) to the current session."""
        self.records.append(
            TimingRecord(
                timestamp = misc.now().isoformat(),
                operation_key = key,
                operation_type = operation_type,
                description = description,
                configured_timeout_sec = configured_timeout,
                effective_timeout_sec = effective_timeout,
                actual_duration_sec = actual_duration,
                attempt_index = attempt_index,
                success = success,
            )
        )
        LOG.debug(
            "Timing captured: %s [%s] duration=%.3fs timeout=%.3fs success=%s",
            operation_type,
            key,
            actual_duration,
            effective_timeout,
            success,
        )

    def flush(self) -> Path | None:
        """Persist the current session to the timing JSON file (at most once per run).

        Appends this run's session to the existing file, drops sessions older
        than `RETENTION_DAYS` (or with unparsable `started_at`), and writes the
        result atomically (temp file + fsync + rename).

        :return: path of the written file, or None when nothing was written
                 (already flushed, no records, or any I/O failure)
        """
        if self._flushed:
            LOG.debug("Timing collection already flushed for this run")
            return None
        if not self.records:
            LOG.debug("Timing collection enabled but no records captured in this run")
            return None
        try:
            self.output_dir.mkdir(parents = True, exist_ok = True)
            data = self._load_existing_sessions()
            data.append(
                {
                    "session_id": self.session_id,
                    "command": self.command,
                    "started_at": self.started_at,
                    "ended_at": misc.now().isoformat(),
                    "records": [record.to_dict() for record in self.records],
                }
            )
            cutoff = misc.now() - timedelta(days = RETENTION_DAYS)
            retained:list[dict[str, Any]] = []
            dropped = 0
            for session in data:
                try:
                    # "started_at" may be missing (None) or malformed in hand-edited
                    # files; parsing then raises TypeError or ValueError. Catching both
                    # prunes the one bad session instead of aborting the whole flush
                    # via the outer handler (which would silently drop this run's data).
                    parsed = misc.parse_datetime(session.get("started_at"), add_timezone_if_missing = True)
                except (TypeError, ValueError):
                    parsed = None
                if parsed is None:
                    dropped += 1
                    continue
                if parsed >= cutoff:
                    retained.append(session)
                else:
                    dropped += 1
            if dropped > 0:
                LOG.debug("Timing collection pruned %d old or malformed sessions", dropped)
            output_file = self.output_dir / TIMING_FILE
            # session-unique temp name avoids clashes between concurrent runs;
            # write + fsync + rename makes the final file appear atomically
            temp_file = self.output_dir / f".{TIMING_FILE}.{self.session_id}.tmp"
            with temp_file.open("w", encoding = "utf-8") as fd:
                json.dump(retained, fd, indent = 2)
                fd.write("\n")
                fd.flush()
                os.fsync(fd.fileno())
            temp_file.replace(output_file)
            LOG.debug(
                "Timing collection flushed to %s (%d sessions, %d current records, retention=%d days)",
                output_file,
                len(retained),
                len(self.records),
                RETENTION_DAYS,
            )
            self.records = []
            self._flushed = True
            return output_file
        except Exception as exc:  # noqa: BLE001
            # best-effort diagnostics: never let timing persistence break the bot
            LOG.warning("Failed to flush timing collection data: %s", exc)
            return None

    def _load_existing_sessions(self) -> list[dict[str, Any]]:
        """Load previously persisted sessions, tolerating a missing or corrupt file.

        :return: list of session dicts; non-dict entries and any unreadable or
                 non-list payload degrade to an empty/filtered result
        """
        file_path = self.output_dir / TIMING_FILE
        if not file_path.exists():
            return []
        try:
            with file_path.open(encoding = "utf-8") as fd:
                payload = json.load(fd)
            if isinstance(payload, list):
                return [item for item in payload if isinstance(item, dict)]
        except Exception as exc:  # noqa: BLE001
            LOG.warning("Unable to load timing collection data from %s: %s", file_path, exc)
        return []

View File

@@ -183,6 +183,36 @@ class WebScrapingMixin:
# Always perform the initial attempt plus the configured number of retries.
return 1 + cfg.retry_max_attempts
def _record_timing(
self,
*,
key:str,
description:str,
configured_timeout:float,
effective_timeout:float,
actual_duration:float,
attempt_index:int,
success:bool,
) -> None:
collector = getattr(self, "_timing_collector", None)
if collector is None:
return
operation_type = description.split("(", 1)[0] if "(" in description else description
try:
collector.record(
key = key,
operation_type = operation_type,
description = description,
configured_timeout = configured_timeout,
effective_timeout = effective_timeout,
actual_duration = actual_duration,
attempt_index = attempt_index,
success = success,
)
except Exception as exc: # noqa: BLE001
LOG.warning("Timing collector failed for key=%s operation=%s: %s", key, operation_type, exc)
async def _run_with_timeout_retries(
self, operation:Callable[[float], Awaitable[T]], *, description:str, key:str = "default", override:float | None = None
) -> T:
@@ -190,12 +220,34 @@ class WebScrapingMixin:
Execute an async callable with retry/backoff handling for TimeoutError.
"""
attempts = self._timeout_attempts()
configured_timeout = self._timeout(key, override)
loop = asyncio.get_running_loop()
for attempt in range(attempts):
effective_timeout = self._effective_timeout(key, override, attempt = attempt)
attempt_started = loop.time()
try:
return await operation(effective_timeout)
result = await operation(effective_timeout)
self._record_timing(
key = key,
description = description,
configured_timeout = configured_timeout,
effective_timeout = effective_timeout,
actual_duration = loop.time() - attempt_started,
attempt_index = attempt,
success = True,
)
return result
except TimeoutError:
self._record_timing(
key = key,
description = description,
configured_timeout = configured_timeout,
effective_timeout = effective_timeout,
actual_duration = loop.time() - attempt_started,
attempt_index = attempt,
success = False,
)
if attempt >= attempts - 1:
raise
LOG.debug("Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)", description, attempt + 1, attempts, effective_timeout)