

RagPipeline (sync) and AsyncRagPipeline (async) connect the Autoplay event stream to your vector store with minimal code. You provide two functions, embed and upsert. The pipeline converts each payload to text, embeds it, and upserts the vector keyed by the session's session_id.

Sync pipeline

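```python
import openai
from pinecone import Pinecone
from autoplay_sdk import ConnectorClient
from autoplay_sdk.rag import RagPipeline

URL = "..."    # your Autoplay connector URL
TOKEN = "..."  # your Autoplay connector token

openai_client = openai.OpenAI()
pinecone_index = Pinecone(api_key="...").Index("my-index")

pipeline = RagPipeline(
    # Turn each payload's text into a vector.
    embed=lambda text: openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    ).data[0].embedding,
    # Write (session_id, vector, metadata) to Pinecone.
    upsert=lambda id, vector, meta: pinecone_index.upsert([(id, vector, meta)]),
)

# Register both handlers, then start consuming the event stream.
ConnectorClient(url=URL, token=TOKEN) \
    .on_actions(pipeline.on_actions) \
    .on_summary(pipeline.on_summary) \
    .run()
```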
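To try the pipeline locally without OpenAI credentials, you could pass a stand-in embed function. The sketch below is a hypothetical toy (`toy_embed` and `DIM` are illustrative names, not part of the SDK): a deterministic bag-of-words hashing embedding that satisfies the `Callable[[str], list[float]]` contract but carries no real semantic meaning.

```python
import hashlib
import math

DIM = 64  # hypothetical small dimension, for local testing only


def toy_embed(text: str) -> list[float]:
    """Deterministic bag-of-words hashing embedding (toy, not semantic)."""
    vec = [0.0] * DIM
    for token in text.lower().split():
        # Use a stable hash: Python's built-in hash() is salted per process.
        h = int(hashlib.sha256(token.encode("utf-8")).hexdigest(), 16)
        vec[h % DIM] += 1.0
    norm = math.sqrt(sum(v * v for v in vec))
    # L2-normalize so vectors are comparable by cosine similarity.
    return [v / norm for v in vec] if norm else vec
```

You would then pass it as `RagPipeline(embed=toy_embed, upsert=...)`, with a vector index created for 64 dimensions.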

Async pipeline

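```python
import asyncio

import openai
from pinecone import Pinecone
from autoplay_sdk import AsyncConnectorClient
from autoplay_sdk.rag import AsyncRagPipeline

URL = "..."    # your Autoplay connector URL
TOKEN = "..."  # your Autoplay connector token

openai_client = openai.AsyncOpenAI()
pinecone_index = Pinecone(api_key="...").Index("my-index")

async def embed(text: str) -> list[float]:
    r = await openai_client.embeddings.create(input=text, model="text-embedding-3-small")
    return r.data[0].embedding

async def upsert(id: str, vector: list[float], meta: dict) -> None:
    # The standard Pinecone client is synchronous, so run it in a worker thread
    # instead of awaiting it directly.
    await asyncio.to_thread(pinecone_index.upsert, [(id, vector, meta)])

pipeline = AsyncRagPipeline(embed=embed, upsert=upsert)

async def main() -> None:
    async with AsyncConnectorClient(url=URL, token=TOKEN) as client:
        client.on_actions(pipeline.on_actions)
        client.on_summary(pipeline.on_summary)
        await client.run()

asyncio.run(main())
```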
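If you already have blocking embed and upsert functions, one option is to wrap them for AsyncRagPipeline rather than rewriting them. This is a minimal sketch assuming Python 3.9+ (for `asyncio.to_thread`); `to_async_embed` and `to_async_upsert` are hypothetical helper names, not SDK functions.

```python
import asyncio
from typing import Awaitable, Callable


def to_async_embed(
    embed: Callable[[str], list[float]],
) -> Callable[[str], Awaitable[list[float]]]:
    """Wrap a blocking embed function so it runs in a worker thread."""
    async def wrapper(text: str) -> list[float]:
        return await asyncio.to_thread(embed, text)
    return wrapper


def to_async_upsert(
    upsert: Callable[[str, list[float], dict], None],
) -> Callable[[str, list[float], dict], Awaitable[None]]:
    """Wrap a blocking upsert function so it runs in a worker thread."""
    async def wrapper(id: str, vector: list[float], meta: dict) -> None:
        await asyncio.to_thread(upsert, id, vector, meta)
    return wrapper
```

Usage would look like `AsyncRagPipeline(embed=to_async_embed(sync_embed), upsert=to_async_upsert(sync_upsert))`. Offloading to a thread keeps slow network calls from blocking the event loop while the client keeps receiving events.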

With a SessionSummarizer

Attach a SessionSummarizer to automatically condense actions before embedding. This keeps your vector store entries compact and your context window small.
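
```python
from autoplay_sdk import ConnectorClient
from autoplay_sdk.rag import RagPipeline
from autoplay_sdk.summarizer import SessionSummarizer

# my_llm_fn, embed_fn and upsert_fn are your own functions
# (embed_fn and upsert_fn as in the examples above).
summarizer = SessionSummarizer(llm=my_llm_fn, threshold=10)  # summarize every 10 actions

pipeline = RagPipeline(
    embed=embed_fn,
    upsert=upsert_fn,
    summarizer=summarizer,  # actions go through the summarizer first
)

ConnectorClient(url=URL, token=TOKEN) \
    .on_actions(pipeline.on_actions) \
    .on_summary(pipeline.on_summary) \
    .run()
```
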
When the summarizer fires (every 10 actions with threshold=10), the LLM-generated summary is embedded and upserted instead of the raw action batch.
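To make the "fires every N actions" behavior concrete, here is a hypothetical sketch of threshold-based buffering. It is not the SDK's SessionSummarizer: the class name `ThresholdSummarizer`, the `add` method, and the assumption that `llm` takes one prompt string and returns a summary string are all illustrative.

```python
from typing import Callable, Optional


class ThresholdSummarizer:
    """Hypothetical sketch: buffer action texts, summarize every `threshold` actions."""

    def __init__(self, llm: Callable[[str], str], threshold: int = 10) -> None:
        self.llm = llm
        self.threshold = threshold
        self.buffer: list[str] = []

    def add(self, action_text: str) -> Optional[str]:
        """Buffer one action; return a summary once the threshold is reached."""
        self.buffer.append(action_text)
        if len(self.buffer) < self.threshold:
            return None  # not enough actions yet, nothing to embed
        prompt = "\n".join(self.buffer)
        self.buffer.clear()  # start a fresh batch after summarizing
        return self.llm(prompt)
```

In this sketch, each non-None return value is what would then be embedded and upserted under the session_id, while the raw actions in between never reach the vector store.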

Compatible vector stores

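The upsert callable works with any vector store: it only has to accept (id, vector, metadata) and write them. For example, with Pinecone:

```python
from pinecone import Pinecone

pc = Pinecone(api_key="...")
index = pc.Index("my-index")

# Pinecone also accepts dicts with "id", "values" and "metadata" keys.
upsert = lambda id, vector, meta: index.upsert(vectors=[{"id": id, "values": vector, "metadata": meta}])
```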
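Pinecone restricts metadata values to strings, numbers, booleans, and lists of strings. This page does not say which keys or value types the pipeline puts in `meta`, so the following is only a defensive sketch in case it ever contains nested or null values; `sanitize_metadata` is a hypothetical helper, not part of either SDK.

```python
import json
from typing import Any


def sanitize_metadata(meta: dict) -> dict[str, Any]:
    """Coerce metadata into types Pinecone accepts: str, number, bool, list of str."""
    clean: dict[str, Any] = {}
    for key, value in meta.items():
        if value is None:
            continue  # drop nulls rather than sending them
        if isinstance(value, (str, bool, int, float)):
            clean[key] = value
        elif isinstance(value, list) and all(isinstance(v, str) for v in value):
            clean[key] = value
        else:
            # Nested dicts and mixed lists are stored as JSON strings.
            clean[key] = json.dumps(value, default=str)
    return clean
```

You would apply it inside the callable, for example `"metadata": sanitize_metadata(meta)` in the Pinecone upsert lambda.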

What gets upserted

| Event type | ID used | Text embedded |
|---|---|---|
| `ActionsPayload` | `session_id` | `payload.to_text()`: numbered action list |
| `SummaryPayload` | `session_id` | `payload.to_text()`: prose summary |
| Client summary (via `SessionSummarizer`) | `session_id` | LLM-generated summary text |
Every event type uses the same session_id as the key, so each new upsert replaces the previous entry and your vector store always holds one up-to-date entry per session.
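The one-entry-per-session behavior follows from keying every upsert on session_id. A toy in-memory store shows the effect; `InMemoryStore`, the vectors, and the `"source"` metadata key are all hypothetical values for illustration.

```python
class InMemoryStore:
    """Toy vector store keyed by id; an upsert replaces any existing entry."""

    def __init__(self) -> None:
        self.entries: dict[str, tuple[list[float], dict]] = {}

    def upsert(self, id: str, vector: list[float], meta: dict) -> None:
        self.entries[id] = (vector, meta)


store = InMemoryStore()
# Three events in arrival order, two of them for the same session.
store.upsert("sess-1", [0.1, 0.2], {"source": "actions"})
store.upsert("sess-1", [0.3, 0.4], {"source": "summary"})
store.upsert("sess-2", [0.5, 0.6], {"source": "actions"})
# store.entries now has two keys; "sess-1" holds only the later summary entry.
```

Because `store.upsert` is a bound method with the `(id, vector, meta)` shape, it could be passed directly as `RagPipeline(embed=..., upsert=store.upsert)` for local experiments.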

Constructor

`RagPipeline(embed, upsert, summarizer=None)`

- `embed` (`Callable[[str], list[float]]`, required): Any embedding function. Receives `payload.to_text()` (or the summary text when a summarizer is attached) and must return a vector.
- `upsert` (`Callable[[str, list[float], dict], None]`, required): Writes the embedding to your vector store. Called with `(session_id, vector, metadata)`.
- `summarizer` (`SessionSummarizer | None`, default `None`): Optional `SessionSummarizer`. When set, actions are summarized before embedding.
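A common failure is an embedding whose length does not match the index dimension (text-embedding-3-small returns 1536-dimensional vectors by default). One way to catch this before upsert is to wrap your embed function; `with_dimension_check` and `EmbedFn` are hypothetical names in this sketch, and the wrapper keeps the `Callable[[str], list[float]]` signature the constructor expects.

```python
from typing import Callable

EmbedFn = Callable[[str], list[float]]


def with_dimension_check(embed: EmbedFn, dim: int) -> EmbedFn:
    """Wrap an embed function so a wrong-sized vector fails before upsert."""
    def checked(text: str) -> list[float]:
        vector = list(embed(text))
        if len(vector) != dim:
            raise ValueError(f"expected {dim}-dim vector, got {len(vector)}")
        return [float(x) for x in vector]  # normalize element types to float
    return checked
```

For example, `RagPipeline(embed=with_dimension_check(embed_fn, 1536), upsert=upsert_fn)` would raise a clear error instead of a less specific vector-store rejection.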

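`AsyncRagPipeline(embed, upsert, summarizer=None)`

Takes the same parameters, but `embed` and `upsert` must be async callables: `embed` returns `Awaitable[list[float]]` and `upsert` returns `Awaitable[None]`.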

Callbacks

| Method | Description |
|---|---|
| `.on_actions(payload)` | Wire to `client.on_actions(...)` |
| `.on_summary(payload)` | Wire to `client.on_summary(...)` |
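This page does not say whether the client accepts more than one handler per event type. If you want both the pipeline and your own logic to see each payload, one option is to combine them into a single callback. `fan_out` is a hypothetical helper, and this sketch only covers synchronous handlers (the async pipeline's handlers would need to be awaited).

```python
from typing import Any, Callable


def fan_out(*handlers: Callable[[Any], None]) -> Callable[[Any], None]:
    """Combine several payload handlers into one callback, called in order."""
    def combined(payload: Any) -> None:
        for handler in handlers:
            handler(payload)
    return combined
```

For example, `client.on_actions(fan_out(pipeline.on_actions, my_logger))`, where `my_logger` is your own function. An exception in an earlier handler stops the later ones, so put the pipeline first if storage matters most.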