AsyncConnectorClient is the async-native client. Use it when your pipeline uses asyncio — LangChain, LlamaIndex, FastAPI, or any framework where embedding and vector store calls are async. Callbacks are async def coroutines, so you can await any async operation directly inside them.

Constructor

from autoplay_sdk import AsyncConnectorClient

client = AsyncConnectorClient(
    url="https://your-connector.onrender.com/stream/YOUR_PRODUCT_ID",
    token="uk_live_...",
    session_concurrency=4,
)
url
string
required
Full URL to GET /stream/{product_id} on the connector.
token
string
default:""
Unkey API key (uk_live_...). Leave empty to connect without authentication.
session_concurrency
int
default:"4"
Maximum number of concurrent in-flight callback tasks per session. Each session’s callbacks run as independent asyncio.Task instances — a slow LLM or vector store call for one session never delays delivery for another. Increase this if a single session can generate bursts of events faster than your callback can process them.

Callbacks

All registration methods return self for chaining.

.on_actions(fn)

from autoplay_sdk import ActionsPayload

async def handle_actions(payload: ActionsPayload):
    text = payload.to_text()
    embedding = await embed_api.create(input=text)
    await vector_store.upsert(id=payload.session_id, vector=embedding)

client.on_actions(handle_actions)
fn
async Callable[[ActionsPayload], None]
required
Async coroutine called every time a batch of UI actions arrives. await any async I/O directly — it won’t block other coroutines.

.on_summary(fn)

from autoplay_sdk import SummaryPayload

async def handle_summary(payload: SummaryPayload):
    text = payload.to_text()
    await vector_store.upsert(id=payload.session_id, vector=await embed(text))

client.on_summary(handle_summary)
fn
async Callable[[SummaryPayload], None]
required
Async coroutine called every time a session summary arrives.

Per-session task isolation

Each incoming payload is dispatched as an independent asyncio.Task keyed by session_id. This means:
  • A slow embedding call for session A never delays delivery for session B.
  • Each session has its own asyncio.Semaphore capped at session_concurrency concurrent tasks.
  • The SSE reader loop is never blocked by your callback logic.
This mirrors the event connector’s own per-session asyncio.Task architecture.
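The dispatch pattern described above can be sketched as follows. This is a minimal standalone illustration, not the SDK's internal code; `dispatch`, `_run_callback`, and the module-level state are illustrative names:

```python
import asyncio
from collections import defaultdict

SESSION_CONCURRENCY = 4

# One semaphore per session_id caps concurrent callback tasks for that
# session only; other sessions acquire their own semaphores.
semaphores: dict[str, asyncio.Semaphore] = defaultdict(
    lambda: asyncio.Semaphore(SESSION_CONCURRENCY)
)
tasks: set[asyncio.Task] = set()

async def _run_callback(session_id: str, callback, payload):
    async with semaphores[session_id]:
        await callback(payload)

def dispatch(session_id: str, callback, payload) -> asyncio.Task:
    # Fire-and-forget: the reader loop returns immediately instead of
    # awaiting the callback, so it is never blocked by callback logic.
    task = asyncio.create_task(_run_callback(session_id, callback, payload))
    tasks.add(task)
    task.add_done_callback(tasks.discard)
    return task
```

Because each session acquires its own semaphore, a slow callback in one session cannot exhaust the concurrency budget of another.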

Lifecycle

await .run()

Connect to the SSE stream and process events. Reconnects with exponential backoff (1 s → 30 s).
import asyncio

async def main():
    async with AsyncConnectorClient(url=URL, token=TOKEN) as client:
        client.on_actions(handle_actions)
        await client.run()

asyncio.run(main())
HTTP 401, 403, and 404 are not retried — they raise immediately. Check your url and token.

.run_in_background()

Schedule run() as an asyncio.Task and return immediately.
task = client.run_in_background()

await do_other_async_work()

client.stop()
await task
Returns an asyncio.Task.

.stop()

Signal the client to stop after the current event.

Async context manager

async with AsyncConnectorClient(url=URL, token=TOKEN) as client:
    client.on_actions(handle_actions).on_summary(handle_summary)
    await client.run()
# stop() called automatically on exit

FastAPI integration

Register the client as a background task using FastAPI’s lifespan:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from autoplay_sdk import AsyncConnectorClient, ActionsPayload

async def handle_actions(payload: ActionsPayload):
    await embed_and_store(payload.to_text(), session_id=payload.session_id)

client = AsyncConnectorClient(url=URL, token=TOKEN).on_actions(handle_actions)

@asynccontextmanager
async def lifespan(app: FastAPI):
    task = client.run_in_background()
    yield
    client.stop()
    await task

app = FastAPI(lifespan=lifespan)

Chained setup

await (
    AsyncConnectorClient(url=URL, token=TOKEN)
    .on_actions(handle_actions)
    .on_summary(handle_summary)
    .run()
)