Persistence & Multi-tenancy (Blackboard)¶
The blackboard is a tenant-scoped key/value store that persists values across tasks within a pipeline and across sub-pipelines within a plan. Tasks write to it via the store tool and read from it via {{session.key}} template references.
Concepts¶
| Term | Meaning |
|---|---|
| Blackboard | The persistence layer — an interface with read_many, write, and get_all methods |
| Session | The slice of the blackboard visible to a single pipeline execution — a dict of key → value |
| Tenant | A named namespace that isolates one user or workspace from another |
| store task | The pipeline tool that writes a value to the blackboard under a given key |
| {{session.key}} | Template expression that reads a value from the session dict |
The Blackboard interface¶
```python
class Blackboard:
    def get_all(self, tenant_id: str) -> dict[str, Any]: ...
    def read_many(self, tenant_id: str, keys: Iterable[str]) -> dict[str, Any]: ...
    def write(self, tenant_id: str, key: str, value: Any, *, append: bool = False) -> None: ...
```
| Method | Description |
|---|---|
| get_all(tenant_id) | Return every key/value pair for the tenant — used by run_plan to expose the full session to each sub-pipeline |
| read_many(tenant_id, keys) | Return a subset of keys — used when a pipeline declares a reads list |
| write(tenant_id, key, value, append=False) | Write a value; if append=True, adds to a list (creating one if the key doesn't exist) |
The interface is intentionally narrow. A future Redis, Postgres, or Prefect Blocks implementation needs only these three methods.
InMemoryBlackboard¶
The default implementation. Data lives in a plain Python dict and is lost when the process exits:
```python
# Data layout
{
    "default": {"filing_text": "...", "extracted_fields": {...}},
    "acme-corp": {"filing_text": "...", "extracted_fields": {...}},
}
```
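The three-method contract is small enough to sketch in a few lines. The following is a hypothetical dict-backed implementation that mirrors the data layout above — illustrative only, not the library's actual source (the class name `DictBlackboard` is invented to avoid confusion with the real `InMemoryBlackboard`):

```python
from typing import Any, Iterable


class DictBlackboard:
    """Hypothetical dict-backed blackboard mirroring the three-method contract."""

    def __init__(self) -> None:
        # tenant_id -> {key: value}, matching the data layout shown above
        self._data: dict[str, dict[str, Any]] = {}

    def get_all(self, tenant_id: str) -> dict[str, Any]:
        # Return a copy so callers can't mutate tenant state in place
        return dict(self._data.get(tenant_id, {}))

    def read_many(self, tenant_id: str, keys: Iterable[str]) -> dict[str, Any]:
        tenant = self._data.get(tenant_id, {})
        return {k: tenant[k] for k in keys if k in tenant}

    def write(self, tenant_id: str, key: str, value: Any, *, append: bool = False) -> None:
        tenant = self._data.setdefault(tenant_id, {})
        if append:
            # Create the list on first append, then accumulate
            tenant.setdefault(key, []).append(value)
        else:
            tenant[key] = value
```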
Each Orchestrator instance owns one InMemoryBlackboard by default. For the in-memory run queue, each background run creates its own Orchestrator, so runs are fully isolated from each other.
Tenant isolation¶
Every Orchestrator is constructed with a tenant_id (default: "default"). All blackboard reads and writes are keyed by that tenant ID — a write to key "filing_text" for tenant "acme-corp" is completely invisible to tenant "beta-corp".
```python
from trellis.execution.orchestrator import Orchestrator
from trellis.execution.blackboard import InMemoryBlackboard

# Shared blackboard, two tenants
bb = InMemoryBlackboard()
orch_a = Orchestrator(blackboard=bb, tenant_id="acme-corp")
orch_b = Orchestrator(blackboard=bb, tenant_id="beta-corp")

# orch_a and orch_b share the same InMemoryBlackboard object
# but write/read from separate namespaces
```
For the REST API's queued runs, tenant_id is passed in the POST /pipelines/run_async request body. Each queued run creates its own Orchestrator with that tenant ID.
Writing with the store tool¶
The store tool is how pipelines write to the blackboard:
```yaml
- id: cache_filing
  tool: store
  inputs:
    key: filing_text
    value: "{{ingest.output}}"

- id: cache_fields
  tool: store
  inputs:
    key: extracted_fields
    value: "{{extract.output}}"
    append: false  # default — overwrites any existing value
```
Append mode — accumulates results into a list. Useful when a parallel_over fan-out runs store once per item:
```yaml
- id: store_each_result
  tool: store
  parallel_over: "{{fetch.output.results}}"
  inputs:
    key: all_filings
    value: "{{item}}"
    append: true  # each item is appended to the list at "all_filings"
```
store echoes value as its task output so downstream tasks can still reference {{cache_filing.output}} normally.
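The echo behavior can be pictured with a small hypothetical handler — `store_tool` and its signature are illustrative, not the library's internals: write into the tenant's namespace, then return `value` unchanged as the task output.

```python
from typing import Any


def store_tool(blackboard: dict, tenant_id: str, *, key: str, value: Any,
               append: bool = False) -> Any:
    """Hypothetical store-style handler: write to the blackboard, then echo
    the value so downstream templates like {{cache_filing.output}} resolve."""
    tenant = blackboard.setdefault(tenant_id, {})
    if append:
        tenant.setdefault(key, []).append(value)
    else:
        tenant[key] = value
    return value  # echoed as the task's output
```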
store task fields¶
| Input | Type | Default | Description |
|---|---|---|---|
| key | string | required | Blackboard key to write |
| value | any | required | Value to store; any pipeline-serializable type |
| append | bool | false | Append to list rather than overwrite |
Reading with {{session.key}}¶
Values in the session are available to all tasks in the same pipeline via {{session.key}}:
```yaml
- id: summarize
  tool: llm_job
  inputs:
    context: "{{session.extracted_fields}}"
    prompt: "Summarize the extracted fields."
  await:
    - cache_fields  # ensure cache_fields ran before summarize
```
The await barrier here is essential — summarize has no template reference to cache_fields.output, so without await the executor might schedule them in the same wave before the session value is written.
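One way to see why: if implicit dependencies are inferred only from `{{task.output}}` references (an assumption about the scheduler, sketched here with a hypothetical `implicit_deps` helper), then a `{{session.key}}` read creates no edge at all, and the explicit `await` entry is what supplies it.

```python
import re

# Hypothetical sketch: implicit edges come only from {{task.output}}
# references, so {{session.key}} reads contribute nothing.
TASK_REF = re.compile(r"\{\{(\w+)\.output")


def implicit_deps(task: dict) -> set[str]:
    refs: set[str] = set()
    for v in task.get("inputs", {}).values():
        refs.update(TASK_REF.findall(str(v)))
    return refs


def all_deps(task: dict) -> set[str]:
    # The await list adds explicit edges on top of the inferred ones
    return implicit_deps(task) | set(task.get("await", []))


summarize = {
    "id": "summarize",
    "inputs": {"context": "{{session.extracted_fields}}"},
    "await": ["cache_fields"],
}
```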
The session dict passed to a pipeline at construction time seeds these values before any task runs. Pre-seeded values are available immediately without any store task:
```python
result = await orch.run_pipeline(
    pipeline,
    session={"auth_token": "...", "company_cik": "0000320193"},
)
```
Session flow in plans¶
When running a plan (Orchestrator.run_plan), the blackboard is the communication channel between sub-pipelines. After each sub-pipeline completes, the full blackboard is exposed as the session for the next sub-pipeline:
```
Plan execution flow
───────────────────
Wave 1: fetch_10k    ──→ store: filing_text
Wave 1: fetch_schema ──→ store: schema_handle
Wave 2: spread       ←── session.filing_text   (written by fetch_10k)
                     ←── session.schema_handle (written by fetch_schema)
```
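The session-threading behavior can be sketched as a loop, under the simplifying assumption (hypothetical, for illustration) that each sub-pipeline is a function from its session to the keys it stores:

```python
from typing import Any, Callable

SubPipeline = Callable[[dict[str, Any]], dict[str, Any]]


def run_plan_sketch(blackboard: dict[str, Any],
                    waves: list[list[SubPipeline]]) -> dict[str, Any]:
    """Hypothetical sketch: each sub-pipeline sees the full blackboard as
    its session, and its stored keys become visible to later waves."""
    for wave in waves:
        for sub_pipeline in wave:
            session = dict(blackboard)                 # get_all() snapshot
            blackboard.update(sub_pipeline(session))   # store writes
    return blackboard


waves = [
    [lambda s: {"filing_text": "10-K ..."},
     lambda s: {"schema_handle": "schema-v1"}],
    [lambda s: {"spread": f"{s['filing_text']} + {s['schema_handle']}"}],
]
```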
The plan YAML declares which keys each sub-pipeline produces (stores) and consumes (reads). The executor uses these declarations to:
- Infer sub-pipeline ordering — a sub-pipeline that `reads` a key must run after the one that `stores` it.
- Validate contracts — `trellis validate plan.yaml` checks that every `reads` key is satisfied and that no two sub-pipelines write the same key.
```yaml
plan:
  id: spreading_plan
  sub_pipelines:
    - id: fetch_10k
      stores: [filing_text]
    - id: fetch_schema
      stores: [schema_handle]
    - id: spread
      reads: [filing_text, schema_handle]  # ← depends on both above
```
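The two contract checks could look like this hypothetical `validate_plan` helper (the real `trellis validate` logic is not shown here; this only illustrates the rules stated above):

```python
def validate_plan(sub_pipelines: list[dict]) -> list[str]:
    """Return a list of contract errors: duplicate stores and unsatisfied reads."""
    errors: list[str] = []
    stored: dict[str, str] = {}  # key -> id of the sub-pipeline that stores it

    # Rule 1: no two sub-pipelines may write the same key
    for sp in sub_pipelines:
        for key in sp.get("stores", []):
            if key in stored:
                errors.append(
                    f"key '{key}' stored by both '{stored[key]}' and '{sp['id']}'")
            stored[key] = sp["id"]

    # Rule 2: every reads key must be stored by some sub-pipeline
    for sp in sub_pipelines:
        for key in sp.get("reads", []):
            if key not in stored:
                errors.append(
                    f"'{sp['id']}' reads '{key}' but no sub-pipeline stores it")
    return errors
```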
Swapping the blackboard backend¶
Pass a custom Blackboard implementation to Orchestrator to replace InMemoryBlackboard. The contract is three methods — nothing else needs to change:
```python
import json

import redis

from trellis.execution.blackboard import Blackboard


class RedisBlackboard(Blackboard):
    def __init__(self, url: str) -> None:
        self._r = redis.from_url(url)

    def get_all(self, tenant_id: str) -> dict:
        keys = self._r.hkeys(tenant_id)
        if not keys:
            return {}
        return {k.decode(): json.loads(v)
                for k, v in zip(keys, self._r.hmget(tenant_id, keys))}

    def read_many(self, tenant_id: str, keys) -> dict:
        keys = list(keys)  # materialize so we can both fetch and zip
        vals = self._r.hmget(tenant_id, keys)
        return {k: json.loads(v) for k, v in zip(keys, vals) if v is not None}

    def write(self, tenant_id: str, key: str, value, *, append: bool = False) -> None:
        if append:
            existing = self._r.hget(tenant_id, key)
            lst = json.loads(existing) if existing else []
            lst.append(value)
            self._r.hset(tenant_id, key, json.dumps(lst))
        else:
            self._r.hset(tenant_id, key, json.dumps(value))


bb = RedisBlackboard("redis://localhost:6379")
orch = Orchestrator(blackboard=bb, tenant_id="acme-corp")
```
Multi-tenancy in the REST API¶
Pass tenant_id in the POST /pipelines/run_async request body to isolate runs by workspace:
```bash
curl -X POST http://localhost:8000/pipelines/run_async \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline": { ... },
    "tenant_id": "acme-corp"
  }'
```
Each queued run creates its own Orchestrator with the specified tenant_id. All store writes from that run are namespaced under "acme-corp" and invisible to other tenants.
For the synchronous endpoint (POST /pipelines/run), the orchestrator always uses the "default" tenant and a fresh InMemoryBlackboard, so there is no cross-run bleed.
Inspecting blackboard state¶
After a plan run, PlanRunResult.blackboard contains the full final blackboard for the tenant:
```python
plan_result = await orch.run_plan(plan, plan_dir)
print(plan_result.blackboard)
# {"filing_text": {...}, "schema_handle": {...}, "extracted_fields": {...}}
```
The same data is available from the CLI with --json.
Next steps¶
- Execution Backends & Run Queue — executor options, timeouts, retries, and the background queue
- Pipeline DSL Reference — `store` task syntax, `await` barriers, `{{session.key}}` templates
- Tools & Registry — `store` tool inputs reference