Your Dify chatbot answers questions using each user’s live session context: it knows where they are in the product and can offer to guide them step by step.

With real-time context, the chatbot was able to reply: “Looks like you’ve been navigating around the consultant page quite a bit. To find your alerts and requests, head over to the Notifications page”, meeting the user where they are based on their recent activity and tailoring guidance accordingly.
The server consumes Autoplay’s SSE stream using AsyncConnectorClient, buffers actions with AsyncAgentContextWriter, and exposes an HTTP endpoint that Dify calls on every retrieval.

Dify’s External Knowledge API calls your server via POST /retrieval; it appends /retrieval to whatever base URL you register. The server below implements exactly that contract, with each user’s events stored in a separate per-session buffer and the session identifier read from the knowledge_id field Dify sends in its request body.
Create server.py. The server maintains a session-keyed in-memory store and serves the right session’s records at POST /retrieval. Because every actions payload from Autoplay already carries session_id and email on the batch envelope (and on each action object), the server can key its store directly from the incoming event, with no client-side plumbing needed.

When you register the knowledge base in Dify you enter your chosen identifier (session_id or email) as the External Knowledge ID. Dify sends that value as knowledge_id in the body of every POST /retrieval call, which is how the server opens the right per-user bucket.
Full server.py:
# server.py: Autoplay → Dify real-time knowledge server (session-scoped)
import asyncio
import os
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional

import openai
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from autoplay_sdk import AsyncConnectorClient, AsyncSessionSummarizer
from autoplay_sdk.agent_context import AsyncAgentContextWriter

# ── Config ───────────────────────────────────────────────────────────
CONNECTOR_URL = os.environ["AUTOPLAY_STREAM_URL"]  # e.g. https://…/stream/<product_id>
API_TOKEN = os.environ["AUTOPLAY_API_TOKEN"]
MAX_CHUNKS = 50  # per session; keeps memory bounded

app = FastAPI()
async_openai = openai.AsyncOpenAI()

# ── Request model ─────────────────────────────────────────────────────
# Dify always calls POST /retrieval with this JSON body.
# knowledge_id is the External Knowledge ID you set in the Dify UI.
# It contains whichever session identifier (session_id or email) your
# frontend passed via the chat-messages `inputs` field.
class RetrievalSetting(BaseModel):
    top_k: int = 5
    score_threshold: float = 0.5


class RetrievalRequest(BaseModel):
    knowledge_id: str
    query: str
    retrieval_setting: RetrievalSetting


# ── In-memory stores (session-scoped) ────────────────────────────────
chunks_by_session: dict[str, list[dict]] = defaultdict(list)
email_to_session: dict[str, str] = {}


def _store(session_id: str, email: Optional[str], text: str) -> None:
    entry = {"text": text, "ts": datetime.now(timezone.utc).isoformat()}
    buf = chunks_by_session[session_id]
    buf.append(entry)
    if len(buf) > MAX_CHUNKS:
        buf.pop(0)  # drop the oldest chunk
    if email:
        email_to_session[email] = session_id


# ── Callbacks ────────────────────────────────────────────────────────
async def write_actions(session_id: str, text: str) -> None:
    _store(session_id, email=None, text=text)


async def on_raw_payload(payload: dict) -> None:
    """Capture email → session_id mapping from every incoming event."""
    sid = payload.get("session_id")
    email = payload.get("email")
    if sid and email:
        email_to_session[email] = sid


async def overwrite_with_summary(session_id: str, summary: str) -> None:
    chunks_by_session[session_id] = [{
        "text": summary,
        "ts": datetime.now(timezone.utc).isoformat(),
    }]


# ── LLM summarizer ───────────────────────────────────────────────────
async def llm(prompt: str) -> str:
    r = await async_openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        max_tokens=256,
    )
    return r.choices[0].message.content


summarizer = AsyncSessionSummarizer(llm=llm, threshold=20)
agent_writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=write_actions,
    overwrite_with_summary=overwrite_with_summary,
    debounce_ms=0,
)


# ── POST /retrieval ───────────────────────────────────────────────────
# Dify appends /retrieval to the base URL you registered and sends a
# POST with knowledge_id, query, and retrieval_setting in the body.
# It never sends GET requests or query parameters to this endpoint.
@app.post("/retrieval")
async def retrieval(body: RetrievalRequest):
    """
    knowledge_id contains the session identifier your frontend passed via
    Dify's `inputs` field when opening the conversation.

    Accepts either:
      knowledge_id = "ps_abc123"         (direct session_id lookup)
      knowledge_id = "user@example.com"  (resolved via email index)

    Response contract Dify expects:
      { "records": [{"content": "…", "score": 1.0, "title": "…", "metadata": {}}, …] }

    Note: metadata must be {} (empty object), never null.
    """
    identifier = body.knowledge_id

    # Try direct session_id lookup first
    source = chunks_by_session.get(identifier)

    # Fall back to email → session_id index
    if source is None:
        sid = email_to_session.get(identifier)
        source = chunks_by_session.get(sid) if sid else None

    if not source:
        # No events received yet: return empty gracefully
        return JSONResponse({"records": []})

    top_k = body.retrieval_setting.top_k
    threshold = body.retrieval_setting.score_threshold

    records = [
        {
            "content": c["text"],
            "score": 1.0,
            "title": f"events @ {c['ts']}",
            "metadata": {},  # must be {}, never null
        }
        for c in reversed(source)  # newest first
        if 1.0 >= threshold
    ][:top_k]

    return JSONResponse({"records": records})


# ── Background stream task ────────────────────────────────────────────
@app.on_event("startup")
async def start_stream():
    asyncio.create_task(_run_stream())


async def _run_stream():
    async with AsyncConnectorClient(url=CONNECTOR_URL, token=API_TOKEN) as client:
        client.on_actions(agent_writer.add)
        client.on_raw(on_raw_payload)
        await client.run()
session_id and email are delivered to your server automatically — they’re top-level fields on every actions payload (see Payload schema). You don’t need to instrument your frontend to forward them; the Autoplay connector already includes them.
session_id: Preferred. Present on every payload as payload.session_id. Maps 1-to-1 to a browser session, which is the right granularity for “where is this user right now.” No race condition.
email: Use when your frontend knows the logged-in user’s email but not their PostHog session ID. The server resolves email → session_id from the index built as events arrive. Make sure at least one event has arrived before the first retrieval, or the index will be empty.
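Whichever key you choose, the per-session cap that `_store` applies in server.py can also be expressed with `collections.deque`, which evicts the oldest entry automatically once `maxlen` is reached. The following is a minimal standalone sketch, not part of server.py; `SessionBuffer` and its `newest` method are hypothetical names introduced for illustration.

```python
from collections import defaultdict, deque
from datetime import datetime, timezone

MAX_CHUNKS = 50  # same per-session cap as server.py


class SessionBuffer:
    """Hypothetical helper: one bounded, newest-last buffer per session."""

    def __init__(self, max_chunks: int = MAX_CHUNKS) -> None:
        # deque(maxlen=...) drops the oldest item on append once it is full
        self._chunks = defaultdict(lambda: deque(maxlen=max_chunks))

    def store(self, session_id: str, text: str) -> None:
        entry = {"text": text, "ts": datetime.now(timezone.utc).isoformat()}
        self._chunks[session_id].append(entry)

    def newest(self, session_id: str, top_k: int) -> list[dict]:
        """Return up to top_k entries for one session, newest first."""
        buf = self._chunks.get(session_id)  # .get() does not create a bucket
        if not buf:
            return []
        return list(reversed(buf))[:top_k]
```

Compared with `list.pop(0)`, the deque avoids shifting the whole list on every eviction, although at 50 entries per session the difference is unlikely to matter in practice.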
Dify always calls POST /retrieval with a JSON body. It never sends a GET request and never forwards query parameters to your endpoint. The session_id or email value must reach your server via the knowledge_id field in the POST body — not via URL query strings.
Run locally with uvicorn server:app --port 8000 and expose it with ngrok http 8000 for testing. For production, deploy anywhere that can serve a public HTTPS endpoint: Render, Railway, Fly.io, a VPS, or your own infrastructure.

Use the following curl commands to verify your server. These mirror exactly what Dify sends on every retrieval:
# Verify with a session_id (preferred)
curl -X POST "https://your-server.example.com/retrieval" \
  -H "Content-Type: application/json" \
  -d '{"knowledge_id":"ps_abc123","query":"what is the user doing","retrieval_setting":{"top_k":5,"score_threshold":0.5}}'
# → {"records": [{"content": "…", "score": 1.0, "title": "…", "metadata": {}}]}

# Verify with an email
curl -X POST "https://your-server.example.com/retrieval" \
  -H "Content-Type: application/json" \
  -d '{"knowledge_id":"user@example.com","query":"what is the user doing","retrieval_setting":{"top_k":5,"score_threshold":0.5}}'
# → {"records": [...]}

# Unknown identifier returns empty records (correct: no events yet)
curl -X POST "https://your-server.example.com/retrieval" \
  -H "Content-Type: application/json" \
  -d '{"knowledge_id":"unknown","query":"test","retrieval_setting":{"top_k":5,"score_threshold":0.5}}'
# → {"records": []}
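The same checks can be scripted from Python. This is a minimal sketch using only the standard library; `build_retrieval_request` and `fetch_records` are hypothetical helper names, and the base URL is the same placeholder used in the curl commands.

```python
import json
import urllib.request


def build_retrieval_request(base_url: str, knowledge_id: str, query: str,
                            top_k: int = 5, score_threshold: float = 0.5):
    """Build the POST /retrieval request described above (does not send it)."""
    body = {
        "knowledge_id": knowledge_id,
        "query": query,
        "retrieval_setting": {"top_k": top_k, "score_threshold": score_threshold},
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/retrieval",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_records(base_url: str, knowledge_id: str, query: str) -> list:
    """Send the request and return the "records" list from the response."""
    req = build_retrieval_request(base_url, knowledge_id, query)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)["records"]
```

Calling `fetch_records("https://your-server.example.com", "unknown", "test")` against the server above should return an empty list, matching the third curl command.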
Don’t want to self-host? Join the Autoplay Slack workspace and post in #just-integrated — we can host the event server for you and hand you a ready-to-use URL to drop straight into Dify.
This is the most important concept in this guide. Read it before writing any code.
The real-time knowledge base only works correctly if the events Dify retrieves belong to this user’s current session — not a pool of all users’ events mixed together. Here’s what has to happen:
Buffer events per session. Every chunk written to the knowledge base must be keyed by a stable identifier. The Autoplay payload makes this easy: every actions event carries session_id and email at the top level (see Payload schema). Your server reads whichever field you want to key on directly from the incoming event — no extra client-side instrumentation needed.
Dify must identify the same session on every retrieval. Dify sends the External Knowledge ID you configured as knowledge_id in every POST /retrieval call. Whatever value you put in that field must match the key your server used when storing events. If the keys don’t line up, retrieval returns the wrong bucket or nothing at all.
Without this link, context is empty or generic. Dify can’t open the right bucket if knowledge_id doesn’t match a stored key.
The practical rule: pick one key (session_id or email), store with it, set it as the External Knowledge ID in Dify. The session boundary is the only distinction between users that matters here.
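The matching rule can be modelled in a few lines. This is a simplified sketch of the lookup order used in server.py, with plain dictionaries and sample values standing in for the real stores; `resolve_bucket` is a hypothetical name.

```python
from typing import Optional

# Stand-ins for the two stores in server.py, pre-filled with sample data
chunks_by_session: dict[str, list[str]] = {"ps_abc123": ["clicked Notifications"]}
email_to_session: dict[str, str] = {"user@example.com": "ps_abc123"}


def resolve_bucket(knowledge_id: str) -> Optional[list[str]]:
    """Return the bucket for knowledge_id, or None if no stored key matches."""
    # 1. Treat the identifier as a session_id
    bucket = chunks_by_session.get(knowledge_id)
    if bucket is not None:
        return bucket
    # 2. Otherwise treat it as an email and follow the index
    session_id = email_to_session.get(knowledge_id)
    return chunks_by_session.get(session_id) if session_id else None


print(resolve_bucket("ps_abc123"))         # exact session_id match
print(resolve_bucket("user@example.com"))  # resolved through the email index
print(resolve_bucket("ps_ABC123"))         # different case: no match
```

The lookup is an exact string comparison, so a key that differs only in case or surrounding whitespace resolves to nothing.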
Connecting Dify to your event server is a two-phase process: first register the API endpoint, then create the knowledge base that points to it. Dify sends a POST /retrieval request to your server on every retrieval step rather than indexing a static document.
Go to Knowledge. Click External Knowledge API in the top right corner.
| Field | Value |
| --- | --- |
| Name | A label for this connection, e.g. autoplay-realtime |
| API Endpoint | Your server’s base URL only, e.g. https://your-server.example.com. Dify appends /retrieval automatically. Do not include /retrieval here. |
| API Key | Optional. If you added Bearer token auth to /retrieval, paste the key here. Dify sends it as Authorization: Bearer {key} on every request. Otherwise leave blank. |
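The server.py shown in this guide does not check the Authorization header. If you decide to use the optional API key, one way to validate it is sketched below. `is_authorized` and the `RETRIEVAL_API_KEY` environment variable are assumptions made for illustration; neither comes from Dify or the Autoplay SDK.

```python
import hmac
import os
from typing import Optional

# Hypothetical environment variable holding the key you paste into Dify
EXPECTED_KEY = os.environ.get("RETRIEVAL_API_KEY", "")


def is_authorized(authorization: Optional[str], expected_key: str = EXPECTED_KEY) -> bool:
    """Check an 'Authorization: Bearer <key>' header value against the expected key."""
    if not expected_key or not authorization:
        return False
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer":
        return False
    # compare_digest avoids leaking key contents through timing differences
    return hmac.compare_digest(token.strip().encode(), expected_key.encode())
```

In the /retrieval handler you could read the incoming Authorization header, pass it to this function, and reject the request (for example with HTTP 403) when it returns False.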
Dify validates the connection by sending a test POST /retrieval request — confirm the response contains "records".
How Dify calls your server: every retrieval sends POST {endpoint}/retrieval with a JSON body containing knowledge_id, query, and retrieval_setting (top_k, score_threshold). Your server must respond with {"records": [...]}. See the External Knowledge API spec for the full contract.
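Before registering the endpoint it can help to check a response body against the shape described above. The sketch below covers only the fields shown in this guide (content, score, title, metadata), not the full External Knowledge API spec; `validate_retrieval_response` is a hypothetical helper.

```python
def validate_retrieval_response(payload: dict) -> list[str]:
    """Return a list of problems found in a /retrieval response body."""
    records = payload.get("records")
    if not isinstance(records, list):
        return ['top-level "records" must be a list']

    problems = []
    for i, rec in enumerate(records):
        if not isinstance(rec, dict):
            problems.append(f"records[{i}] must be an object")
            continue
        if not isinstance(rec.get("content"), str):
            problems.append(f"records[{i}].content must be a string")
        score = rec.get("score")
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(score, bool) or not isinstance(score, (int, float)):
            problems.append(f"records[{i}].score must be a number")
        if not isinstance(rec.get("title"), str):
            problems.append(f"records[{i}].title must be a string")
        if not isinstance(rec.get("metadata"), dict):
            problems.append(f"records[{i}].metadata must be an object, not null")
    return problems
```

An empty list means the body has the expected shape; a record with `"metadata": None` is reported, since this guide notes that a null metadata value causes errors in Dify’s retrieval pipeline.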
The External Knowledge ID is the value Dify sends as knowledge_id in every POST /retrieval call. It must match whichever key your server uses when storing events.

Because the identifier is per-user and per-session, it can’t be hardcoded in the Dify UI; it needs to be injected dynamically at chat time. Here is how the full wiring works:
Your frontend passes the identifier in the inputs object of the chat-messages API call.
You define a matching input variable in your Dify app (e.g. session_id).
In the Context panel (Chatbot) or Knowledge Retrieval node (Chatflow), you wire that variable to the External Knowledge ID field — so Dify substitutes the real value into knowledge_id on every POST /retrieval call.
Option A — session_id (preferred)
Option B — email
Option A: session_id is the PostHog session ID. It’s present on every Autoplay payload from the very first event, with no race conditions.

Step 1. Pass it from your frontend:
const response = await fetch("https://api.dify.ai/v1/chat-messages", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${DIFY_APP_KEY}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    inputs: {
      session_id: posthog.get_session_id(), // stable for this browser session
    },
    query: userMessage,
    conversation_id: existingConversationId, // omit or null to start new
    user: currentUserId,
  }),
});
Step 2. Define the variable in Dify:

In your Dify app, go to Studio → your app → Orchestrate. In the Variables section at the top of the canvas, add a variable named session_id (type: String).

Step 3. Wire it to the knowledge base:
Chatbot: In the Context panel, select Real-Time Events. In the retrieval settings for that knowledge base, set External Knowledge ID to {{session_id}}.
Chatflow: In the Knowledge Retrieval node, select Real-Time Events and set the External Knowledge ID field to the session_id variable.
Dify will substitute the real value from inputs into knowledge_id on every POST /retrieval call.
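Option B: If your frontend knows the logged-in user’s email but not their PostHog session ID, use email instead. The server builds an email → session_id index automatically from incoming payloads, so no extra backend work is needed.

Step 1. Pass it from your frontend: make the same chat-messages call as in Option A, but send the user’s email under an email key in inputs instead of session_id.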
Step 2. Define the variable in Dify:

In Studio → your app → Orchestrate → Variables, add a variable named email (type: String).

Step 3. Wire it to the knowledge base:
Chatbot: In the Context panel retrieval settings for Real-Time Events, set External Knowledge ID to {{email}}.
Chatflow: In the Knowledge Retrieval node, set the External Knowledge ID field to the email variable.
Make sure at least one event has arrived from that user before the first retrieval — otherwise the email → session_id index is empty and the endpoint returns no records. session_id doesn’t have this problem because the store key is written on the very first event.
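If the chat call is made from a Python backend rather than the browser, the email variant of the request could be built as follows. This is a sketch under assumptions: `build_chat_request` is a hypothetical helper, the field names mirror the JavaScript call shown for Option A, and `response_mode` is set to "blocking" purely to keep the example simple.

```python
import json
import urllib.request
from typing import Optional

DIFY_CHAT_URL = "https://api.dify.ai/v1/chat-messages"  # same URL as the JavaScript example


def build_chat_request(app_key: str, email: str, query: str, user: str,
                       conversation_id: Optional[str] = None) -> urllib.request.Request:
    """Build a chat-messages request that passes `email` through `inputs`."""
    body = {
        "inputs": {"email": email},   # key must match the Dify variable name
        "query": query,
        "user": user,
        "response_mode": "blocking",  # assumption: wait for the full answer
    }
    if conversation_id:               # omit to start a new conversation
        body["conversation_id"] = conversation_id
    return urllib.request.Request(
        DIFY_CHAT_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {app_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

The request can then be sent with `urllib.request.urlopen(req)`. The email passed here has to be the same string that Autoplay reports on its payloads, otherwise the server’s index lookup will miss.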
Dify re-queries your endpoint on every chat turn — there is no sync schedule to configure and events are always fresh.
In Studio → your app → Orchestrate, attach the Real-Time Events knowledge base to your app and update the system prompt so the model knows how to use the retrieved context.
Replace (or append to) the existing system prompt with something like the example below. The key section is How to use the “Current User Activity” record — it tells the model exactly when and how to surface real-time context. The {{#context#}} placeholder is automatically injected by Dify with the retrieved chunks for this session.
Example system prompt:
You are a friendly and helpful assistant for users of this product.
Focus on helping people find their way in the UI, complete workflows, and
understand features. Assume some users are seeing the product for the first time.

## 💬 How to use the "Current User Activity" record

You may receive a special record titled "Current User Activity" in the
retrieved context. This shows what THIS user has been doing on the
platform in the last 2 minutes: which page they are on and what they
clicked. The activity is scoped to their session, so it reflects only
their actions, not anyone else's.

{{#context#}}

When this record is present:

1. **Acknowledge their activity naturally**, for example: "I can see you're currently on the Projects page" or "It looks like you've been exploring the Dashboard."
2. **Use it to give specific directions**: instead of generic instructions, reference where they are: "From the page you're on, click the blue 'Add Project' button at the top right."
3. **Detect if they might be lost**: if their actions show them clicking around without a clear pattern, gently offer help: "It looks like you might be looking for something specific. Can I help you find it?"
4. **Don't force it**: if the user's question has nothing to do with their current activity, just answer the question normally. Don't mention their activity unless it's helpful.

## ❓ How to answer questions

- **Be specific**: reference actual button names, tab labels, and menu items from the knowledge base.
- **Use numbered steps**: when explaining how to do something, always use a numbered list.
- **Keep it simple**: avoid technical jargon. Explain as if the user has never used the platform before.
- **Be encouraging**: use phrases like "Great question!" or "That's easy to do" to make users feel comfortable.
- **Offer next steps**: after answering, suggest what they might want to do next.
- **Admit when you don't know**: if the knowledge base doesn't have the answer, say so honestly.

## 🌐 Language

Respond in the same language the user writes in.

## ✅ Examples of good responses

User is on the Dashboard, asks "How do I create a project?":
  "I can see you're currently on the Dashboard. To create a new project:
  1. Click on 'My Projects' in the left sidebar
  2. Click the 'Add Project' button at the top right
  3. Choose the type, template, or options that match what you're creating
  4. Fill in the required details and click 'Create'
  Would you like me to explain what each field means?"

User is on the Invoice page, asks "Where are settings?":
  "The settings aren't on this page. You can find them by clicking on your profile icon in the top right corner, then selecting 'Settings' from the dropdown menu."

User has no activity context, asks "What can I do here?":
  "Welcome! Here's what you can do:
  1. Dashboard: see an overview of your work
  2. My Projects: create and manage projects
  3. Reports: view analytics or exports
  4. Billing: manage invoices or account settings
  What would you like to explore first?"
Testing it: Open the chatbot’s Preview panel and trigger some PostHog events in your app. Ask a normal help question like “How do I create a project?” — the bot should respond with directions specific to the page you’re currently on, not a generic walkthrough.
📊 Step 4 — Keep context compact with background summarization
Every time a user asks a question, Dify calls POST /retrieval on your server to fetch that session’s recent activity and inject it into the prompt. But if a user has been active for a while, raw event chunks accumulate fast — a wall of unprocessed clicks will bloat the context window, drive up costs, and drown the useful signal in noise. The solution is to continuously compress each session’s history in the background, before a user ever asks anything, so Dify always retrieves a tight, meaningful summary rather than a raw event log.
Step 1’s server already wires this up — no extra module needed.
After threshold actions per session (default 20), the summarizer runs in the background.
It calls your llm to collapse that session’s chunks into one summary, then overwrite_with_summary replaces only that session’s stored chunks with the compact summary.
The next POST /retrieval for that knowledge_id returns the compact summary instead of raw clicks — for that session only. Other sessions are unaffected.
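The steps above can be modelled without the SDK. The sketch below is a simplified stand-in written for illustration, not the implementation of AsyncSessionSummarizer: `ThresholdSummarizer` and `fake_llm` are hypothetical, the real class may count and batch actions differently, and for brevity the LLM call is awaited inline rather than run as a background task.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable


class ThresholdSummarizer:
    """Illustrative model: summarize a session once it holds `threshold` chunks."""

    def __init__(self, llm: Callable[[str], Awaitable[str]], threshold: int = 20) -> None:
        self.llm = llm
        self.threshold = threshold
        self.chunks: dict[str, list[str]] = defaultdict(list)

    async def add(self, session_id: str, text: str) -> None:
        buf = self.chunks[session_id]
        buf.append(text)
        if len(buf) >= self.threshold:
            # Collapse this session's chunks into one summary;
            # other sessions are left untouched.
            summary = await self.llm("Summarize these actions:\n" + "\n".join(buf))
            self.chunks[session_id] = [summary]


async def fake_llm(prompt: str) -> str:
    """Stand-in for a real model call: reports how many actions it was given."""
    return f"summary of {len(prompt.splitlines()) - 1} actions"


async def demo() -> dict[str, list[str]]:
    s = ThresholdSummarizer(llm=fake_llm, threshold=3)
    for i in range(3):
        await s.add("ps_a", f"click {i}")   # third add reaches the threshold
    await s.add("ps_b", "opened Billing")   # separate session, below threshold
    return dict(s.chunks)


print(asyncio.run(demo()))
```

After the run, session ps_a holds a single summary chunk while ps_b still holds its one raw chunk, which is the per-session isolation described above.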
debounce_ms: Delay in milliseconds after the last action before writing a batch; 3000 merges rapid-fire clicks.
llm: Any async prompt → string callable; gpt-4o-mini is a typical low-cost choice.
Adjust AsyncSessionSummarizer / AsyncAgentContextWriter in server.py:
# Summarize after every 10 actions instead of 20,
# and wait 3 s after the last action before writing a chunk.
summarizer = AsyncSessionSummarizer(llm=llm, threshold=10)
agent_writer = AsyncAgentContextWriter(
    summarizer=summarizer,
    write_actions=write_actions,
    overwrite_with_summary=overwrite_with_summary,
    debounce_ms=3000,  # bin rapid clicks into ~3 s windows
)
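To make the debounce behaviour concrete, here is a small asyncio model of "wait until no action has arrived for a while, then write one batch". It is an illustration of the general technique only; `Debouncer` is a hypothetical class and does not describe how AsyncAgentContextWriter is implemented internally.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class Debouncer:
    """Illustrative model: flush a batch once no action has arrived for `delay_s`."""

    def __init__(self, flush: Callable[[list[str]], Awaitable[None]], delay_s: float) -> None:
        self.flush = flush
        self.delay_s = delay_s
        self.pending: list[str] = []
        self._timer: Optional[asyncio.Task] = None

    def add(self, action: str) -> None:
        self.pending.append(action)
        if self._timer is not None:
            self._timer.cancel()  # a new action restarts the quiet period
        self._timer = asyncio.create_task(self._wait_then_flush())

    async def _wait_then_flush(self) -> None:
        await asyncio.sleep(self.delay_s)
        self._timer = None  # from here on, a new add() must not cancel the flush
        batch, self.pending = self.pending, []
        await self.flush(batch)


async def demo() -> list[list[str]]:
    batches: list[list[str]] = []

    async def collect(batch: list[str]) -> None:
        batches.append(batch)

    d = Debouncer(flush=collect, delay_s=0.1)
    for action in ("click A", "click B", "click C"):  # rapid-fire burst
        d.add(action)
        await asyncio.sleep(0)
    await asyncio.sleep(0.3)  # quiet period elapses, the burst is flushed once
    d.add("click D")
    await asyncio.sleep(0.3)
    return batches


print(asyncio.run(demo()))
```

The three rapid clicks end up in one batch and the later click in a second one. A longer delay produces fewer, larger chunks at the cost of context that lags slightly behind the user.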
The summarizer runs on your server, not inside Dify. It fires between events arriving and Dify querying — so by the time the user asks a question, the context is already compact and already session-scoped. Dify never sees the raw flood of clicks; it only ever retrieves the most recent meaningful summary for the right user.
| Symptom | Likely cause | Fix |
| --- | --- | --- |
| Bot gives generic answers despite user being active | knowledge_id is missing or mismatched, so Dify is querying the wrong bucket | Confirm the session_id or email variable is defined in your Dify app, passed in inputs, and wired to the External Knowledge ID in the Context / Knowledge Retrieval settings |
| Bot sees another user’s activity | session_id cached from a previous session | Call posthog.get_session_id() fresh per conversation start, not from a cached value |
| Records always empty (email mode) | email → session_id index not populated yet; first retrieval raced ahead of first event | Switch to session_id mode, or add a short delay between page load and chat open so at least one event arrives first |
| Records always empty (session_id mode) | session_id in knowledge_id doesn’t match the key written by write_actions | Log payload.session_id in on_raw_payload and compare to what Dify sends; they must be identical strings |
| Context grows stale mid-session | PostHog rotates session_id after 30 min idle | Listen for PostHog’s session change event and re-pass the new session_id in Dify’s inputs, or switch to email mode, which is session-rotation-safe |
| Dify returns a connection error on Save | Wrong URL registered, so Dify is calling a path that doesn’t exist | Register only the base URL (e.g. https://your-server.example.com). Do not append /retrieval; Dify adds it automatically |
| Pipeline errors or empty context despite records returning | metadata field is null in one or more records | Ensure every record includes "metadata": {}; a null value causes errors in Dify’s retrieval pipeline |