Source code for polymarket_watcher.service

"""Service orchestrator — wires configuration, market resolution, watchers,
and the WebSocket client into a single runnable unit.
"""

from __future__ import annotations

import logging
from decimal import Decimal
from typing import Any

from .actions.base_action import BaseAction
from .actions.log_action import LogAction
from .config import Config
from .market_resolver import get_token_ids_for_slug
from .position_fetcher import Position, fetch_positions
from .watchers.base_watcher import BaseWatcher
from .watchers.bid_floor_watcher import BidFloorWatcher
from .watchers.value_watcher import ValueWatcher
from .websocket_client import PolymarketWebSocketClient

logger = logging.getLogger(__name__)


[docs] class WatcherService: """Top-level service that owns the watcher registry and event dispatch. Extending the service --------------------- * To add a new watcher, instantiate it inside ``_build_watchers`` and append it to the returned list. * To add a new action, instantiate it and include it in the ``actions`` list passed to each watcher that should use it. """ def __init__(self, config: Config) -> None: self._config = config self._watchers: list[BaseWatcher] = [] # ------------------------------------------------------------------ # Public interface # ------------------------------------------------------------------
[docs] async def run(self) -> None: """Resolve positions, build watchers, and start the event loop.""" cfg = self._config actions = [LogAction()] if cfg.actions.log_enabled else [] if cfg.account.proxy_wallet: # ── Primary path: auto-discover positions from the wallet ── logger.info( "Fetching positions for proxy wallet %s…", cfg.account.proxy_wallet, ) positions = fetch_positions(cfg.account.proxy_wallet) if not positions: logger.warning( "No open positions found for wallet %s. Nothing to watch.", cfg.account.proxy_wallet, ) return asset_ids = [p.asset_id for p in positions] self._watchers = self._build_watchers_for_positions( cfg, positions, actions ) else: # ── Fallback: single-market manual config ────────────────── logger.warning( "No proxy_wallet configured — falling back to manual market config." ) slug = cfg.market.slug direction = cfg.market.direction logger.info("Resolving token IDs for slug %r…", slug) yes_token_id, no_token_id = get_token_ids_for_slug(slug) asset_ids = [yes_token_id, no_token_id] self._watchers = self._build_watchers_for_manual( cfg, direction, yes_token_id, no_token_id, actions ) client = PolymarketWebSocketClient( asset_ids=asset_ids, on_event=self._dispatch_event, reconnect_delay=cfg.service.reconnect_delay_sec, ) await client.run()
# ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _build_watchers_for_positions( self, cfg: Config, positions: list[Position], actions: list[BaseAction], ) -> list[BaseWatcher]: """Build BidFloorWatcher + ValueWatcher for every open position.""" watchers: list[BaseWatcher] = [] bf_cfg = cfg.watcher.bid_floor val_cfg = cfg.watcher.value for pos in positions: logger.info( "Setting up watchers for %r (%s) — size=%s avg_price=%s", pos.slug or pos.asset_id[:12], pos.direction, pos.size, pos.avg_price, ) if bf_cfg.enabled: watchers.append( BidFloorWatcher( asset_id=pos.asset_id, slug=pos.slug, direction=pos.direction, entry_price=pos.avg_price, position_size=pos.size, safety_multiple=bf_cfg.safety_multiple, floor_window_pct=bf_cfg.floor_window_pct, actions=actions, ) ) if val_cfg.enabled: watchers.append( ValueWatcher( asset_id=pos.asset_id, slug=pos.slug, direction=pos.direction, entry_cost=pos.entry_cost, position_size=pos.size, avg_price=pos.avg_price, alert_thresholds=val_cfg.alert_thresholds, actions=actions, ) ) return watchers def _build_watchers_for_manual( self, cfg: Config, direction: str, yes_token_id: str, no_token_id: str, actions: list[BaseAction], ) -> list[BaseWatcher]: """Build watchers from the manual market config (fallback path).""" watchers: list[BaseWatcher] = [] # direction is already lowercased by MarketConfig.__post_init__; normalise # here defensively so alert payloads are consistent with the auto-discovery # path which also uses lowercase outcome labels. direction = direction.lower() asset_id = yes_token_id if direction in ("yes", "long") else no_token_id slug = cfg.market.slug bf_cfg = cfg.watcher.bid_floor val_cfg = cfg.watcher.value entry_price = Decimal(str(cfg.market.entry_price)) position_size = Decimal(str(cfg.market.position_size)) entry_cost = entry_price * position_size # ── Bid-floor watcher ────────────────────────────────────────── if bf_cfg.enabled and position_size > Decimal("0"): logger.info( "Enabling BidFloorWatcher for direction '%s', asset %s…", direction, asset_id[:12], ) watchers.append( BidFloorWatcher( asset_id=asset_id, slug=slug, direction=direction, entry_price=entry_price, position_size=position_size, safety_multiple=bf_cfg.safety_multiple, floor_window_pct=bf_cfg.floor_window_pct, actions=actions, ) ) # ── Value watcher ────────────────────────────────────────────── if val_cfg.enabled and entry_cost > Decimal("0"): logger.info( "Enabling ValueWatcher for direction '%s', asset %s…", direction, asset_id[:12], ) watchers.append( ValueWatcher( asset_id=asset_id, slug=slug, direction=direction, entry_cost=entry_cost, position_size=position_size, avg_price=entry_price, alert_thresholds=val_cfg.alert_thresholds, actions=actions, ) ) return watchers async def _dispatch_event(self, event: dict[str, Any]) -> None: """Fan out one WebSocket event to every interested watcher.""" event_type = event.get("event_type") for watcher in self._watchers: supported = watcher.supported_event_types if supported is not None and event_type not in supported: continue try: watcher.on_event(event) except Exception: # noqa: BLE001 logger.exception( "Watcher %s raised an unhandled exception.", watcher.name )