Skip to main content
Use AsyncConnectorClient when your RAG pipeline uses asyncio — LangChain, LlamaIndex, FastAPI, or any framework where embedding and vector store calls are already async.
import asyncio
import openai
from autoplay_sdk import AsyncConnectorClient, ActionsPayload, SummaryPayload

openai_client = openai.AsyncOpenAI()

async def on_actions(payload: ActionsPayload):
    """Embed an actions payload and upsert the vector into the store.

    The string returned by ``payload.to_text()`` is embedded with
    ``text-embedding-3-small``. The first embedding in the response is
    stored under the payload's session id, with the session id, email and
    count attached as metadata.
    """
    # to_text() already yields an embedding-ready string — no formatting needed
    embedding_input = payload.to_text()
    result = await openai_client.embeddings.create(
        input=embedding_input, model="text-embedding-3-small"
    )
    vector = result.data[0].embedding

    # NOTE: every payload with a falsy session_id is stored under the same
    # "unknown" id.
    record_id = payload.session_id or "unknown"
    record_metadata = {
        "session_id": payload.session_id,
        "email": payload.email,
        "count": payload.count,
    }

    # upsert into Pinecone, Weaviate, Chroma, pgvector, etc.
    await your_vector_store.upsert(
        id=record_id,
        vector=vector,
        metadata=record_metadata,
    )

async def on_summary(payload: SummaryPayload):
    """Embed a summary payload and upsert it under the session's id.

    The summary text is embedded with ``text-embedding-3-small`` and written
    with the id ``payload.session_id or "unknown"``, so it replaces raw
    action embeddings stored under that id with a compact prose summary.
    """
    summary_text = payload.to_text()
    result = await openai_client.embeddings.create(
        input=summary_text, model="text-embedding-3-small"
    )

    record_id = payload.session_id or "unknown"
    vector = result.data[0].embedding
    record_metadata = {
        "session_id": payload.session_id,
        "replaces": payload.replaces,
    }
    await your_vector_store.upsert(
        id=record_id,
        vector=vector,
        metadata=record_metadata,
    )

async def main():
    """Open the connector, register both handlers, and run the client."""
    connector = AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN)
    async with connector as client:
        # Handler registration is chained: on_summary is called on whatever
        # on_actions returns.
        registered = client.on_actions(on_actions)
        registered.on_summary(on_summary)
        await client.run()


# Script entry point: run main() on a fresh event loop.
asyncio.run(main())

Background task inside FastAPI

from contextlib import asynccontextmanager
from fastapi import FastAPI
from autoplay_sdk import AsyncConnectorClient, ActionsPayload

# Create the connector once at import time and register the actions handler
# before the application starts serving.
client = AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN)
client.on_actions(on_actions)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the connector in the background for the lifetime of the app.

    On startup the client is started via ``run_in_background()``. On
    shutdown the client is stopped and the object returned by
    ``run_in_background()`` is awaited.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    task = client.run_in_background()
    try:
        yield
    finally:
        # try/finally makes the cleanup run even when an exception or
        # cancellation is raised into the lifespan at the ``yield``;
        # otherwise the background connector would be left running.
        client.stop()
        await task


app = FastAPI(lifespan=lifespan)

Sync pipeline

If your pipeline is synchronous, use ConnectorClient. The SSE reader and your callback run on separate threads, so blocking calls in the callback (embedding APIs, database writes) do not stall the SSE reader.
import openai
from autoplay_sdk import ConnectorClient, ActionsPayload

openai_client = openai.OpenAI()

def on_actions(payload: ActionsPayload):
    """Embed an actions payload synchronously and upsert the vector.

    Uses the blocking OpenAI client to embed ``payload.to_text()`` with
    ``text-embedding-3-small`` and stores the first embedding under
    ``payload.session_id or "unknown"``.
    """
    embedding_input = payload.to_text()
    result = openai_client.embeddings.create(
        input=embedding_input, model="text-embedding-3-small"
    )
    vector = result.data[0].embedding

    record_id = payload.session_id or "unknown"
    your_vector_store.upsert(id=record_id, vector=vector)

# Build the sync client, register the handler, and run it in one chained
# expression.
(
    ConnectorClient(url=STREAM_URL, token=API_TOKEN)
    .on_actions(on_actions)
    .run()
)

Strategy: actions vs. summaries

Index actions

Upsert each ActionsPayload as it arrives — granular, session-level embeddings. Good for detailed retrieval (“what did this user click last session?”).

Replace with summaries

When a SummaryPayload arrives (payload.replaces actions), replace the raw embeddings with a single prose embedding. Keeps your context window compact.
You can do both at once — index actions immediately, then overwrite with the summary when it arrives:
async def on_actions(payload: ActionsPayload):
    """Index the actions text under the session's ``actions:`` id."""
    record_id = f"actions:{payload.session_id}"
    actions_text = payload.to_text()
    await upsert(id=record_id, text=actions_text)

async def on_summary(payload: SummaryPayload):
    """Upsert the summary text under the session's ``actions:`` id."""
    # The id is the "actions:" key on purpose: the compact summary overwrites
    # the actions embedding stored under that id.
    record_id = f"actions:{payload.session_id}"
    summary_text = payload.to_text()
    await upsert(id=record_id, text=summary_text)