Skip to main content

Documentation Index

Fetch the complete documentation index at: https://developers.autoplay.ai/llms.txt

Use this file to discover all available pages before exploring further.

Auto-generated from autoplay_sdk/skills/. Edit the source SKILL.md and run python scripts/sync_skill_docs.py.
Download .md · Open raw file · Install via CLI: autoplay-install-skills

Autoplay Core

Install

pip install autoplay-sdk
# Optional: one-line FastAPI bridge
# pip install "autoplay-sdk[serve]"
# Plus your preferred LLM client, e.g.:
#   pip install openai        # OpenAI / Azure OpenAI
#   pip install anthropic     # Anthropic Claude
#   pip install google-generativeai  # Gemini
Upgrading an existing integration? Install the migration helper skill and ask your agent to rewrite deprecated imports:
autoplay-install-skills --migrate
Credentials come from the Autoplay dashboard after running onboard_product:
  • CONNECTOR_URL — stream URL: https://your-connector.onrender.com/stream/YOUR_PRODUCT_ID
  • API_TOKEN — Bearer token (Unkey key)

⚠️ Critical: Session scoping & conversation scoping

This is the most important concept. Get this wrong and context will be empty, mixed between users, or silently dropped. AutoplayChatbotManager handles all session lifecycle automatically. Use it unless you need custom Redis-backed stores or non-standard delivery logic.
from autoplay_sdk import AutoplayChatbotManager, BaseChatbotWriter

# 1. Implement _post_note — the only platform-specific code
class MyWriter(BaseChatbotWriter):
    async def _post_note(self, conversation_id: str, text: str) -> None:
        # call your chatbot platform's API here
        ...

# 2. Initialize once at server startup
manager = AutoplayChatbotManager(writer=MyWriter())

# 3. Autoplay stream handler — identical for every platform
async def on_actions(payload):
    await manager.on_actions(payload)

# 4. Chatbot webhook handler — only the extraction lines differ per platform
async def on_webhook(data):
    session_id = data["custom_attributes"]["session_id"]  # ← platform-specific
    conversation_id = data["id"]                          # ← platform-specific
    await manager.on_chatbot_event(session_id, conversation_id)
That’s it. The manager handles:
  • Creating SessionState the moment a session_id is first seen (PostHog is the ground truth)
  • Buffering actions before a conversation is linked
  • Auto-detecting NEW vs REPLY_EXISTING from the link store
  • Flushing buffered actions the moment the link is established
  • Routing all future delivery to conversation_id permanently
session_id comes from PostHog. conversation_id comes from the chatbot platform. They are married together by on_chatbot_event. After that call, all proactive triggers, reactive notes, and future actions route to that conversation for the lifetime of the session.

Low-level building blocks (advanced use)

Use these directly only when you need custom Redis-backed stores or non-standard delivery.
from autoplay_sdk import (
    InMemorySessionStateStore,
    InMemoryConversationLinkStore,
    link_conversation,
)
from autoplay_sdk.agent_state.v2 import SessionState

# Initialize stores at server startup
session_store = InMemorySessionStateStore()
link_store = InMemoryConversationLinkStore()

# Phase 1 — Autoplay stream: SessionState born here, session_id is the ground truth
async def on_actions(payload):
    if not payload.session_id:
        return
    state = await session_store.get_or_create(payload.session_id)  # ← first, always
    await writer.write_actions(...)

# Phase 2 — Chatbot webhook: look up existing state, add conversation_id
async def on_webhook(data):
    session_id = data["custom_attributes"]["session_id"]  # platform-specific
    conversation_id = data["id"]                          # platform-specific
    state = await session_store.get_or_create(session_id)
    link_conversation(state=state, store=link_store, conversation_id=conversation_id)
    await writer.on_session_linked(session_id, conversation_id)
SessionState fields:
  • state.session_id — mandatory required field; SessionState(session_id=...) — construction without it is a TypeError
  • state.metadata — optional extras (user_id, email, any future fields)
  • state.conversation_linked / state.conversation_id — linked chatbot conversation
  • state.current_state — FSM (THINKING / PROACTIVE / REACTIVE)
Production: swap to Redis-backed store InMemorySessionStateStore loses all state on restart. For production, use RedisSessionStateStore — sessions survive restarts and scale across instances. It is a drop-in replacement:
# Install the redis extra
# pip install "autoplay-sdk[redis]"

from autoplay_sdk import AutoplayChatbotManager, RedisSessionStateStore

