# state_store.py 6.38 KB
from __future__ import annotations

import json
import threading
import time
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from uuid import uuid4

from loguru import logger

from .catalog import PLATFORM_LABELS
from utils.runtime_paths import LOGS_DIR

# Maximum number of history entries kept in the persisted UI state; older
# entries are dropped when a new one is inserted.
MAX_HISTORY_ITEMS = 30
# Third ancestor directory of this file (parents[2] of the resolved path).
# NOTE(review): not referenced anywhere in the visible part of this module.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# JSON file in which the crawler UI state is persisted.
UI_STATE_PATH = LOGS_DIR / "crawler" / "ui_state.json"


def now_iso() -> str:
    """Return the current naive local time as ``YYYY-MM-DDTHH:MM:SS``."""
    # Dropping the microseconds makes isoformat() stop at whole seconds.
    current = datetime.now().replace(microsecond=0)
    return current.isoformat()


def normalize_bool(value: Any, default: bool = False) -> bool:
    """Coerce a loosely typed flag to ``bool``.

    Real booleans pass through unchanged and ``None`` yields *default*.
    Anything else is stringified, stripped and lower-cased; it counts as
    true only when it is one of ``1``, ``true``, ``t``, ``yes``, ``y``, ``on``.
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return default
    truthy_tokens = ("1", "true", "t", "yes", "y", "on")
    token = str(value).strip().lower()
    return token in truthy_tokens


def normalize_text(value: Any) -> str:
    """Return *value* as a whitespace-stripped string; ``None`` becomes ``""``."""
    return "" if value is None else str(value).strip()


def clip_text(value: Any, limit: int = 120) -> str:
    """Normalize *value* to text and shorten it to at most *limit* characters.

    Text that already fits is returned unchanged. Longer text is cut to
    ``limit - 1`` characters and an ellipsis (``…``) is appended, so the
    result is exactly *limit* characters long.

    Args:
        value: Any object; ``None`` is treated as an empty string.
        limit: Maximum length of the returned string. A non-positive limit
            yields an empty string.

    Returns:
        The stripped, possibly truncated text.
    """
    text = normalize_text(value)
    if len(text) <= limit:
        return text
    if limit <= 0:
        # Without this guard ``text[: limit - 1]`` would use a negative index
        # and return nearly the whole text instead of nothing.
        return ""
    return f"{text[: limit - 1]}…"


def mask_phone(value: Any) -> str:
    """Return a display form of a phone number with the middle replaced.

    The normalized value keeps its first three and last four characters,
    joined by ``****``. Values shorter than seven characters are returned
    as-is (no masking is applied to them).
    """
    digits = normalize_text(value)
    if len(digits) >= 7:
        head, tail = digits[:3], digits[-4:]
        return f"{head}****{tail}"
    return digits


def build_history_id(prefix: str) -> str:
    """Build an identifier of the form ``<prefix>_<epoch millis>_<8 hex chars>``."""
    millis = int(time.time() * 1000)
    # Short random suffix taken from a fresh UUID4.
    suffix = uuid4().hex[:8]
    return f"{prefix}_{millis}_{suffix}"


def sanitize_login_config(
    *,
    platform: str,
    login_type: str,
    headless: bool,
    cookies: str = "",
    phone: str = "",
) -> Dict[str, Any]:
    """Build a storable summary of a login request without raw secrets.

    The cookie string is reduced to a ``cookie_provided`` flag and the phone
    number to a masked ``phone_hint``, which is only included when non-empty.
    """
    has_cookie = bool(normalize_text(cookies))
    masked_phone = mask_phone(phone)

    summary: Dict[str, Any] = dict(
        platform=platform,
        login_type=login_type,
        headless=headless,
        cookie_provided=has_cookie,
    )
    if masked_phone:
        summary["phone_hint"] = masked_phone
    return summary


def sanitize_crawl_config(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build a storable summary of a crawl request from a raw payload.

    Text, flag and integer fields are normalized with per-field fallbacks;
    the optional ``keywords``, ``specified_ids`` and ``creator_ids`` fields
    are included only when non-empty. The phone number is stored as a masked
    ``phone_hint`` and the cookie string as a ``cookie_provided`` flag.

    Raises:
        ValueError, TypeError: propagated from ``int()`` when a numeric field
            holds a value that cannot be converted.
    """
    text_fallbacks = (
        ("login_type", "qrcode"),
        ("crawler_type", "search"),
        ("save_option", ""),
    )
    flag_fallbacks = (
        ("headless", True),
        ("enable_comments", True),
        ("enable_sub_comments", False),
    )
    int_fallbacks = (
        ("start_page", 1),
        ("max_notes", 20),
        ("max_comments", 20),
    )

    summary: Dict[str, Any] = {"platform": normalize_text(payload.get("platform"))}
    for key, fallback in text_fallbacks:
        summary[key] = normalize_text(payload.get(key) or fallback)
    for key, fallback in flag_fallbacks:
        summary[key] = normalize_bool(payload.get(key), fallback)
    for key, fallback in int_fallbacks:
        # A falsy value (missing, empty, 0) falls back to the default.
        summary[key] = int(payload.get(key) or fallback)

    for key in ("keywords", "specified_ids", "creator_ids"):
        text = normalize_text(payload.get(key))
        if text:
            summary[key] = text

    masked_phone = mask_phone(payload.get("phone"))
    if masked_phone:
        summary["phone_hint"] = masked_phone
    summary["cookie_provided"] = bool(normalize_text(payload.get("cookies")))
    return summary


