Pipelines & Stages

Pipelines are the “middleware” of the nara runtime. Every message emitted or received passes through a sequence of stages that can transform, validate, store, or reject it.


Pipelines centralize cross-cutting concerns so services remain focused on business logic:

  • Signing: Messages are signed automatically based on behavior config
  • Storage: Ledger persistence handled uniformly
  • Verification: Signature validation happens before handlers see messages
  • Deduplication: Duplicate messages filtered without service involvement
  • Filtering: Personality-based filtering applied consistently

Four primitives make up the pipeline system:

| Primitive | Role |
| --- | --- |
| Stage | A single unit of processing that returns an explicit result |
| Pipeline | An ordered list of stages executed sequentially |
| StageResult | The outcome: Continue, Drop, or Fail |
| PipelineContext | Shared dependencies (ledger, transport, keypair) |

The runtime enforces four invariants:

  1. Every emitted message MUST pass through the emit pipeline before network transport.
  2. Every received message MUST pass through the receive pipeline before handler invocation.
  3. Message IDs include nanosecond timestamps, so the same content at different times produces different IDs.
  4. Critical messages (Importance 3) MUST NOT be filtered by personality stages.

Every stage returns a StageResult that explicitly communicates what happened. No silent failures.

```mermaid
flowchart LR
    Stage["Stage.Process()"]
    Stage --> Continue["Continue(msg)"]
    Stage --> Drop["Drop(reason)"]
    Stage --> Fail["Fail(error)"]

    Continue --> |"Message proceeds"| Next["Next Stage"]
    Drop --> |"Message discarded"| Done["Pipeline stops"]
    Fail --> |"Error propagates"| Error["ErrorStrategy applied"]
```

| Result | When to use | Example |
| --- | --- | --- |
| Continue(msg) | Stage succeeded, message should proceed | Signature verified |
| Drop(reason) | Message intentionally rejected | "duplicate_id", "rate_limited" |
| Fail(error) | Something went wrong | Transport unreachable |

Old designs used next() callbacks that could be forgotten. The explicit return ensures:

  • Can’t forget: You must return something
  • Clear error path: Errors are returned, not swallowed
  • Debuggable: The Reason field explains why a message was dropped
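The contract above can be sketched in Go. The names `Stage`, `StageResult`, `Continue`, `Drop`, and `Fail` follow this document, but the exact struct layout and the toy `seenIDs` stage are illustrative assumptions:

```go
// Minimal sketch of the Stage contract: every outcome is an explicit
// return value, so there is no next() callback to forget.
package main

import (
	"errors"
	"fmt"
)

type Message struct {
	ID      string
	Payload []byte
}

// StageResult is assumed here to carry exactly one of the three outcomes.
type StageResult struct {
	Message *Message // set on Continue
	Reason  string   // set on Drop; explains why the message was rejected
	Err     error    // set on Fail
}

func Continue(m *Message) StageResult { return StageResult{Message: m} }
func Drop(reason string) StageResult  { return StageResult{Reason: reason} }
func Fail(err error) StageResult      { return StageResult{Err: err} }

// Stage is a single unit of processing with an explicit outcome.
type Stage interface {
	Process(m *Message) StageResult
}

// seenIDs is a toy dedupe stage: it drops any message whose ID it has
// already processed, and fails if the message has no ID at all.
type seenIDs map[string]bool

func (s seenIDs) Process(m *Message) StageResult {
	if m.ID == "" {
		return Fail(errors.New("message missing ID"))
	}
	if s[m.ID] {
		return Drop("duplicate_id")
	}
	s[m.ID] = true
	return Continue(m)
}

func main() {
	stage := seenIDs{}
	msg := &Message{ID: "abc123"}
	fmt.Println(stage.Process(msg).Reason) // first pass: empty (Continue)
	fmt.Println(stage.Process(msg).Reason) // second pass: "duplicate_id"
}
```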

The pipeline runs stages sequentially. It stops on the first Drop or Fail.

```mermaid
flowchart TD
    Start([Pipeline.Run]) --> Loop{More stages?}
    Loop -->|Yes| Process["stage.Process(msg, ctx)"]
    Process --> Check{Result type?}

    Check -->|Continue| Update["msg = result.Message"]
    Update --> Loop

    Check -->|Drop| ReturnDrop["Return Drop(reason)"]
    Check -->|Fail| ReturnFail["Return Fail(error)"]

    Loop -->|No| ReturnOK["Return Continue(msg)"]

    style ReturnDrop fill:#f96,stroke:#333
    style ReturnFail fill:#f66,stroke:#333
    style ReturnOK fill:#6f6,stroke:#333
```

  1. Sequential: Stages run in order, not in parallel
  2. Early exit: First Drop or Fail stops the pipeline
  3. Message threading: Each Continue passes the (possibly modified) message to the next stage
  4. Final result: If all stages continue, the pipeline returns Continue(finalMsg)
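The four properties above fit in a short run loop. This is a sketch under assumed type shapes (here `Stage` is a function type for brevity), not the runtime's actual implementation:

```go
// Sketch of the pipeline run loop: sequential execution, early exit on
// the first Drop or Fail, and message threading through each Continue.
package main

import "fmt"

type Message struct{ ID string }

type StageResult struct {
	Message *Message // set on Continue
	Reason  string   // set on Drop
	Err     error    // set on Fail
}

// Stage is modeled as a plain function here for brevity.
type Stage func(*Message) StageResult

type Pipeline struct{ stages []Stage }

func (p *Pipeline) Run(m *Message) StageResult {
	for _, stage := range p.stages {
		res := stage(m)
		if res.Reason != "" || res.Err != nil {
			return res // early exit: first Drop or Fail stops the pipeline
		}
		m = res.Message // thread the (possibly modified) message onward
	}
	return StageResult{Message: m} // all stages continued
}

func main() {
	pass := func(m *Message) StageResult { return StageResult{Message: m} }
	drop := func(m *Message) StageResult { return StageResult{Reason: "rate_limited"} }
	p := &Pipeline{stages: []Stage{pass, drop, pass}}
	fmt.Println(p.Run(&Message{ID: "m1"}).Reason) // "rate_limited"; third stage never runs
}
```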

When a service calls rt.Emit(msg), the message flows through the emit pipeline:

```mermaid
flowchart LR
    subgraph Emit["Emit Pipeline"]
        direction LR
        ID["IDStage<br/>compute ID"] --> CK["ContentKeyStage<br/>(optional)"]
        CK --> Sign["SignStage<br/>sign message"]
        Sign --> Store["StoreStage<br/>persist to ledger"]
        Store --> Gossip["GossipStage<br/>queue for gossip"]
        Gossip --> Transport["TransportStage<br/>MQTT or Mesh"]
        Transport --> Notify["NotifyStage<br/>notify subscribers"]
    end

    Service([Service]) --> ID
    Notify --> Done([Done])
```

| Stage | Purpose | Default |
| --- | --- | --- |
| IDStage | Compute unique envelope ID | Always runs |
| ContentKeyStage | Compute semantic dedup key | Only if behavior defines ContentKey |
| SignStage | Sign with keypair | DefaultSign() |
| StoreStage | Persist to ledger | NoStore() |
| GossipStage | Add to gossip queue | NoGossip() |
| TransportStage | Send over network | Behavior-defined |
| NotifyStage | Notify local subscribers | Always runs |
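One detail worth noting is that ContentKeyStage is conditional while the other emit stages always appear. A sketch of that assembly, with an assumed `Behavior` shape (the real runtime builds stage values, not names):

```go
// Sketch of emit-pipeline assembly: IDStage first, NotifyStage last,
// ContentKeyStage only when the behavior defines a ContentKey.
// The Behavior struct here is an assumption for illustration.
package main

import "fmt"

type Behavior struct {
	ContentKey func([]byte) string // nil unless the behavior defines one
}

// emitStages returns the ordered stage names for a behavior.
func emitStages(b Behavior) []string {
	stages := []string{"IDStage"}
	if b.ContentKey != nil {
		stages = append(stages, "ContentKeyStage")
	}
	return append(stages,
		"SignStage", "StoreStage", "GossipStage", "TransportStage", "NotifyStage")
}

func main() {
	plain := Behavior{}
	keyed := Behavior{ContentKey: func(p []byte) string { return string(p) }}
	fmt.Println(len(emitStages(plain))) // 6 stages
	fmt.Println(len(emitStages(keyed))) // 7 stages: ContentKeyStage included
}
```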

When bytes arrive from the network, rt.Receive(raw) processes them:

```mermaid
flowchart LR
    subgraph Receive["Receive Pipeline"]
        direction LR
        Verify["VerifyStage<br/>check signature"] --> Dedupe["DedupeStage<br/>reject duplicates"]
        Dedupe --> Rate["RateLimitStage<br/>(optional)"]
        Rate --> Filter["FilterStage<br/>(optional)"]
        Filter --> Store["StoreStage<br/>persist"]
        Store --> Gossip["GossipStage<br/>spread to peers"]
        Gossip --> Notify["NotifyStage<br/>notify subscribers"]
    end

    Network([Network]) --> Verify
    Notify --> Handler([Handler])
```

| Stage | Purpose | Default |
| --- | --- | --- |
| VerifyStage | Validate signature | DefaultVerify() |
| DedupeStage | Reject seen messages | IDDedupe() |
| RateLimitStage | Throttle by key | Optional |
| FilterStage | Personality filtering | Optional |
| StoreStage | Persist to ledger | Behavior-defined |
| GossipStage | Spread to peers | If behavior enables |
| NotifyStage | Trigger handlers | Always runs |

A stash:store message demonstrates the pipeline in action:

```mermaid
sequenceDiagram
    participant S as StashService
    participant R as Runtime
    participant P as Pipeline
    participant M as Mesh

    S->>R: Emit(stash:store to Bob)
    R->>R: Lookup behavior
    Note over R: MeshRequest template:<br/>Sign=Default, Store=No,<br/>Transport=MeshOnly

    R->>P: Run emit pipeline

    P->>P: IDStage → Continue
    Note right of P: ID = "abc123..."

    P->>P: SignStage → Continue
    Note right of P: Signature attached

    P->>P: NoStoreStage → Continue
    Note right of P: Not persisted

    P->>P: NoGossipStage → Continue
    Note right of P: Not gossiped

    P->>P: MeshOnlyStage
    P->>M: POST /mesh/message
    M-->>P: 200 OK
    P->>P: → Continue

    P->>P: NotifyStage → Continue

    P-->>R: Continue(msg)
    R-->>S: nil (success)
```

If the mesh peer is unreachable, the same emit fails and the error propagates back to the service:

```mermaid
sequenceDiagram
    participant S as StashService
    participant R as Runtime
    participant P as Pipeline
    participant M as Mesh

    S->>R: Emit(stash:store to Bob)
    R->>P: Run emit pipeline

    P->>P: IDStage → Continue
    P->>P: SignStage → Continue
    P->>P: NoStoreStage → Continue
    P->>P: NoGossipStage → Continue

    P->>P: MeshOnlyStage
    P->>M: POST /mesh/message
    M-->>P: Connection refused
    P->>P: → Fail(error)

    P-->>R: Fail("mesh send to Bob: connection refused")
    R->>R: Apply ErrorStrategy
    R-->>S: error
```

The DSL provides helpers for each stage family:

Signing:

| Stage | DSL Helper | Behavior |
| --- | --- | --- |
| DefaultSignStage | DefaultSign() | Signs with runtime keypair |
| NoSignStage | NoSign() | Skips signing (signature in payload) |

Storage:

| Stage | DSL Helper | Behavior |
| --- | --- | --- |
| DefaultStoreStage | DefaultStore(priority) | Persists with GC priority (0=never prune, 4=expendable) |
| ContentKeyStoreStage | ContentKeyStore(priority) | Dedupes by ContentKey before storing |
| NoStoreStage | NoStore() | Skips storage (ephemeral) |

Transport:

| Stage | DSL Helper | Behavior |
| --- | --- | --- |
| MQTTStage | MQTT(topic) | Broadcast to fixed topic |
| MQTTPerNaraStage | MQTTPerNara(pattern) | Broadcast to pattern % sender |
| MeshOnlyStage | MeshOnly() | Direct HTTP to ToID |
| NoTransportStage | NoTransport() | Local only |

Verification:

| Stage | DSL Helper | Behavior |
| --- | --- | --- |
| DefaultVerifyStage | DefaultVerify() | Lookup pubkey by FromID |
| SelfAttestingVerifyStage | SelfAttesting(extractKey) | Extract pubkey from payload |
| CustomVerifyStage | CustomVerify(fn) | Custom verification logic |
| NoVerifyStage | NoVerify() | Skip verification |

Deduplication:

| Stage | DSL Helper | Behavior |
| --- | --- | --- |
| IDDedupeStage | IDDedupe() | Reject if ID seen before |
| ContentKeyDedupeStage | ContentKeyDedupe() | Reject if ContentKey seen |

Filtering:

| Stage | DSL Helper | Behavior |
| --- | --- | --- |
| ImportanceFilterStage | Critical() | Never filter (importance 3) |
| ImportanceFilterStage | Normal() | Filter if Chill > 85 |
| ImportanceFilterStage | Casual(fn) | Custom filter function |
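The helper names above come from this document, but how a behavior composes them is sketched here with toy stand-ins: each helper returns a stage name string, and the `Behavior` struct fields are assumptions. The shape mirrors the MeshRequest template from the stash:store walkthrough.

```go
// Toy stand-ins for the DSL helpers; the real helpers return stage
// values, not strings. Only the composition pattern is the point.
package main

import "fmt"

func DefaultSign() string { return "DefaultSignStage" }
func NoStore() string     { return "NoStoreStage" }
func NoGossip() string    { return "NoGossipStage" }
func MeshOnly() string    { return "MeshOnlyStage" }

// Behavior is an assumed shape for a behavior template.
type Behavior struct {
	Sign, Store, Gossip, Transport string
}

func main() {
	// Mirrors the MeshRequest template: sign by default, skip storage
	// and gossip, send directly to the recipient over mesh.
	meshRequest := Behavior{
		Sign:      DefaultSign(),
		Store:     NoStore(),
		Gossip:    NoGossip(),
		Transport: MeshOnly(),
	}
	fmt.Println(meshRequest.Transport) // MeshOnlyStage
}
```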

When a stage fails, the behavior’s OnError strategy determines what happens:

| Strategy | Behavior |
| --- | --- |
| ErrorDrop | Silent drop |
| ErrorLog | Log warning and drop |
| ErrorRetry | Retry with backoff (not yet implemented) |
| ErrorQueue | Dead letter queue (not yet implemented) |
| ErrorPanic | Fatal error (critical messages) |

Common failure modes and how each stage reports them:

| Failure | Stage | Result |
| --- | --- | --- |
| Unknown sender | DefaultVerifyStage | Drop("unknown_sender") |
| Bad signature | DefaultVerifyStage | Drop("invalid_signature") |
| Duplicate ID | IDDedupeStage | Drop("duplicate_id") |
| Duplicate fact | ContentKeyDedupeStage | Drop("duplicate_content") |
| Rate exceeded | RateLimitStage | Drop("rate_limited") |
| Too chill | ImportanceFilterStage | Drop("filtered_by_chill") |
| Mesh unreachable | MeshOnlyStage | Fail(error) |
| MQTT publish failed | MQTTStage | Fail(error) |
| Ledger full | DefaultStoreStage | Fail(error) |

  • Given stages [A, B, C] where all return Continue, the result MUST be Continue with the final message.
  • Given stages [A, B, C] where B returns Drop("reason"), the result MUST be Drop("reason") and C MUST NOT execute.
  • Given stages [A, B, C] where B returns Fail(err), the result MUST be Fail(err) and C MUST NOT execute.
  • Given Personality.Chill = 90 and Importance = 2, the stage MUST return Drop("filtered_by_chill").
  • Given Personality.Chill = 90 and Importance = 3, the stage MUST return Continue.
  • Given Importance = 1 with a CasualFilter that returns false, the stage MUST return Drop("filtered_by_personality").
  • Two messages with identical Kind, FromID, and Payload but different nanosecond timestamps MUST produce different ID values.
  • IDDedupeStage: If Ledger.HasID(msg.ID) returns true, the stage MUST return Drop("duplicate_id").
  • ContentKeyDedupeStage: If msg.ContentKey != "" and Ledger.HasContentKey(msg.ContentKey) returns true, the stage MUST return Drop("duplicate_content").
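The two dedupe contracts above can be sketched directly against the `Ledger` methods they name (`HasID`, `HasContentKey`); the surrounding types and the in-memory ledger are illustrative assumptions:

```go
// Sketch of the IDDedupe and ContentKeyDedupe contracts.
package main

import "fmt"

type Message struct {
	ID         string
	ContentKey string
}

// Ledger exposes the two lookups the contracts reference.
type Ledger interface {
	HasID(id string) bool
	HasContentKey(key string) bool
}

type StageResult struct {
	Message *Message // set on Continue
	Reason  string   // set on Drop
}

// IDDedupe drops any message whose ID the ledger has already seen.
func IDDedupe(l Ledger, m *Message) StageResult {
	if l.HasID(m.ID) {
		return StageResult{Reason: "duplicate_id"}
	}
	return StageResult{Message: m}
}

// ContentKeyDedupe drops only when a non-empty ContentKey was seen;
// messages without a ContentKey are never deduped by content.
func ContentKeyDedupe(l Ledger, m *Message) StageResult {
	if m.ContentKey != "" && l.HasContentKey(m.ContentKey) {
		return StageResult{Reason: "duplicate_content"}
	}
	return StageResult{Message: m}
}

// memLedger is a toy in-memory ledger for the example.
type memLedger struct{ ids, keys map[string]bool }

func (l memLedger) HasID(id string) bool        { return l.ids[id] }
func (l memLedger) HasContentKey(k string) bool { return l.keys[k] }

func main() {
	l := memLedger{ids: map[string]bool{"abc": true}, keys: map[string]bool{}}
	fmt.Println(IDDedupe(l, &Message{ID: "abc"}).Reason) // duplicate_id
	fmt.Println(IDDedupe(l, &Message{ID: "xyz"}).Reason) // empty: continues
}
```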