fastmcp.server.tasks.notifications

Distributed notification queue for background task events (SEP-1686).
Enables distributed Docket workers to send MCP notifications to clients without holding session references. Workers push to a Redis queue; the MCP server process subscribes to that queue and forwards each notification to the client’s session.
Pattern: fire-and-forward with retry
  • One queue per session_id
  • LPUSH/BRPOP for reliable ordered delivery
  • Retry up to 3 times on delivery failure, then discard
  • TTL-based expiration for stale messages
Note: Docket’s execution.subscribe() handles task state/progress events via Redis Pub/Sub. This module handles elicitation-specific notifications that require reliable delivery (input_required prompts, cancel signals).
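The queue pattern above can be modelled in memory to show why LPUSH paired with BRPOP gives first-in, first-out delivery. This is a minimal sketch, not the module's implementation: `SessionQueue` is a hypothetical stand-in for one per-session Redis list, and the per-message TTL check is an assumption (this page does not say whether expiration is applied per key or per message).

```python
from __future__ import annotations

import time
from collections import deque
from typing import Any


class SessionQueue:
    """Hypothetical in-memory stand-in for one per-session Redis list."""

    def __init__(self, ttl_seconds: float) -> None:
        self._items: deque[tuple[float, dict[str, Any]]] = deque()
        self._ttl = ttl_seconds

    def lpush(self, notification: dict[str, Any], now: float | None = None) -> None:
        # LPUSH inserts at the head of the list.
        stamp = time.monotonic() if now is None else now
        self._items.appendleft((stamp, notification))

    def rpop(self, now: float | None = None) -> dict[str, Any] | None:
        # RPOP (the non-blocking form of BRPOP) removes from the tail,
        # so the oldest entry is returned first.
        current = time.monotonic() if now is None else now
        while self._items:
            stamp, notification = self._items.pop()
            if current - stamp <= self._ttl:
                return notification
            # Assumed behaviour: entries older than the TTL are dropped.
        return None


queue = SessionQueue(ttl_seconds=10.0)
queue.lpush({"seq": 1}, now=0.0)
queue.lpush({"seq": 2}, now=1.0)
first = queue.rpop(now=2.0)   # oldest entry
second = queue.rpop(now=2.0)  # next oldest entry
```

Pushing at one end and popping at the other is what preserves ordering; pushing and popping at the same end would behave like a stack instead.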

Functions

push_notification

push_notification(session_id: str, notification: dict[str, Any], docket: Docket) -> None
Push notification to session’s queue (called from Docket worker). Used for elicitation-specific notifications (input_required, cancel) that need reliable delivery across distributed processes.
Args:
  • session_id: Target session’s identifier
  • notification: MCP notification dict (method, params, _meta)
  • docket: Docket instance for Redis access
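As an illustration of the `notification` argument, the sketch below assembles a dict with the three keys listed above (method, params, _meta). The helper names, the method string, and all field values are hypothetical placeholders, and JSON encoding is only an assumption about how a payload might be stored in the queue; this page does not document the wire format.

```python
import json
from typing import Any


def build_notification(
    method: str, params: dict[str, Any], meta: dict[str, Any]
) -> dict[str, Any]:
    """Assemble a dict with the three keys named in the Args list."""
    return {"method": method, "params": params, "_meta": meta}


def encode_for_queue(notification: dict[str, Any]) -> str:
    # Assumption: the payload is serialised as JSON text before it is queued.
    return json.dumps(notification, sort_keys=True)


example = build_notification(
    method="notifications/example",  # hypothetical method name
    params={"taskId": "task-123", "status": "input_required"},  # hypothetical fields
    meta={"source": "worker-1"},  # hypothetical metadata
)
payload = encode_for_queue(example)
```

A worker would pass a dict like `example` as the `notification` argument of `push_notification`, together with the target `session_id` and its `Docket` instance.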

notification_subscriber_loop

notification_subscriber_loop(session_id: str, session: ServerSession, docket: Docket, fastmcp: FastMCP) -> None
Subscribe to notification queue and forward to session. Runs in the MCP server process. Bridges distributed workers to clients. This loop:
  1. Maintains a heartbeat (active subscriber marker for debugging)
  2. Blocks on BRPOP waiting for notifications
  3. Forwards notifications to the client’s session
  4. Retries failed deliveries, then discards (no dead-letter queue)
Args:
  • session_id: Session identifier to subscribe to
  • session: MCP ServerSession for sending notifications
  • docket: Docket instance for Redis access
  • fastmcp: FastMCP server instance (for elicitation relay)
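The blocking, forwarding, and retry steps listed above can be sketched with `asyncio`. This is a simplified model under stated assumptions, not the library's code: an `asyncio.Queue` stands in for the blocking BRPOP, `send` is a hypothetical delivery callable, "retry up to 3 times" is read as three attempts in total, and the heartbeat is omitted.

```python
import asyncio
from typing import Any, Awaitable, Callable

MAX_ATTEMPTS = 3  # assumption: three delivery attempts in total

Send = Callable[[dict[str, Any]], Awaitable[None]]


async def forward_with_retry(notification: dict[str, Any], send: Send) -> bool:
    """Try to deliver one notification; report whether it got through."""
    for _ in range(MAX_ATTEMPTS):
        try:
            await send(notification)
            return True
        except Exception:
            await asyncio.sleep(0)  # yield to the event loop before retrying
    return False


async def subscriber_loop(queue: asyncio.Queue, send: Send, dropped: list) -> None:
    while True:
        notification = await queue.get()  # stands in for a blocking BRPOP
        if not await forward_with_retry(notification, send):
            dropped.append(notification)  # discarded: no dead-letter queue
        queue.task_done()


async def demo() -> tuple[list, list]:
    queue: asyncio.Queue = asyncio.Queue()
    delivered: list = []
    dropped: list = []
    failures = {"flaky": 2, "broken": 99}  # simulated failures remaining per id

    async def send(notification: dict[str, Any]) -> None:
        key = notification["id"]
        if failures.get(key, 0) > 0:
            failures[key] -= 1
            raise ConnectionError("simulated delivery failure")
        delivered.append(key)

    task = asyncio.create_task(subscriber_loop(queue, send, dropped))
    for key in ("ok", "flaky", "broken"):
        queue.put_nowait({"id": key})
    await queue.join()  # wait until every queued item has been processed
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return delivered, [n["id"] for n in dropped]
```

In `demo`, the "flaky" notification fails twice and succeeds on the third attempt, while "broken" exhausts all attempts and is dropped.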

ensure_subscriber_running

ensure_subscriber_running(session_id: str, session: ServerSession, docket: Docket, fastmcp: FastMCP) -> None
Start notification subscriber if not already running (idempotent). Subscriber is created on first task submission and cleaned up on disconnect. Safe to call multiple times for the same session.
Args:
  • session_id: Session identifier
  • session: MCP ServerSession
  • docket: Docket instance
  • fastmcp: FastMCP server instance (for elicitation relay)
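One way to get the idempotent start described above is a registry that maps each session ID to its background task. The sketch below is an assumption about the general shape rather than the module's internals; `_subscribers`, `_run`, `ensure_running`, `stop`, and `subscriber_count` are hypothetical names that loosely mirror the start, stop, and count functions on this page.

```python
import asyncio

# Hypothetical registry: one background task per session ID.
_subscribers: dict[str, asyncio.Task] = {}


async def _run(session_id: str) -> None:
    # Placeholder for the real subscriber loop: wait until cancelled.
    await asyncio.Event().wait()


def ensure_running(session_id: str) -> bool:
    """Start a subscriber unless one is already active. Returns True if started."""
    task = _subscribers.get(session_id)
    if task is not None and not task.done():
        return False  # already running, so repeated calls are harmless
    _subscribers[session_id] = asyncio.create_task(_run(session_id))
    return True


async def stop(session_id: str) -> None:
    """Cancel the subscriber for a session; a no-op if none is registered."""
    task = _subscribers.pop(session_id, None)
    if task is None:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


def subscriber_count() -> int:
    return sum(1 for task in _subscribers.values() if not task.done())


async def demo() -> list[int]:
    counts = []
    ensure_running("s1")
    ensure_running("s1")  # second call does not create a second task
    counts.append(subscriber_count())
    ensure_running("s2")
    counts.append(subscriber_count())
    await stop("s1")
    counts.append(subscriber_count())
    await stop("s1")  # stopping again is safe
    await stop("s2")
    counts.append(subscriber_count())
    return counts
```

Checking `task.done()` before reusing an entry lets a later call replace a subscriber that has crashed or been cancelled, while repeated calls for a healthy one do nothing.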

stop_subscriber

stop_subscriber(session_id: str) -> None
Stop notification subscriber for a session. Called when session disconnects. Pending messages remain in queue for delivery if client reconnects (with TTL expiration).
Args:
  • session_id: Session identifier

get_subscriber_count

get_subscriber_count() -> int
Get number of active subscribers (for monitoring).