def default_ui_state() -> Dict[str, Any]:
    """Return a fresh, empty UI state with new container objects on every call."""
    return dict(
        last_login_configs={},
        last_crawl_configs={},
        history=[],
    )


class CrawlerUiStateStore:
    """JSON-file-backed store for the crawler UI state.

    The state is a dict with three sections: ``last_login_configs`` and
    ``last_crawl_configs`` (dicts keyed by platform) and ``history`` (a list
    of entry dicts, newest first, capped at ``MAX_HISTORY_ITEMS``).

    Every public method takes an internal ``threading.Lock`` while it reads
    or mutates the state, and each mutation is written to disk before the
    lock is released.
    """

    # Statuses after which a history entry is stamped with ``finished_at``.
    _FINISHED_STATUSES = frozenset(
        {"logged_in", "logged_out", "cancelled", "idle", "error", "completed"}
    )
    # Expected container type of each top-level section of the state file.
    _SECTION_TYPES = (
        ("last_login_configs", dict),
        ("last_crawl_configs", dict),
        ("history", list),
    )

    def __init__(self, path: Path):
        """Create the parent directory of *path* if needed and load the state."""
        self._path = path
        self._lock = threading.Lock()
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._state = self._load()

    def _load(self) -> Dict[str, Any]:
        """Read the state file, falling back to defaults for anything unusable.

        A missing, unreadable or non-JSON file yields the default state. A
        section whose value has the wrong container type is ignored, and
        history entries that are not dicts are dropped, so later operations
        never meet a malformed structure.
        """
        state = default_ui_state()
        if not self._path.exists():
            return state
        try:
            data = json.loads(self._path.read_text(encoding="utf-8"))
        except Exception as exc:
            # Best effort: a corrupt state file must not prevent start-up.
            logger.warning(f"Failed to load crawler ui state: {exc}")
            return state
        if not isinstance(data, dict):
            return state

        for key, expected_type in self._SECTION_TYPES:
            value = data.get(key)
            if isinstance(value, expected_type):
                state[key] = value
            elif value is not None:
                logger.warning(
                    f"Ignoring malformed '{key}' in crawler ui state: "
                    f"expected {expected_type.__name__}"
                )
        # update_history_entry() calls .get() on each entry, so keep dicts only.
        state["history"] = [item for item in state["history"] if isinstance(item, dict)]
        return state

    def _save(self) -> None:
        """Write the current state to disk; callers must hold ``self._lock``.

        The JSON is written to a sibling ``.tmp`` file which then replaces the
        target, so an interrupted write cannot leave a truncated state file.
        """
        payload = json.dumps(self._state, ensure_ascii=False, indent=2)
        tmp_path = self._path.with_name(f"{self._path.name}.tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(self._path)

    def snapshot(self) -> Dict[str, Any]:
        """Return a deep copy of the whole state, safe for the caller to mutate."""
        with self._lock:
            return deepcopy(self._state)

    def save_last_login_config(self, platform: str, config: Dict[str, Any]) -> None:
        """Store a copy of *config* as the last login config for *platform*."""
        with self._lock:
            self._state["last_login_configs"][platform] = deepcopy(config)
            self._save()

    def save_last_crawl_config(self, platform: str, config: Dict[str, Any]) -> None:
        """Store a copy of *config* as the last crawl config for *platform*."""
        with self._lock:
            self._state["last_crawl_configs"][platform] = deepcopy(config)
            self._save()

    def create_history_entry(
        self,
        *,
        kind: str,
        platform: str,
        status: str,
        message: str,
        config: Optional[Dict[str, Any]] = None,
        extra: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Insert a new entry at the front of the history and persist it.

        Args:
            kind: Entry category; also used as the prefix of the generated id.
            platform: Platform key; its label is looked up in ``PLATFORM_LABELS``
                and falls back to the key itself.
            status: Initial status of the entry.
            message: Message stored with the entry.
            config: Optional config stored as a deep copy under ``config``.
            extra: Optional additional fields merged into the entry; they
                overwrite standard fields of the same name.

        Returns:
            The id of the new entry.
        """
        entry_id = build_history_id(kind)
        # One timestamp for both fields so they cannot straddle a second boundary.
        timestamp = now_iso()
        entry: Dict[str, Any] = {
            "id": entry_id,
            "kind": kind,
            "platform": platform,
            "platform_label": PLATFORM_LABELS.get(platform, platform),
            "status": status,
            "message": message,
            "config": deepcopy(config or {}),
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        if extra:
            entry.update(deepcopy(extra))

        with self._lock:
            history = self._state["history"]
            history.insert(0, entry)
            # Keep only the newest MAX_HISTORY_ITEMS entries.
            self._state["history"] = history[:MAX_HISTORY_ITEMS]
            self._save()
        return entry_id

    def update_history_entry(self, entry_id: Optional[str], **updates: Any) -> None:
        """Merge *updates* into the entry with id *entry_id* and persist it.

        Does nothing when *entry_id* is empty or no entry matches. The entry's
        ``updated_at`` is refreshed, and ``finished_at`` is set as well when
        the resulting status is one of the finished statuses.
        """
        if not entry_id:
            return
        with self._lock:
            for entry in self._state["history"]:
                if entry.get("id") != entry_id:
                    continue
                entry.update(deepcopy(updates))
                timestamp = now_iso()
                entry["updated_at"] = timestamp
                if entry.get("status") in self._FINISHED_STATUSES:
                    entry["finished_at"] = timestamp
                self._save()
                return


# Module-level store instance bound to UI_STATE_PATH. Constructing it at import
# time creates the parent directory if needed and loads any existing state.
ui_state_store = CrawlerUiStateStore(UI_STATE_PATH)