# One-line swap at server startup
manager = AutoplayChatbotManager(
    writer=MyWriter(),
    session_store=RedisSessionStateStore(redis_url=os.environ["REDIS_URL"]),
)
Key pattern: autoplay:session_state:{session_id}. Default TTL: 24 h (configurable via ttl_s). On Redis errors, falls back gracefully and logs a warning — never crashes. What breaks without correct scoping:
  • Wrong user’s events delivered to wrong conversation
  • Empty context because retrieval key doesn’t match storage key
  • Pre-link actions silently dropped

import asyncio
from autoplay_sdk import AsyncConnectorClient, compose_chat_pipeline
from autoplay_sdk.context.user_index import UserSessionIndex

CONNECTOR_URL = "https://your-connector.onrender.com/stream/YOUR_PRODUCT_ID"
API_TOKEN = "your-api-token"

async def llm(prompt: str) -> str:
    """Wire your preferred LLM here.

    The SDK only needs an async callable: (str) -> str.
    Any provider works — OpenAI, Anthropic, Gemini, Mistral, a local model, etc.

    OpenAI example:
        from openai import AsyncOpenAI
        _client = AsyncOpenAI()
        r = await _client.chat.completions.create(model="gpt-4o-mini", messages=[{"role":"user","content":prompt}])
        return r.choices[0].message.content
    """
    raise NotImplementedError("Replace with your LLM client")

# Compose summarizer + context store + writer safely (no callback clobbering)
pipeline = compose_chat_pipeline(
    llm=llm,
    threshold=20,
    lookback_seconds=300,
    max_actions=20,
    write_actions=None,           # optional push callback for chatbot notes
    overwrite_with_summary=None,  # optional summary overwrite callback
)
user_index = UserSessionIndex(pipeline.context_store, lookback_seconds=300)

async def run():
    async with AsyncConnectorClient(url=CONNECTOR_URL, token=API_TOKEN) as client:
        async def on_actions(payload):
            await pipeline.on_actions(payload)
            user_index.add(payload)

        client.on_actions(on_actions)
        await client.run()

asyncio.run(run())
If you skip compose_chat_pipeline, summaries are never produced. The on_summary / overwrite_with_summary callback requires the pipeline’s summarizer to be wired into the stream. There is no implicit fallback — omitting this call means overwrite_with_summary never fires, regardless of the threshold value.
When your chatbot gets a user_id, call:
activity = user_index.get_user_activity(user_id)
This avoids hand-rolled user_id -> session_id indexing and keeps product_id-aware lookups correct.

Analytics-only recipe (no chatbot)

If you only need to receive enriched events and forward them to your own pipeline — no chatbot, no notes, no session linking — you need none of the chatbot-shaped machinery above. Just the client:
import asyncio
from autoplay_sdk import AsyncConnectorClient

CONNECTOR_URL = "https://your-connector.onrender.com/stream/YOUR_PRODUCT_ID"
API_TOKEN = "your-api-token"

async def run():
    async with AsyncConnectorClient(url=CONNECTOR_URL, token=API_TOKEN) as client:
        async def on_actions(payload):
            if not payload.session_id:
                return
            # payload.slim_actions  — list of enriched action dicts
            # payload.user_id       — set if posthog.identify() was called
            # payload.email         — set if posthog.identify() was called
            your_downstream(payload)  # push to PostHog, Segment, your DB, etc.

        client.on_actions(on_actions)
        await client.run()

asyncio.run(run())
No BaseChatbotWriter, no compose_chat_pipeline, no session store required.

Optional one-line HTTP bridge

If your chatbot runtime is in another process (Rasa/Botpress/Twilio/custom webhook), prefer the built-in FastAPI factory:
from autoplay_sdk.api import build_copilot_app

app = build_copilot_app(
    stream_url=CONNECTOR_URL,
    token=API_TOKEN,
    llm=llm,
    summary_threshold=20,
    lookback_seconds=300,
)
Default endpoints:
  • GET /healthz
  • GET /context/{user_id}?query=...
  • GET /reply/{user_id}?query=...
  • POST /admin/reset/{user_id}

LLM guardrails

Use versioned prompt metadata with your provider client. The SDK does not currently ship a call_llm(...) helper.
# GOOD — use prompt metadata alongside your provider call
MY_PROMPT = {
    "name": "Support Answer Prompt",
    "version": "0.1",
    "description": "Answer user questions with product-aware context",
    "content": "You are a helpful assistant...\n\n{context}",
}

messages = [{"role": "system", "content": MY_PROMPT["content"].format(context=context_text)}]
response = await openai_client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
)

# BAD — inline prompt string with no version metadata
response = await openai_client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "system", "content": "You are a helpful assistant..."}],
)
Prompt files live in src/llm/ only. Each exports:
MY_PROMPT = {
    "name": "My Prompt",
    "version": "0.1",
    "description": "...",
    "content": "...",
}
Log: prompt name + version, model, token usage, latency, errors. Never log full session payloads.

