The SDK provides two buffer implementations depending on your deployment:
| | EventBuffer | RedisEventBuffer |
|---|---|---|
| Storage | In-memory (deque) | Redis ZSET |
| Use case | Development, single process | Production, multiple pods |
| Persistence | Lost on restart | Survives restarts |
| Multi-process | No | Yes, shared across pods |
| Client | ConnectorClient or AsyncConnectorClient | AsyncConnectorClient only |

EventBuffer — in-memory default

Collects events in memory so you can read them at any time instead of processing them immediately in a callback.

Setup

from autoplay_sdk import ConnectorClient, EventBuffer

buffer = EventBuffer(max_size=1000)

client = ConnectorClient(url=URL, token=TOKEN)
client.on_actions(buffer.add).on_summary(buffer.add)
client.run_in_background()

Constructor

EventBuffer(max_size=1000, on_drop=None)
max_size
int
default: 1000
Maximum events to keep in memory. When full, the oldest event is dropped and on_drop is called. Set to 0 for unlimited (not recommended in production — use RedisEventBuffer instead).
on_drop
Callable[[AnyPayload], None]
default: None
Called with the dropped payload when the buffer is full. Use for metrics or alerting. If None, drops are logged as warnings only.
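The bounded-buffer semantics described above (oldest event evicted, on_drop invoked, 0 meaning unlimited) can be sketched with a plain collections.deque. This is a simplified stand-in to illustrate the contract, not the SDK's actual implementation:

```python
from collections import deque
from typing import Any, Callable, Optional

class BoundedBuffer:
    """Minimal sketch of a bounded event buffer with an on_drop hook."""

    def __init__(self, max_size: int = 1000,
                 on_drop: Optional[Callable[[Any], None]] = None):
        self._items: deque = deque()
        self.max_size = max_size
        self.on_drop = on_drop

    def add(self, payload: Any) -> None:
        # max_size=0 means unlimited, matching the documented semantics
        if self.max_size and len(self._items) >= self.max_size:
            dropped = self._items.popleft()  # evict the oldest event
            if self.on_drop:
                self.on_drop(dropped)
        self._items.append(payload)

    def drain(self) -> list:
        items = list(self._items)
        self._items.clear()
        return items
```

Adding a third event to a buffer of capacity two evicts the first and reports it to on_drop, while drain() returns the survivors in arrival order.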

Drop handling example

dropped = []

buffer = EventBuffer(
    max_size=500,
    on_drop=lambda p: dropped.append(p),
)

Reading events

drain() — get all events and clear the buffer

events = buffer.drain()

for payload in events:
    text = payload.to_text()
    # embed, store, forward...

peek(n) — inspect without clearing

latest = buffer.peek(n=5)       # last 5 events, buffer unchanged
everything = buffer.peek()      # all events, buffer unchanged

size — how many events are waiting

print(buffer.size)      # int
print(buffer.is_empty)  # bool

Drain by type

Process actions and summaries at different cadences:

# Only drain summaries this time — leave actions for later
summaries = buffer.drain_by_type(actions=False, summaries=True)

# Only drain actions
actions = buffer.drain_by_type(actions=True, summaries=False)
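The selective-drain behaviour can be pictured as a type filter over the buffer: requested payload types are removed and returned in order, everything else stays. The payload classes below are hypothetical stand-ins for the SDK's action and summary types, and this is a sketch of the semantics rather than the SDK's code:

```python
# Hypothetical stand-ins for the SDK's action/summary payload types.
class ActionPayload: ...
class SummaryPayload: ...

def drain_by_type(buffer: list, actions: bool, summaries: bool) -> list:
    """Remove and return only the requested payload types, in order;
    everything else remains buffered."""
    wanted, kept = [], []
    for payload in buffer:
        take = (actions and isinstance(payload, ActionPayload)) or \
               (summaries and isinstance(payload, SummaryPayload))
        (wanted if take else kept).append(payload)
    buffer[:] = kept  # mutate in place so the caller's buffer keeps the rest
    return wanted
```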

Periodic indexing example

import time
import threading
from autoplay_sdk import ConnectorClient, EventBuffer

buffer = EventBuffer()
client = ConnectorClient(url=URL, token=TOKEN)
client.on_actions(buffer.add)
client.run_in_background()

def indexer():
    while True:
        time.sleep(30)
        for payload in buffer.drain():
            vector_store.upsert(id=payload.session_id, vector=embed(payload.to_text()))

threading.Thread(target=indexer, daemon=True).start()
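The loop above runs forever, so events still sitting in the buffer are lost on shutdown. A variant using a threading.Event as an interruptible sleep can exit cleanly and do one final drain; `flush` here is a hypothetical callback standing in for the vector_store/embed work:

```python
import threading

stop = threading.Event()

def indexer(buffer, flush, interval=30.0):
    """Drain the buffer every `interval` seconds until `stop` is set,
    then drain once more so shutdown loses no events."""
    while not stop.wait(interval):   # wait() doubles as an interruptible sleep
        for payload in buffer.drain():
            flush(payload)
    for payload in buffer.drain():   # final flush on shutdown
        flush(payload)

# on shutdown:
# stop.set(); thread.join()
```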

RedisEventBuffer — production, multi-pod

Uses a Redis sorted set (ZSET) per session with a sliding timestamp window — the same architecture as the event connector’s internal buffer. Designed for high-throughput deployments where multiple service instances share state.

from autoplay_sdk import AsyncConnectorClient
from autoplay_sdk.buffer import RedisEventBuffer

buffer = RedisEventBuffer(
    redis_url="redis://localhost:6379/0",
    key_prefix="my_service",
    on_drop=lambda p: metrics.increment("buffer_drop"),
)

client = AsyncConnectorClient(url=URL, token=TOKEN)
client.on_actions(buffer.add).on_summary(buffer.add)
task = client.run_in_background()

# later, anywhere in your app:
events = await buffer.drain()   # typed list, chronological order
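The per-session ZSET layout can be modelled in pure Python: each event is scored by its timestamp, every write evicts entries older than the window (ZADD followed by ZREMRANGEBYSCORE in real Redis), and drain merges all sessions chronologically. This is an illustrative model of the data structure, not the SDK's Redis code:

```python
import time
from collections import defaultdict

class SlidingWindowStore:
    """Pure-Python model of one ZSET per session: events are stored with
    their timestamp as the score, and each write evicts anything older
    than the window."""

    def __init__(self, window_seconds: float = 120.0):
        self.window = window_seconds
        self.sessions = defaultdict(list)  # session_id -> [(timestamp, event)]

    def add(self, session_id, event, now=None):
        now = time.time() if now is None else now
        self.sessions[session_id].append((now, event))
        # like ZREMRANGEBYSCORE key 0 (now - window)
        cutoff = now - self.window
        self.sessions[session_id] = [
            (ts, e) for ts, e in self.sessions[session_id] if ts > cutoff
        ]

    def drain(self):
        # merge all sessions in chronological order, then clear
        merged = sorted((ts, e) for z in self.sessions.values() for ts, e in z)
        self.sessions.clear()
        return [e for _, e in merged]
```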

Constructor

RedisEventBuffer(
    redis_url,
    key_prefix="default",
    window_seconds=120.0,
    max_concurrent=10,
    on_drop=None,
)
redis_url
string
required
Redis connection URL, e.g. "redis://localhost:6379/0".
key_prefix
string
default: "default"
Namespace prefix for all Redis keys. Use a unique value per service or environment to avoid key collisions.
window_seconds
float
default: 120.0
Sliding window size in seconds. Events older than this are automatically evicted on the next write. Default matches the connector’s 2-minute window.
max_concurrent
int
default: 10
Maximum concurrent Redis write operations. When exceeded, the payload is dropped and on_drop is called instead of queuing unboundedly.
on_drop
Callable[[AnyPayload], None]
default: None
Called when a payload is dropped due to backpressure or Redis being unavailable. Use for metrics or alerting.
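The max_concurrent contract (drop instead of queuing unboundedly) can be sketched with an asyncio.Semaphore: if every slot is busy, the payload is handed to on_drop immediately rather than awaiting a slot. A minimal sketch of that behaviour, not the SDK's implementation:

```python
import asyncio
from typing import Any, Callable, Optional

class DropOnBackpressure:
    """Sketch of max_concurrent semantics: a semaphore caps in-flight
    writes; when no slot is free the payload is dropped at once."""

    def __init__(self, max_concurrent: int = 10,
                 on_drop: Optional[Callable[[Any], None]] = None):
        self._sem = asyncio.Semaphore(max_concurrent)
        self.on_drop = on_drop

    async def add(self, payload: Any, write: Callable) -> None:
        if self._sem.locked():          # all slots busy: drop, don't wait
            if self.on_drop:
                self.on_drop(payload)
            return
        async with self._sem:
            await write(payload)        # the actual (slow) backend write
```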

Graceful degradation

If Redis is unavailable at startup or during operation, add() calls on_drop and returns — your application keeps running without buffering. drain() returns an empty list. No exceptions are raised.
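The degradation contract amounts to: every backend failure is converted into an on_drop call, never a raised exception. A sketch of that wrapper, shown synchronously for brevity (the real add() is async) and with `backend_add` as a hypothetical stand-in for the Redis write:

```python
from typing import Any, Callable, Optional

class DegradingBuffer:
    """Sketch of graceful degradation: backend failures become on_drop
    calls; exceptions never reach the caller."""

    def __init__(self, backend_add: Callable[[Any], None],
                 on_drop: Optional[Callable[[Any], None]] = None):
        self.backend_add = backend_add
        self.on_drop = on_drop

    def add(self, payload: Any) -> None:
        try:
            self.backend_add(payload)
        except Exception:               # e.g. a Redis connection error
            if self.on_drop:
                self.on_drop(payload)   # record the drop, keep running
```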

API reference

| Method | Returns | Description |
|---|---|---|
| await .add(payload) | None | Add a payload (async) |
| await .drain() | list | Return all payloads across all sessions, in chronological order, and clear |
| await .size() | int | Total buffered payloads across all sessions |

BufferBackend protocol

Implement BufferBackend to plug in any durable store (Kafka, Postgres, etc.) as a custom async backend:

from autoplay_sdk.buffer import BufferBackend
from autoplay_sdk.models import AnyPayload

class MyBackend:
    async def add(self, payload: AnyPayload) -> None: ...
    async def drain(self) -> list[AnyPayload]: ...
    async def size(self) -> int: ...

# MyBackend now satisfies the BufferBackend protocol
assert isinstance(MyBackend(), BufferBackend)
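For that isinstance check to work, BufferBackend is presumably a runtime-checkable typing.Protocol. A sketch of how such a protocol behaves, using a hypothetical `BufferBackendSketch` in place of the SDK's class: structural typing means any class with the three methods satisfies it, with no inheritance required.

```python
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class BufferBackendSketch(Protocol):
    """Hypothetical stand-in for BufferBackend. @runtime_checkable lets
    isinstance() verify the methods exist (it does not check signatures)."""
    async def add(self, payload: Any) -> None: ...
    async def drain(self) -> list: ...
    async def size(self) -> int: ...

class KafkaBackend:                     # never subclasses the protocol
    async def add(self, payload: Any) -> None: pass
    async def drain(self) -> list: return []
    async def size(self) -> int: return 0
```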

EventBuffer API reference

| Method / Property | Returns | Description |
|---|---|---|
| .add(payload) | None | Add an event; wire to on_actions / on_summary |
| .drain() | list | Return all events and clear the buffer |
| .peek(n=None) | list | Read the last n events without clearing |
| .drain_by_type(actions, summaries) | list | Drain only specific event types |
| .clear() | None | Discard all events |
| .size | int | Number of buffered events |
| .is_empty | bool | True when no events are buffered |
| .max_size | int | Maximum buffer capacity |