The hybrid RAG pattern is everywhere in 2026. A retriever agent searches a vector database, writes results to shared state, and a planner agent reasons over them. The pattern works — until you ask a basic question: how does the planner know the retriever actually wrote those results?
It doesn't. In every major framework — LangGraph, CrewAI, AutoGen — shared state is a data structure, not a signed artifact. Any agent can write any value. There is no mechanism to verify authorship, detect tampering, or enforce who is allowed to write what. OWASP ranked this ASI06 — memory and context poisoning — in their 2026 Agentic Top 10. Research from Obsidian Security found that a single compromised agent poisoned 87% of downstream decisions within four hours.
This tutorial builds a two-agent RAG pipeline with cryptographic context attribution using Skytale's SharedContext. The retriever writes results that are cryptographically linked to its DID. The planner verifies authorship before reasoning. A write policy ensures only the retriever can write retrieval results. The whole setup takes about 10 minutes.
What you'll build
Two LangGraph agents communicating over an MLS-encrypted channel with attributed shared context:
- Retriever agent — searches a vector store, writes results to SharedContext with its DID attached to every entry
- Planner agent — reads results, verifies the sender DID before reasoning, rejects unsigned or unauthorized writes
Every context write is encrypted with MLS (RFC 9420) and tagged with the sender's cryptographic identity. The relay routes ciphertext — it never sees keys, values, or agent DIDs.
Step 1: Install and set up agents
pip install skytale-sdk[langgraph] langchain-openai chromadb from skytale_sdk import SkytaleChannelManager
from skytale_sdk.context import SharedContext
from skytale_sdk.context_types import ContextType, WritePolicy
# Create two agents with distinct identities
retriever_mgr = SkytaleChannelManager(
identity=b"retriever-agent",
api_key="sk_live_...",
)
planner_mgr = SkytaleChannelManager(
identity=b"planner-agent",
api_key="sk_live_...",
)
# Retriever creates the channel, planner joins
retriever_mgr.create("acme/research/context")
token = retriever_mgr.invite("acme/research/context")
planner_mgr.join_with_token("acme/research/context", token) Each agent gets a DID derived from its identity bytes. When an agent writes to SharedContext, that DID is cryptographically attached to every entry via the sender_did field.
Step 2: Set up SharedContext with write policies
# Retriever's context — it will write results here
retriever_ctx = SharedContext(
retriever_mgr, "acme/research/context",
)
# Planner's context — it reads and verifies
planner_ctx = SharedContext(
planner_mgr, "acme/research/context",
)
planner_ctx.start_receiving()
# Get the retriever's DID for the write policy
retriever_did = retriever_mgr._did_uri
# Lock down retrieval keys: only the retriever can write them
retriever_ctx.set_write_policy("retrieval_*", WritePolicy(
allowed_writers=[retriever_did],
max_value_size=8192,
write_rate_limit=30,
))
planner_ctx.set_write_policy("retrieval_*", WritePolicy(
allowed_writers=[retriever_did],
max_value_size=8192,
write_rate_limit=30,
)) Why set the policy on both sides? Write policies are enforced locally — both on set() (outbound) and _apply() (inbound). The retriever's policy prevents unauthorized local writes. The planner's policy rejects unauthorized remote writes. A compromised agent that bypasses its local check still gets rejected by every other agent in the channel.
Step 3: Retriever writes attributed results
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
# Your existing vector store
vectorstore = Chroma(embedding_function=OpenAIEmbeddings())
def retrieve_and_attribute(query: str) -> dict:
"""Search vector store, write attributed results to SharedContext."""
docs = vectorstore.similarity_search(query, k=5)
results = {
"query": query,
"documents": [
{"content": doc.page_content, "source": doc.metadata.get("source", "")}
for doc in docs
],
}
# Write to SharedContext — sender_did is attached automatically
retriever_ctx.set(
f"retrieval_{query[:32].replace(' ', '_')}",
results,
type=ContextType.ARTIFACT,
ttl_ms=300_000, # 5 minute TTL
)
return results When retriever_ctx.set() executes, three things happen:
- The write policy check runs — is this agent in
allowed_writers? Is the value under 8 KB? Is the rate under 30/min? - The entry is stamped with the retriever's DID (
sender_did) and a hybrid logical clock timestamp (hlc) - The entry is serialized into an MLS-encrypted STATE envelope and broadcast to the channel
Step 4: Planner reads with verified provenance
import time
# Wait for the retriever to populate context
retrieve_and_attribute("latest security vulnerabilities in agent frameworks")
time.sleep(2)
# Planner reads with full provenance
entries = planner_ctx.list(type_filter=ContextType.ARTIFACT)
for key in entries:
entry = planner_ctx.get_entry(key)
if entry is None:
continue
# Verify attribution before reasoning
if entry.sender_did != retriever_did:
print(f"REJECTED: {key} written by unknown agent {entry.sender_did}")
continue
print(f"Verified: {key}")
print(f" Author: {entry.sender_did}")
print(f" Clock: {entry.hlc.wall_ms}")
print(f" Docs: {len(entry.value['documents'])} results")
# Safe to reason over — provenance verified
planner_prompt = (
f"Based on these verified research results "
f"(retrieved by {entry.sender_did} at {entry.hlc.wall_ms}):\n"
f"{entry.value}"
) The planner uses get_entry() instead of get() to access the full ContextEntry — including sender_did, hlc, scope, and ttl_ms. This is the difference between reading a value and reading an attributed value.
What happens when it's attacked
Suppose a compromised agent joins the channel and tries to poison the retrieval results:
# Attacker joins the channel and tries to write fake results
attacker_ctx = SharedContext(attacker_mgr, "acme/research/context")
try:
attacker_ctx.set("retrieval_fake", {"documents": [{"content": "malicious data"}]})
except WritePolicyViolation as e:
print(e)
# "agent did:key:z6MkAttacker... is not allowed to write to key
# 'retrieval_fake' (allowed: ['did:key:z6MkRetriever...'])"
# Even if the attacker bypasses their local check (modified SDK),
# the planner's _apply() enforces the same policy and silently
# drops the unauthorized write. Three layers of defense:
| Layer | What it does | OWASP |
|---|---|---|
| Write policy | Only the retriever's DID can write retrieval_* keys | ASI06 |
| Sender attribution | Every entry is tagged with the writer's cryptographic DID | ASI06 |
| MLS encryption | The relay never sees context keys, values, or DIDs | ASI07 |
Late-joining agents
When a third agent joins the channel later — say, a summarizer — it needs the current context without replaying the full message history. SharedContext handles this with snapshot-on-join:
# A summarizer agent joins later
summarizer_mgr = SkytaleChannelManager(
identity=b"summarizer-agent",
api_key="sk_live_...",
)
token = retriever_mgr.invite("acme/research/context")
summarizer_mgr.join_with_token("acme/research/context", token)
summarizer_ctx = SharedContext(
summarizer_mgr, "acme/research/context",
)
summarizer_ctx.start_receiving()
# Request current state from existing members
summarizer_ctx.request_snapshot()
time.sleep(1)
# Summarizer now has the full context
snap = summarizer_ctx.snapshot()
print(f"Received {len(snap)} context entries") The snapshot is sent through the same MLS-encrypted channel. The summarizer receives the current state without any member needing to re-send individual entries.
Circuit breaker
If an agent starts flooding context with writes — whether compromised or buggy — the circuit breaker stops the cascade:
# Auto-freeze if total writes exceed 100/minute
retriever_ctx.set_freeze_threshold(100)
planner_ctx.set_freeze_threshold(100)
# When the threshold is hit, context becomes read-only
# All writes raise ContextFrozenError until unfreeze() is called This addresses OWASP ASI08 — cascading failures from overwhelmed components. The freeze is local to each agent's context instance, so the circuit breaker activates independently on each participant.
Under the hood
Every ctx.set() call produces a STATE envelope:
{
"op": "set",
"key": "retrieval_latest_security...",
"value": {"query": "...", "documents": [...]},
"type": "artifact",
"scope": {"access": "all"},
"hlc": [1710345600000, 0, "a1b2c3d4"],
"sender_did": "did:key:z6MkRetriever...",
"ttl_ms": 300000,
"created_at": 1710345600000
} This JSON payload is encrypted with AES-128-GCM under MLS group keys before leaving the SDK. The relay routes the ciphertext. Receiving agents decrypt, verify the write policy, and apply via LWW (last-writer-wins) conflict resolution using the hybrid logical clock.
What this gives you
| Question | Without Skytale | With SharedContext |
|---|---|---|
| Who wrote this result? | Unknown | Cryptographic DID (sender_did) |
| Was it modified in transit? | No way to tell | MLS integrity (AES-128-GCM) |
| When was it written? | Local timestamp (unreliable) | Hybrid logical clock (HLC) |
| Is it still valid? | No expiration | TTL with automatic expiry |
| Can a rogue agent overwrite it? | Yes | Write policy enforcement |
| Can the relay read it? | Yes (plaintext) | No (MLS E2E encryption) |
Getting started
pip install skytale-sdk[langgraph] - SharedContext guide — full API reference with write policies and circuit breakers
- The Context Gap — why context attribution matters and what OWASP says about it
- Encrypting LangGraph Agents in 5 Minutes — basic encrypted channels without SharedContext
- GitHub repository — SDK source, examples, and issue tracker
- Free tier: 100K encrypted messages per month, no credit card required