Integration Self-Reasoning Checklist

Run through this checklist before considering any chatbot integration complete. Reason through each question — do not skip.

1. Session identity

  • Is session_id the first thing set in SessionState? state = SessionState(session_id=...)SessionState() without session_id is a TypeError.
  • Is PostHog the ground truth for session_id? It must come from ActionsPayload.session_id, never from the chatbot platform.
  • Is await session_store.get_or_create(session_id) the very first await in every handler — before any processing, delivery, or API call?
  • Does state.metadata carry all optional identity context (user_id, email, etc.)? Nothing important should live outside SessionState.

2. Race conditions

  • Webhook fires before stream: get_or_create handles this — it creates the state with session_id set. Actions that arrive later from the stream route correctly because the state already exists.
  • Stream fires before webhook: Actions buffer in BaseChatbotWriter._pending until on_session_linked is called. Nothing is dropped.
  • Concurrent webhooks for the same session: InMemorySessionStateStore.get_or_create is a synchronous dict operation — race-safe. RedisSessionStateStore does a GET-before-SET — acceptable for session context (last writer wins on the rare collision).
  • Session state missing after restart: If using InMemorySessionStateStore in production, state is lost. Switch to RedisSessionStateStore.

3. Chatbot linking

  • Is link_conversation(state=state, store=store, conversation_id=conv_id) called with no event= argument? Let it auto-detect NEW vs REPLY_EXISTING from the store.
  • Is await writer.on_session_linked(session_id, conversation_id) called immediately after link_conversation? This is the flush trigger — without it, buffered actions are never delivered.
  • Is the link sticky? Once state.conversation_linked == True, the same session must not be re-linked to a different conversation_id. The REPLY_EXISTING semantics handle this automatically — but verify the webhook handler is not calling link_conversation again on subsequent replies.
  • If using RedisSessionStateStore: is await session_store.save(state) called after linking? AutoplayChatbotManager.on_chatbot_event does this automatically — verify if using the low-level path.

4. Proactive triggers

  • Are proactive triggers reading state.conversation_id as the delivery target?
  • Is there a guard: if not state.conversation_linked: return before calling the chatbot API? A trigger that fires before the conversation is linked has nowhere to deliver.
  • Does the FSM gate the trigger correctly? Call state.tick() first, then allowed, reason = state.can_deliver_proactive(). This is the canonical check — it verifies both THINKING state and cooldown simultaneously. Do not replicate this logic inline.
  • After delivering a proactive trigger, is the FSM transitioned? state.transition_to_proactive(trigger_id) must be called or the session stays in THINKING and the trigger may re-fire.
  • After any FSM transition (to_proactive, to_reactive, tick, start_cooldown), is await manager.save_state(state) called? on_actions and on_chatbot_event auto-save — but external FSM mutations require an explicit save_state call:
    state.to_proactive(session_id=state.session_id, ...)
    await manager.save_state(state)   # persist PROACTIVE FSM state to Redis
    

5. Persistence (production)

  • On first session_id arrival, get_or_create creates and immediately persists SessionState to Redis. This is the ground truth — everything else builds on it.
  • on_actions does NOT save state — it only reads state.conversation_id for routing and passes actions to the writer. SessionState is not mutated by action delivery.
  • on_chatbot_event always saves after linking — conversation_linked and conversation_id are the mutations that need persistence there.
  • For any FSM mutation that happens outside on_chatbot_event, call await manager.save_state(state). This covers: to_proactive, to_reactive, tick, start_cooldown, record_user_interaction, and any metadata update.
  • Is InMemorySessionStateStore replaced with RedisSessionStateStore in production?
    # pip install "autoplay-sdk[redis]"
    manager = AutoplayChatbotManager(
        writer=MyWriter(),
        session_store=RedisSessionStateStore(redis_url=os.environ["REDIS_URL"]),
    )
    
  • Is REDIS_URL set in the production environment?
  • Is the TTL appropriate? Default is 24 h (ttl_s=86400). Sessions longer than the TTL will lose state.

Smoke test — verify the full hop before going to production

# Run from your project root after setting CONNECTOR_URL and API_TOKEN:
python -m autoplay_sdk.smoke_test \
    --url "$CONNECTOR_URL" \
    --token "$API_TOKEN"
# Expected output:
#   ✓ connection  — stream connected
#   ✓ heartbeat   — first ActionsPayload received within 10 s
#   ✓ session_id  — payload.session_id is non-empty
If session_id is empty at this stage, PostHog identify() has not fired yet — see activity-posthog Step 2.

Reference