| | EventBuffer | RedisEventBuffer |
|---|---|---|
| Storage | In-memory (deque) | Redis ZSET |
| Use case | Development, single process | Production, multiple pods |
| Persistence | Lost on restart | Survives restarts |
| Multi-process | No | Yes — shared across pods |
| Client | ConnectorClient or AsyncConnectorClient | AsyncConnectorClient only |
## EventBuffer — in-memory default

Collects events in memory so you can read them at any time instead of processing them immediately in a callback.

### Setup
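Conceptually, setup means constructing the buffer and handing its `add` method to the client as the event callback. The sketch below uses a toy stand-in class rather than the library's real `EventBuffer`, and the client wiring shown in the comment is an assumption about how your SDK exposes callbacks:

```python
class MiniBuffer:
    """Toy stand-in for EventBuffer: collect events now, read them later."""

    def __init__(self):
        self._events = []

    def add(self, payload):
        self._events.append(payload)

    def drain(self):
        events, self._events = self._events, []
        return events

buf = MiniBuffer()

# Real wiring would pass the bound method as the client callback, e.g.
#   client = AsyncConnectorClient(on_actions=buf.add, on_summary=buf.add)
# Here we simulate the client firing two events:
buf.add({"type": "actions", "items": ["click"]})
buf.add({"type": "summary", "text": "session done"})

print(len(buf.drain()))  # 2 events collected; the buffer is now empty
```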
### Constructor

- `max_size`: Maximum events to keep in memory. When full, the oldest event is dropped and `on_drop` is called. Set to 0 for unlimited (not recommended in production — use `RedisEventBuffer` instead).
- `on_drop`: Called with the dropped payload when the buffer is full. Use for metrics or alerting. If `None`, drops are logged as warnings only.

#### Drop handling example
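For instance, a drop handler that counts drops for a metrics dashboard. The bounded-deque logic below only mimics the documented behavior (oldest-first eviction, one `on_drop` call per drop); the real class implements this internally:

```python
from collections import deque

dropped_count = 0

def on_drop(payload):
    """Metrics/alerting hook: called once per dropped payload."""
    global dropped_count
    dropped_count += 1

# Toy bounded buffer mimicking EventBuffer's documented drop behavior.
MAX_SIZE = 3
buf = deque()

def add(payload):
    if len(buf) >= MAX_SIZE:
        on_drop(buf.popleft())  # the oldest event is evicted first
    buf.append(payload)

for i in range(5):
    add({"id": i})

print(dropped_count)           # 2 -- events 0 and 1 were dropped
print([p["id"] for p in buf])  # [2, 3, 4]
```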
### Reading events

- `drain()` — get all events and clear the buffer
- `peek(n)` — inspect without clearing
- `size` — how many events are waiting
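The read semantics in miniature — `drain()` clears, `peek()` does not. These are toy functions over a deque; the real methods live on the buffer object:

```python
from collections import deque

events = deque([{"id": 1}, {"id": 2}, {"id": 3}])

def peek(n=None):
    """Read the last n events (all if n is None) without removing them."""
    items = list(events)
    return items if n is None else items[-n:]

def drain():
    """Return every event and clear the buffer."""
    items = list(events)
    events.clear()
    return items

print([e["id"] for e in peek(2)])  # [2, 3] -- buffer untouched
print(len(events))                 # 3
print(len(drain()))                # 3 -- buffer is now empty
print(len(events))                 # 0
```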
### Drain by type

Process actions and summaries at different cadences.

#### Periodic indexing example
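A sketch of the pattern, assuming each payload carries a `"type"` field distinguishing actions from summaries (the field name and values are assumptions, as is this toy `drain_by_type`):

```python
buffered = [
    {"type": "actions", "id": 1},
    {"type": "summary", "id": 2},
    {"type": "actions", "id": 3},
]

def drain_by_type(events, actions=False, summaries=False):
    """Toy version: remove and return only the selected event types."""
    wanted = set()
    if actions:
        wanted.add("actions")
    if summaries:
        wanted.add("summary")
    taken = [e for e in events if e["type"] in wanted]
    events[:] = [e for e in events if e["type"] not in wanted]
    return taken

# Index actions on a fast cadence; leave summaries for a slower pass:
acted = drain_by_type(buffered, actions=True)
print([e["id"] for e in acted])     # [1, 3]
print([e["id"] for e in buffered])  # [2] -- summary kept for later
```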
## RedisEventBuffer — production, multi-pod

Uses a Redis sorted set (ZSET) per session with a sliding timestamp window — the same architecture as the event connector’s internal buffer. Designed for high-throughput deployments where multiple service instances share state.

### Constructor
- Redis connection URL, e.g. `"redis://localhost:6379/0"`.
- Namespace prefix for all Redis keys. Use a unique value per service or environment to avoid key collisions.
- Sliding window size in seconds. Events older than this are automatically evicted on the next write. The default matches the connector’s 2-minute window.
- Maximum concurrent Redis write operations. When exceeded, the payload is dropped and `on_drop` is called instead of queuing unboundedly.
- `on_drop`: Called when a payload is dropped due to backpressure or Redis being unavailable. Use for metrics or alerting.
### Graceful degradation

If Redis is unavailable at startup or during operation, `add()` calls `on_drop` and returns — your application keeps running without buffering. `drain()` returns an empty list. No exceptions are raised.
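The degradation pattern can be sketched as a write that traps connection failures and reroutes the payload to `on_drop` instead of raising. `FlakyStore` stands in for a real Redis client and its exception types:

```python
import asyncio

class FlakyStore:
    """Stand-in for a Redis client that may be unreachable."""

    def __init__(self, available=True):
        self.available = available

    def zadd(self, key, member):
        if not self.available:
            raise ConnectionError("redis unreachable")

dropped = []

async def add(store, payload, on_drop=dropped.append):
    try:
        store.zadd("events", payload)
    except ConnectionError:
        on_drop(payload)  # degrade gracefully; never raise to the caller

asyncio.run(add(FlakyStore(available=False), {"id": 1}))
print(dropped)  # [{'id': 1}]
```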
### API reference

| Method | Returns | Description |
|---|---|---|
| `await .add(payload)` | `None` | Add a payload (async) |
| `await .drain()` | `list` | Return all payloads across all sessions in chronological order, then clear |
| `await .size()` | `int` | Total buffered payloads across all sessions |
## BufferBackend protocol

Implement `BufferBackend` to plug in any durable store (Kafka, Postgres, etc.) as a custom async backend:
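The protocol's exact signature isn't reproduced here; a plausible sketch using `typing.Protocol`, with the async methods named in the tables on this page (`add`, `drain`, `size`) and a trivial in-memory implementation:

```python
import asyncio
from typing import Any, Protocol

class BufferBackend(Protocol):
    """Assumed shape of the backend protocol (method names taken from
    the RedisEventBuffer API table; exact signatures may differ)."""

    async def add(self, payload: dict[str, Any]) -> None: ...
    async def drain(self) -> list[dict[str, Any]]: ...
    async def size(self) -> int: ...

class ListBackend:
    """Trivial in-memory implementation satisfying the protocol."""

    def __init__(self) -> None:
        self._items: list[dict[str, Any]] = []

    async def add(self, payload: dict[str, Any]) -> None:
        self._items.append(payload)

    async def drain(self) -> list[dict[str, Any]]:
        items, self._items = self._items, []
        return items

    async def size(self) -> int:
        return len(self._items)

async def demo() -> None:
    backend: BufferBackend = ListBackend()
    await backend.add({"id": 1})
    print(await backend.size())        # 1
    print(len(await backend.drain()))  # 1
    print(await backend.size())        # 0

asyncio.run(demo())
```

A durable backend (Kafka, Postgres) would follow the same shape, replacing the list with its own write and read paths.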
## EventBuffer API reference

| Method / Property | Returns | Description |
|---|---|---|
| `.add(payload)` | `None` | Add an event — wire to `on_actions` / `on_summary` |
| `.drain()` | `list` | Return all events and clear the buffer |
| `.peek(n=None)` | `list` | Read last N events without clearing |
| `.drain_by_type(actions, summaries)` | `list` | Drain only specific event types |
| `.clear()` | `None` | Discard all events |
| `.size` | `int` | Number of buffered events |
| `.is_empty` | `bool` | True when no events are buffered |
| `.max_size` | `int` | Maximum buffer capacity |