Use AsyncConnectorClient when your RAG pipeline uses asyncio β LangChain, LlamaIndex,
FastAPI, or any framework where embedding and vector store calls are already async.
import asyncioimport openaifrom autoplay_sdk import AsyncConnectorClient, ActionsPayload, SummaryPayloadopenai_client = openai.AsyncOpenAI()async def on_actions(payload: ActionsPayload): text = payload.to_text() # embedding-ready string β no formatting needed response = await openai_client.embeddings.create( input=text, model="text-embedding-3-small" ) embedding = response.data[0].embedding # upsert into Pinecone, Weaviate, Chroma, pgvector, etc. await your_vector_store.upsert( id=payload.session_id or "unknown", vector=embedding, metadata={ "session_id": payload.session_id, "email": payload.email, "count": payload.count, }, )async def on_summary(payload: SummaryPayload): # replace raw action embeddings with a compact prose summary text = payload.to_text() response = await openai_client.embeddings.create( input=text, model="text-embedding-3-small" ) await your_vector_store.upsert( id=payload.session_id or "unknown", vector=response.data[0].embedding, metadata={"session_id": payload.session_id, "replaces": payload.replaces}, )async def main(): async with AsyncConnectorClient(url=STREAM_URL, token=API_TOKEN) as client: client.on_actions(on_actions).on_summary(on_summary) await client.run()asyncio.run(main())
If your pipeline is synchronous, use ConnectorClient. The SSE reader and your callback
run on separate threads β blocking calls (embedding APIs, database writes) are safe.
Upsert each ActionsPayload as it arrives β granular, session-level embeddings.
Good for detailed retrieval (βwhat did this user click last session?β).
Replace with summaries
When a SummaryPayload arrives (payload.replaces actions), replace the raw
embeddings with a single prose embedding. Keeps your context window compact.
You can do both at once β index actions immediately, then overwrite with the summary when it arrives:
async def on_actions(payload: ActionsPayload): await upsert(id=f"actions:{payload.session_id}", text=payload.to_text())async def on_summary(payload: SummaryPayload): # overwrite the actions embedding with the compact summary await upsert(id=f"actions:{payload.session_id}", text=payload.to_text())