Attributed Context for Multi-Agent RAG — Skytale

Your retriever fetches data. Your planner reasons over it. Neither can prove who wrote what. Here's how to fix that.

The hybrid RAG pattern is everywhere in 2026. A retriever agent searches a vector database, writes results to shared state, and a planner agent reasons over them. The pattern works — until you ask a basic question: how does the planner know the retriever actually wrote those results?

It doesn't. In every major framework — LangGraph, CrewAI, AutoGen — shared state is a data structure, not a signed artifact. Any agent can write any value. There is no mechanism to verify authorship, detect tampering, or enforce who is allowed to write what. OWASP ranked this ASI06 — memory and context poisoning — in their 2026 Agentic Top 10. Research from Obsidian Security found that a single compromised agent poisoned 87% of downstream decisions within four hours.

This tutorial builds a two-agent RAG pipeline with cryptographic context attribution using Skytale's SharedContext. The retriever writes results that are cryptographically linked to its DID. The planner verifies authorship before reasoning. A write policy ensures only the retriever can write retrieval results. The whole setup takes about 10 minutes.

What you'll build

Two LangGraph agents communicating over an MLS-encrypted channel with attributed shared context:

  • Retriever agent — searches a vector store, writes results to SharedContext with its DID attached to every entry
  • Planner agent — reads results, verifies the sender DID before reasoning, rejects unsigned or unauthorized writes

Every context write is encrypted with MLS (RFC 9420) and tagged with the sender's cryptographic identity. The relay routes ciphertext — it never sees keys, values, or agent DIDs.

Step 1: Install and set up agents

pip install "skytale-sdk[langgraph]" langchain-openai langchain-community chromadb

from skytale_sdk import SkytaleChannelManager
from skytale_sdk.context import SharedContext
from skytale_sdk.context_types import ContextType, WritePolicy

# Create two agents with distinct identities
retriever_mgr = SkytaleChannelManager(
    identity=b"retriever-agent",
    api_key="sk_live_...",
)
planner_mgr = SkytaleChannelManager(
    identity=b"planner-agent",
    api_key="sk_live_...",
)

# Retriever creates the channel, planner joins
retriever_mgr.create("acme/research/context")
token = retriever_mgr.invite("acme/research/context")
planner_mgr.join_with_token("acme/research/context", token)

Each agent gets a DID derived from its identity bytes. When an agent writes to SharedContext, that DID is cryptographically attached to every entry via the sender_did field.

Step 2: Set up SharedContext with write policies

# Retriever's context — it will write results here
retriever_ctx = SharedContext(
    retriever_mgr, "acme/research/context",
)

# Planner's context — it reads and verifies
planner_ctx = SharedContext(
    planner_mgr, "acme/research/context",
)
planner_ctx.start_receiving()

# Get the retriever's DID for the write policy
# (_did_uri is a private attribute, so it may change between SDK versions)
retriever_did = retriever_mgr._did_uri

# Lock down retrieval keys: only the retriever can write them
retriever_ctx.set_write_policy("retrieval_*", WritePolicy(
    allowed_writers=[retriever_did],
    max_value_size=8192,
    write_rate_limit=30,
))
planner_ctx.set_write_policy("retrieval_*", WritePolicy(
    allowed_writers=[retriever_did],
    max_value_size=8192,
    write_rate_limit=30,
))

Why set the policy on both sides? Write policies are enforced locally — both on set() (outbound) and _apply() (inbound). The retriever's policy prevents unauthorized local writes. The planner's policy rejects unauthorized remote writes. A compromised agent that bypasses its local check still gets rejected by every other agent in the channel.
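To make the two-sided enforcement concrete, here is a minimal, SDK-independent sketch of the checks a write policy performs. KeyPolicy, PolicyViolation, and the per-policy 60-second window are illustrative assumptions, not Skytale's implementation; the same check() would run once before sending and again when a remote entry arrives.

```python
import json
import time
from dataclasses import dataclass, field
from fnmatch import fnmatchcase


class PolicyViolation(Exception):
    """Hypothetical error type; the SDK's own exception may differ."""


@dataclass
class KeyPolicy:
    pattern: str            # glob such as "retrieval_*"
    allowed_writers: list   # DIDs permitted to write matching keys
    max_value_size: int     # limit on the JSON-encoded value, in bytes
    write_rate_limit: int   # accepted writes per 60-second window
    _recent: list = field(default_factory=list)

    def check(self, key, value, writer_did, now=None):
        """Raise PolicyViolation if this write breaks the policy."""
        if not fnmatchcase(key, self.pattern):
            return  # this policy does not cover the key
        if writer_did not in self.allowed_writers:
            raise PolicyViolation(f"{writer_did} may not write {key!r}")
        size = len(json.dumps(value).encode("utf-8"))
        if size > self.max_value_size:
            raise PolicyViolation(f"value is {size} bytes, limit is {self.max_value_size}")
        now = time.monotonic() if now is None else now
        # Keep only timestamps from the last 60 seconds, then count them.
        self._recent = [t for t in self._recent if now - t < 60.0]
        if len(self._recent) >= self.write_rate_limit:
            raise PolicyViolation("write rate limit exceeded")
        self._recent.append(now)


policy = KeyPolicy("retrieval_*", ["did:key:retriever"], 8192, 30)
policy.check("retrieval_q1", {"documents": []}, "did:key:retriever", now=0.0)
try:
    policy.check("retrieval_q1", {"documents": []}, "did:key:attacker", now=1.0)
    rejected = False
except PolicyViolation:
    rejected = True
```

Because the receiving side runs the same check against the sender's DID, a writer that skips its own outbound check gains nothing.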

Step 3: Retriever writes attributed results

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI

# Your existing vector store
vectorstore = Chroma(embedding_function=OpenAIEmbeddings())

def retrieve_and_attribute(query: str) -> dict:
    """Search vector store, write attributed results to SharedContext."""
    docs = vectorstore.similarity_search(query, k=5)

    results = {
        "query": query,
        "documents": [
            {"content": doc.page_content, "source": doc.metadata.get("source", "")}
            for doc in docs
        ],
    }

    # Write to SharedContext — sender_did is attached automatically
    retriever_ctx.set(
        f"retrieval_{query[:32].replace(' ', '_')}",
        results,
        type=ContextType.ARTIFACT,
        ttl_ms=300_000,  # 5 minute TTL
    )
    return results

When retriever_ctx.set() executes, three things happen:

  • The write policy check runs — is this agent in allowed_writers? Is the value under 8 KB? Is the rate under 30/min?
  • The entry is stamped with the retriever's DID (sender_did) and a hybrid logical clock timestamp (hlc)
  • The entry is serialized into an MLS-encrypted STATE envelope and broadcast to the channel
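The hybrid logical clock in the second step can be sketched as follows. This is the textbook HLC update rule, not code taken from the SDK; the (wall_ms, counter, node_id) layout is an assumption based on the three-element hlc array in the envelope shown later, and the class and method names are hypothetical.

```python
import time
from dataclasses import dataclass


@dataclass
class HybridLogicalClock:
    node_id: str
    wall_ms: int = 0
    counter: int = 0

    def tick(self, physical_ms=None):
        """Advance the clock for a local write and return its timestamp."""
        now = int(time.time() * 1000) if physical_ms is None else physical_ms
        if now > self.wall_ms:
            self.wall_ms, self.counter = now, 0
        else:
            self.counter += 1  # wall clock stalled or moved backwards
        return (self.wall_ms, self.counter, self.node_id)

    def observe(self, remote, physical_ms=None):
        """Merge the timestamp carried by an inbound entry."""
        now = int(time.time() * 1000) if physical_ms is None else physical_ms
        r_wall, r_counter, _ = remote
        new_wall = max(self.wall_ms, r_wall, now)
        if new_wall == self.wall_ms == r_wall:
            self.counter = max(self.counter, r_counter) + 1
        elif new_wall == self.wall_ms:
            self.counter += 1
        elif new_wall == r_wall:
            self.counter = r_counter + 1
        else:
            self.counter = 0  # physical time is ahead of both clocks
        self.wall_ms = new_wall
        return (self.wall_ms, self.counter, self.node_id)


clock = HybridLogicalClock("a1b2c3d4")
t1 = clock.tick(physical_ms=1000)
t2 = clock.tick(physical_ms=1000)   # same millisecond: counter increments
t3 = clock.tick(physical_ms=999)    # clock went backwards: still increases
t4 = clock.observe((2000, 5, "ffff"), physical_ms=1500)
```

Timestamps compare as plain tuples, so every stamp a node issues is strictly greater than anything it has already issued or observed, even when the wall clock misbehaves.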

Step 4: Planner reads with verified provenance

import time

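# Run a retrieval, then give the planner's receiver a moment to process the broadcast
retrieve_and_attribute("latest security vulnerabilities in agent frameworks")
time.sleep(2)  # crude fixed wait: fine for a demo, too fragile for production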

# Planner reads with full provenance
entries = planner_ctx.list(type_filter=ContextType.ARTIFACT)

for key in entries:
    entry = planner_ctx.get_entry(key)
    if entry is None:
        continue

    # Verify attribution before reasoning
    if entry.sender_did != retriever_did:
        print(f"REJECTED: {key} written by unknown agent {entry.sender_did}")
        continue

    print(f"Verified: {key}")
    print(f"  Author: {entry.sender_did}")
    print(f"  Clock:  {entry.hlc.wall_ms}")
    print(f"  Docs:   {len(entry.value['documents'])} results")

    # Safe to reason over — provenance verified
    planner_prompt = (
        "Based on these verified research results "
        f"(retrieved by {entry.sender_did} at {entry.hlc.wall_ms}):\n"
        f"{entry.value}"
    )

The planner uses get_entry() instead of get() to access the full ContextEntry — including sender_did, hlc, scope, and ttl_ms. This is the difference between reading a value and reading an attributed value.
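If the planner handles many entries, the attribution check is easier to audit as one function that separates trusted from untrusted entries before any prompt is built. The sketch below uses a stand-in Entry dataclass with only the fields it needs; the real ContextEntry has more fields, and split_by_provenance is a hypothetical helper, not an SDK function.

```python
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass(frozen=True)
class Entry:
    """Stand-in for the ContextEntry fields used here (assumed shape)."""
    key: str
    value: Any
    sender_did: str


def split_by_provenance(entries: Iterable[Entry], trusted: set):
    """Return (verified, rejected) so untrusted entries never reach the prompt."""
    verified, rejected = [], []
    for entry in entries:
        (verified if entry.sender_did in trusted else rejected).append(entry)
    return verified, rejected


entries = [
    Entry("retrieval_q1", {"documents": []}, "did:key:retriever"),
    Entry("retrieval_fake", {"documents": []}, "did:key:attacker"),
]
verified, rejected = split_by_provenance(entries, {"did:key:retriever"})
print([e.key for e in verified])   # ['retrieval_q1']
print([e.key for e in rejected])   # ['retrieval_fake']
```

Returning the rejected list as well makes it straightforward to log or alert on entries from unexpected authors.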

What happens when it's attacked

Suppose a compromised agent joins the channel and tries to poison the retrieval results:

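# Attacker joins the channel and tries to write fake results.
# attacker_mgr is a third SkytaleChannelManager that has joined the channel (setup omitted);
# WritePolicyViolation is the SDK's policy exception (import omitted).
attacker_ctx = SharedContext(attacker_mgr, "acme/research/context")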

try:
    attacker_ctx.set("retrieval_fake", {"documents": [{"content": "malicious data"}]})
except WritePolicyViolation as e:
    print(e)
    # "agent did:key:z6MkAttacker... is not allowed to write to key
    #  'retrieval_fake' (allowed: ['did:key:z6MkRetriever...'])"

# Even if the attacker bypasses their local check (modified SDK),
# the planner's _apply() enforces the same policy and silently
# drops the unauthorized write.

Three layers of defense:

Layer              | What it does                                              | OWASP
-------------------|-----------------------------------------------------------|------
Write policy       | Only the retriever's DID can write retrieval_* keys       | ASI06
Sender attribution | Every entry is tagged with the writer's cryptographic DID | ASI06
MLS encryption     | The relay never sees context keys, values, or DIDs        | ASI07

Late-joining agents

When a third agent joins the channel later — say, a summarizer — it needs the current context without replaying the full message history. SharedContext handles this with snapshot-on-join:

# A summarizer agent joins later
summarizer_mgr = SkytaleChannelManager(
    identity=b"summarizer-agent",
    api_key="sk_live_...",
)
token = retriever_mgr.invite("acme/research/context")
summarizer_mgr.join_with_token("acme/research/context", token)

summarizer_ctx = SharedContext(
    summarizer_mgr, "acme/research/context",
)
summarizer_ctx.start_receiving()

# Request current state from existing members
summarizer_ctx.request_snapshot()
time.sleep(1)

# Summarizer now has the full context
snap = summarizer_ctx.snapshot()
print(f"Received {len(snap)} context entries")

The snapshot is sent through the same MLS-encrypted channel. The summarizer receives the current state without any member needing to re-send individual entries.
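Conceptually, a snapshot is the responder's current key-to-entry map rather than a replay of every write. A minimal sketch of how a responder might build one is below; dropping expired entries by created_at + ttl_ms is an assumption about reasonable behavior, not documented SDK behavior, and build_snapshot is a hypothetical name.

```python
def build_snapshot(store, now_ms):
    """Return the entries that are still live as of now_ms.

    store maps key -> dict with "value", "created_at" (ms) and "ttl_ms"
    (ms, or None for entries that never expire).
    """
    live = {}
    for key, entry in store.items():
        ttl = entry.get("ttl_ms")
        if ttl is not None and entry["created_at"] + ttl <= now_ms:
            continue  # expired: do not hand stale context to a new member
        live[key] = entry
    return live


store = {
    "fresh":   {"value": 1, "created_at": 1000, "ttl_ms": 500},
    "stale":   {"value": 2, "created_at": 0,    "ttl_ms": 500},
    "forever": {"value": 3, "created_at": 0,    "ttl_ms": None},
}
snap = build_snapshot(store, now_ms=1200)
print(sorted(snap))  # ['forever', 'fresh']
```

The late joiner should still apply its own write policy and attribution checks to each snapshot entry, since the snapshot arrives from another member.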

Circuit breaker

If an agent starts flooding context with writes — whether compromised or buggy — the circuit breaker stops the cascade:

# Auto-freeze if total writes exceed 100/minute
retriever_ctx.set_freeze_threshold(100)
planner_ctx.set_freeze_threshold(100)

# When the threshold is hit, context becomes read-only
# All writes raise ContextFrozenError until unfreeze() is called

This addresses OWASP ASI08 — cascading failures from overwhelmed components. The freeze is local to each agent's context instance, so the circuit breaker activates independently on each participant.
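The freeze behavior can be pictured as a sliding-window counter that latches once the threshold is crossed. The sketch below is an illustration under that assumption; WriteCircuitBreaker and FrozenError are hypothetical names, and the SDK's actual windowing may differ.

```python
import time
from collections import deque


class FrozenError(Exception):
    """Hypothetical stand-in for the SDK's ContextFrozenError."""


class WriteCircuitBreaker:
    def __init__(self, max_writes_per_minute):
        self.limit = max_writes_per_minute
        self.frozen = False
        self._stamps = deque()  # timestamps of writes in the last 60 s

    def record_write(self, now=None):
        """Count one write; freeze and raise once the limit is exceeded."""
        if self.frozen:
            raise FrozenError("context is read-only until unfreeze()")
        now = time.monotonic() if now is None else now
        while self._stamps and now - self._stamps[0] >= 60.0:
            self._stamps.popleft()
        self._stamps.append(now)
        if len(self._stamps) > self.limit:
            self.frozen = True
            raise FrozenError("write threshold exceeded; context frozen")

    def unfreeze(self):
        """Manual reset, mirroring the explicit unfreeze() step."""
        self.frozen = False
        self._stamps.clear()


breaker = WriteCircuitBreaker(max_writes_per_minute=3)
for t in (0.0, 1.0, 2.0):
    breaker.record_write(now=t)      # within the limit
try:
    breaker.record_write(now=3.0)    # fourth write in the window trips it
except FrozenError:
    pass
print(breaker.frozen)  # True
```

The breaker stays frozen even after the window has passed, so recovery is a deliberate operator action rather than an automatic retry.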

Under the hood

Every ctx.set() call produces a STATE envelope:

{
  "op": "set",
  "key": "retrieval_latest_security...",
  "value": {"query": "...", "documents": [...]},
  "type": "artifact",
  "scope": {"access": "all"},
  "hlc": [1710345600000, 0, "a1b2c3d4"],
  "sender_did": "did:key:z6MkRetriever...",
  "ttl_ms": 300000,
  "created_at": 1710345600000
}

This JSON payload is encrypted with AES-128-GCM under MLS group keys before leaving the SDK. The relay routes only the ciphertext. Receiving agents decrypt the envelope, check the entry against the write policy, and apply it using last-writer-wins (LWW) conflict resolution ordered by the hybrid logical clock.
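Last-writer-wins over HLC timestamps reduces to a tuple comparison. Here is a minimal sketch, assuming the hlc array is ordered as (wall_ms, counter, node_id) and that the node id breaks ties; lww_apply is a hypothetical helper, not the SDK's _apply().

```python
def lww_apply(store, key, value, hlc):
    """Apply an inbound write only if its clock is newer than the stored one.

    Returns True if the store changed, False if the write was stale.
    """
    current = store.get(key)
    if current is not None and tuple(current["hlc"]) >= tuple(hlc):
        return False
    store[key] = {"value": value, "hlc": tuple(hlc)}
    return True


# Two replicas receive the same writes in opposite order.
w1 = ("retrieval_q1", "old", (1710345600000, 0, "a1b2c3d4"))
w2 = ("retrieval_q1", "new", (1710345600000, 1, "e5f6a7b8"))

replica_a, replica_b = {}, {}
for write in (w1, w2):
    lww_apply(replica_a, *write)
for write in (w2, w1):
    lww_apply(replica_b, *write)

print(replica_a == replica_b)                 # True
print(replica_a["retrieval_q1"]["value"])     # new
```

Because the comparison is a total order, replicas converge on the same value regardless of delivery order.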

What this gives you

Question                        | Without Skytale              | With SharedContext
--------------------------------|------------------------------|-------------------------------
Who wrote this result?          | Unknown                      | Cryptographic DID (sender_did)
Was it modified in transit?     | No way to tell               | MLS integrity (AES-128-GCM)
When was it written?            | Local timestamp (unreliable) | Hybrid logical clock (HLC)
Is it still valid?              | No expiration                | TTL with automatic expiry
Can a rogue agent overwrite it? | Yes                          | Write policy enforcement
Can the relay read it?          | Yes (plaintext)              | No (MLS E2E encryption)

Getting started

pip install skytale-sdk[langgraph]