RagPipeline (sync) and AsyncRagPipeline (async) connect the Autoplay event stream to your vector store with minimal code. You provide two functions — embed and upsert — and the pipeline handles the rest.

Sync pipeline

import openai
from autoplay_sdk import ConnectorClient
from autoplay_sdk.rag import RagPipeline

openai_client = openai.OpenAI()

# Assumes pinecone_index is an existing Pinecone index, and that URL and TOKEN
# hold your connector endpoint and auth token.
pipeline = RagPipeline(
    embed=lambda text: openai_client.embeddings.create(
        input=text, model="text-embedding-3-small"
    ).data[0].embedding,
    upsert=lambda id, vector, meta: pinecone_index.upsert([(id, vector, meta)]),
)

ConnectorClient(url=URL, token=TOKEN) \
    .on_actions(pipeline.on_actions) \
    .on_summary(pipeline.on_summary) \
    .run()

Async pipeline

import openai
from autoplay_sdk import AsyncConnectorClient
from autoplay_sdk.rag import AsyncRagPipeline

openai_client = openai.AsyncOpenAI()

async def embed(text: str) -> list[float]:
    r = await openai_client.embeddings.create(input=text, model="text-embedding-3-small")
    return r.data[0].embedding

async def upsert(id: str, vector: list[float], meta: dict) -> None:
    await pinecone_index.upsert([(id, vector, meta)])

pipeline = AsyncRagPipeline(embed=embed, upsert=upsert)

async with AsyncConnectorClient(url=URL, token=TOKEN) as client:
    client.on_actions(pipeline.on_actions)
    client.on_summary(pipeline.on_summary)
    await client.run()

With a SessionSummarizer

Attach a SessionSummarizer to automatically condense actions before embedding. This keeps your vector store entries compact and your context window small.

from autoplay_sdk.rag import RagPipeline
from autoplay_sdk.summarizer import SessionSummarizer

summarizer = SessionSummarizer(llm=my_llm_fn, threshold=10)

pipeline = RagPipeline(
    embed=embed_fn,
    upsert=upsert_fn,
    summarizer=summarizer,  # actions go through summarizer first
)

client.on_actions(pipeline.on_actions).on_summary(pipeline.on_summary).run()

When the summarizer fires (every threshold actions; 10 here), the LLM-generated summary is embedded and upserted — not the raw action batch.
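The exact contract of the llm callable is defined by SessionSummarizer; as a minimal sketch, assuming it receives the batched action text and must return a summary string, a hypothetical stand-in for my_llm_fn could look like this (a real implementation would call an LLM):

```python
# Hypothetical stand-in for my_llm_fn — NOT part of the SDK. Assumes the
# summarizer passes the batched action text in and expects a summary string back.
def my_llm_fn(action_text: str) -> str:
    # Crude "summary": keep the first few non-empty lines.
    lines = [line.strip() for line in action_text.splitlines() if line.strip()]
    return "Session summary: " + "; ".join(lines[:3])
```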

Compatible vector stores

The upsert callable works with any vector store. For example, with Pinecone:

from pinecone import Pinecone

pc = Pinecone(api_key="...")
index = pc.Index("my-index")

upsert = lambda id, vector, meta: index.upsert(vectors=[{"id": id, "values": vector, "metadata": meta}])
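Because the pipeline only requires the (id, vector, meta) call signature, a plain in-memory store is enough for local testing. A minimal sketch (not part of the SDK):

```python
# Minimal in-memory "vector store" for local testing — not part of the SDK.
# Matches the signature the pipeline calls: upsert(id, vector, meta).
store: dict[str, tuple[list[float], dict]] = {}

def upsert(id: str, vector: list[float], meta: dict) -> None:
    store[id] = (vector, meta)
```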

What gets upserted

Event type                                ID used       Text embedded
ActionsPayload                            session_id    payload.to_text() — numbered action list
SummaryPayload                            session_id    payload.to_text() — prose summary
Client summary (via SessionSummarizer)    session_id    LLM-generated summary text
Each event type upserts with the same session_id as the key — so your vector store always has one up-to-date entry per session.
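Since every event type keys on session_id, later events for a session replace the earlier entry rather than accumulating. Illustrated with a dict-backed store (an assumption for demonstration, not the SDK):

```python
# Dict-backed store used only to illustrate the one-entry-per-session semantics.
store: dict[str, tuple[list[float], dict]] = {}

def upsert(id: str, vector: list[float], meta: dict) -> None:
    store[id] = (vector, meta)

upsert("sess-42", [0.1, 0.2], {"source": "actions"})
upsert("sess-42", [0.3, 0.4], {"source": "summary"})  # replaces the earlier entry
```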

Constructor

RagPipeline(embed, upsert, summarizer=None)

embed (Callable[[str], list[float]], required)
Any embedding function. Receives payload.to_text() and must return a vector.

upsert (Callable[[str, list[float], dict], None], required)
Writes the embedding to your vector store. Called with (session_id, vector, metadata).

summarizer (SessionSummarizer | None, default: None)
Optional SessionSummarizer. When set, actions are summarised before embedding.
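Any callable satisfying Callable[[str], list[float]] works as embed. For wiring tests without an embedding provider, a deterministic, dependency-free toy (a hypothetical helper, not real retrieval-quality embedding) can stand in:

```python
# Toy embed satisfying Callable[[str], list[float]] — deterministic and
# dependency-free. Useful for wiring tests only; not a real embedding model.
def toy_embed(text: str, dim: int = 8) -> list[float]:
    vec = [0.0] * dim
    for i, byte in enumerate(text.encode("utf-8")):
        vec[i % dim] += byte / 255.0  # fold bytes into a fixed-size vector
    return vec
```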

AsyncRagPipeline(embed, upsert, summarizer=None)

Same parameters, but embed and upsert must be async callables.

Callbacks

Method                    Description
.on_actions(payload)      Wire to client.on_actions(...)
.on_summary(payload)      Wire to client.on_summary(...)