Skip to content

Runtime

import "github.com/eljojo/nara/runtime"

EphemeralDefaults - not stored, not gossiped, unverified

var EphemeralDefaults = Behavior{
Emit: EmitBehavior{Sign: NoSign(), Store: NoStore(), Gossip: NoGossip()},
Receive: ReceiveBehavior{Verify: NoVerify(), Dedupe: IDDedupe(), Store: NoStore()},
}

ErrCallTimeout is returned when a call times out.

var ErrCallTimeout = errors.New("call timed out")

LocalDefaults - for service-to-service communication within a nara No network transport, no signing, no storage - just internal event routing

var LocalDefaults = Behavior{
Emit: EmitBehavior{Sign: NoSign(), Store: NoStore(), Gossip: NoGossip(), Transport: NoTransport()},
Receive: ReceiveBehavior{Verify: NoVerify(), Dedupe: IDDedupe(), Store: NoStore()},
}

ProtocolDefaults - not stored, not gossiped, verified, critical

var ProtocolDefaults = Behavior{
Emit: EmitBehavior{Sign: DefaultSign(), Store: NoStore(), Gossip: NoGossip()},
Receive: ReceiveBehavior{Verify: DefaultVerify(), Dedupe: IDDedupe(), Store: NoStore(), Filter: Critical()},
}

ProtocolUnverifiedDefaults - like Protocol but no signature verification

var ProtocolUnverifiedDefaults = Behavior{
Emit: EmitBehavior{Sign: NoSign(), Store: NoStore(), Gossip: NoGossip()},
Receive: ReceiveBehavior{Verify: NoVerify(), Dedupe: IDDedupe(), Store: NoStore(), Filter: Critical()},
}

StoredDefaults - persisted, gossiped, verified

var StoredDefaults = Behavior{
Emit: EmitBehavior{Sign: DefaultSign(), Store: DefaultStore(2), Gossip: Gossip(), Transport: NoTransport()},
Receive: ReceiveBehavior{Verify: DefaultVerify(), Dedupe: IDDedupe(), Store: DefaultStore(2)},
}

func ComputeID(msg *Message) (string, error)

ComputeID generates a unique envelope ID from message content.

The ID is deterministic but always unique per message instance because it includes the timestamp with nanosecond precision.

func PayloadTypeOf[T any]() reflect.Type

PayloadTypeOf is a helper to get reflect.Type from a struct.

func TypedHandler[T any](fn func(*Message, *T) error) any

TypedHandler wraps a typed handler function for the registry.

func WithPayloadType[T any]() reflect.Type

WithPayloadType is a generic version of WithPayload.

Behavior defines how a message kind is handled.

Each message kind (like “stash:store”, “checkpoint”, “social”) has a Behavior that declares:

  • What payload type to deserialize
  • How to process it when emitting (signing, storage, transport)
  • How to process it when receiving (verification, dedup, filtering)
  • Version-specific typed handlers
type Behavior struct {
// Identity
Kind string // Unique identifier, e.g., "observation:restart", "stash:store"
Description string // Human-readable description
// Versioning
CurrentVersion int // Version for new messages (default 1)
MinVersion int // Oldest version still accepted (default 1)
PayloadTypes map[int]reflect.Type // Payload type per version (required)
// Version-specific handlers (typed via TypedHandler helper)
// Each handler has signature: func(*Message, *PayloadType) error
Handlers map[int]any
// ContentKey derivation (nil = no content key)
// Used for cross-observer deduplication (like observations)
ContentKey func(payload any) string
// Pipeline stages - split by direction
Emit EmitBehavior
Receive ReceiveBehavior
}

func BroadcastEvent(kind, desc string, priority int, topic string) *Behavior

BroadcastEvent creates a behavior for stored, gossiped, AND broadcast via MQTT.

func Ephemeral(kind, desc, topic string) *Behavior

Ephemeral creates a behavior for ephemeral broadcasts (no storage, MQTT only).

func Local(kind, desc string) *Behavior

Local creates a behavior for service-to-service communication within a nara. No network transport, no signing, no storage - just internal routing.

func MeshRequest(kind, desc string) *Behavior

MeshRequest creates a behavior for protocol messages over mesh (not MQTT).

func Protocol(kind, desc, topic string) *Behavior

Protocol creates a behavior for protocol messages (not stored, verified, critical).

func ProtocolUnverified(kind, desc, topic string) *Behavior

ProtocolUnverified creates a protocol behavior without signature verification.

func StoredEvent(kind, desc string, priority int) *Behavior

StoredEvent creates a behavior for stored, gossiped events (no MQTT broadcast).

func (b *Behavior) WithContentKey(fn func(any) string) *Behavior

WithContentKey adds ContentKey derivation to a behavior.

func (b *Behavior) WithFilter(stage Stage) *Behavior

WithFilter customizes the receive filter.

func (b *Behavior) WithHandler(version int, fn any) *Behavior

WithHandler adds a typed handler for a specific version. Usage: .WithHandler(1, service.handleV1).WithHandler(2, service.handleV2)

func (b *Behavior) WithPayload(t reflect.Type) *Behavior

WithPayload sets the payload type for v1 (defaults to v1).

func (b *Behavior) WithRateLimit(stage Stage) *Behavior

WithRateLimit adds rate limiting to the receive pipeline.

BehaviorRegistrar is optionally implemented by services that register behaviors.

type BehaviorRegistrar interface {
RegisterBehaviors(rt RuntimeInterface)
}

CallRegistry manages pending Call requests.

This is the central request/response correlation system. When a service calls rt.Call(msg, timeout), the registry tracks the pending request and matches incoming responses via InReplyTo.

type CallRegistry struct {
// contains filtered or unexported fields
}

func NewCallRegistry() *CallRegistry

NewCallRegistry creates a new call registry.

func (r *CallRegistry) Cancel(id string)

Cancel removes a pending call without resolving it.

func (r *CallRegistry) Register(id string, ch chan CallResult, timeout time.Duration)

Register adds a pending call to track.

func (r *CallRegistry) Resolve(inReplyTo string, response *Message) bool

Resolve matches an incoming response to a pending call. Returns true if a pending call was found and resolved.

CallResult is returned by CallAsync (Chapter 3).

type CallResult struct {
Response *Message
Error error
}

ContentKeyDedupeStage rejects messages with duplicate ContentKey (same fact).

Used for observations where multiple naras may report the same fact. Different from IDDedupe: same ContentKey, different IDs.

type ContentKeyDedupeStage struct{}

func (s *ContentKeyDedupeStage) Process(msg *Message, ctx *PipelineContext) StageResult

ContentKeyStage computes the semantic identity for dedup.

Only used for messages that need cross-observer deduplication (like observations).

type ContentKeyStage struct {
KeyFunc func(payload any) string
}

func (s *ContentKeyStage) Process(msg *Message, ctx *PipelineContext) StageResult

ContentKeyStoreStage stores with ContentKey-based deduplication.

Only stores if no message with the same ContentKey exists. Used for observations where multiple naras may report the same fact.

type ContentKeyStoreStage struct {
Priority int
}

func (s *ContentKeyStoreStage) Process(msg *Message, ctx *PipelineContext) StageResult

CustomVerifyStage uses a custom verification function.

Used for complex verification like checkpoint multi-sig.

type CustomVerifyStage struct {
VerifyFunc func(msg *Message, ctx *PipelineContext) StageResult
}

func (s *CustomVerifyStage) Process(msg *Message, ctx *PipelineContext) StageResult

DefaultSignStage signs the message with the runtime’s keypair.

type DefaultSignStage struct{}

func (s *DefaultSignStage) Process(msg *Message, ctx *PipelineContext) StageResult

DefaultStoreStage stores the message in the ledger with a GC priority.

Needed for Chapter 2.

type DefaultStoreStage struct {
Priority int // 0 = never prune, higher = prune sooner
}

func (s *DefaultStoreStage) Process(msg *Message, ctx *PipelineContext) StageResult

DefaultVerifyStage verifies the message signature against a known public key.

Looks up the public key by FromID and verifies the signature.

type DefaultVerifyStage struct{}

func (s *DefaultVerifyStage) Process(msg *Message, ctx *PipelineContext) StageResult

EmitBehavior defines how outgoing messages are processed.

type EmitBehavior struct {
Sign Stage // How to sign (default: DefaultSign)
Store Stage // How to store (default: DefaultStore(2))
Gossip Stage // Whether to gossip (default: NoGossip)
Transport Stage // How to send (required)
OnError ErrorStrategy // What to do on failure
}

Environment enum for runtime behavior.

Like Rails environments - different defaults for different contexts.

type Environment int

const (
EnvProduction Environment = iota // Graceful: log errors, don't crash
EnvDevelopment // Loud: warnings, fail on suspicious things
EnvTest // Strict: panic on errors, catch bugs early
)

ErrorStrategy defines how to handle errors at each pipeline stage.

type ErrorStrategy int

const (
ErrorDrop ErrorStrategy = iota // Drop message silently
ErrorLog // Log warning and drop
ErrorRetry // Retry with exponential backoff
ErrorQueue // Send to dead letter queue for inspection
ErrorPanic // Fail loudly (for critical messages)
)

EventBusInterface is what notification stages use.

type EventBusInterface interface {
Emit(msg *Message)
Subscribe(kind string, handler func(*Message))
}

GossipQueueInterface is what gossip stages use.

type GossipQueueInterface interface {
Add(msg *Message)
}

GossipStage explicitly queues the message for gossip.

The gossip service will include this message in zines sent to peers.

type GossipStage struct{}

func (s *GossipStage) Process(msg *Message, ctx *PipelineContext) StageResult

IDDedupeStage rejects messages with duplicate ID (exact same message).

This prevents processing the same message twice if received via multiple paths.

type IDDedupeStage struct{}

func (s *IDDedupeStage) Process(msg *Message, ctx *PipelineContext) StageResult

IDStage computes the unique envelope ID for a message.

This always runs first in the emit pipeline. The ID is deterministic but always unique per message instance (includes nanosecond timestamp).

type IDStage struct{}

func (s *IDStage) Process(msg *Message, ctx *PipelineContext) StageResult

IdentityInterface provides public key lookups for message verification.

type IdentityInterface interface {
LookupPublicKey(id types.NaraID) []byte
RegisterPublicKey(id types.NaraID, key []byte)
}

ImportanceFilterStage filters messages based on importance level and personality.

Importance levels:

  • 3 (Critical): Never filtered
  • 2 (Normal): Filtered only if very chill
  • 1 (Casual): Uses custom filter function
type ImportanceFilterStage struct {
Importance int // 1=casual, 2=normal, 3=critical
CasualFilter func(msg *Message, personality *Personality) bool
}

func (s *ImportanceFilterStage) Process(msg *Message, ctx *PipelineContext) StageResult

KeypairInterface is what sign stages use.

type KeypairInterface interface {
Sign(data []byte) []byte
PublicKey() []byte
// Encryption (self-encryption using XChaCha20-Poly1305)
Seal(plaintext []byte) (nonce, ciphertext []byte, err error)
Open(nonce, ciphertext []byte) ([]byte, error)
}

LedgerInterface is what store stages use.

type LedgerInterface interface {
Add(msg *Message, priority int) error
HasID(id string) bool
HasContentKey(contentKey string) bool
}

Logger is the default logger implementation that logs to logrus. This is used when no custom logger is provided.

Supports per-service filtering via Disable/Enable methods.

type Logger struct {
// contains filtered or unexported fields
}

func NewLogger(disabledServices ...string) *Logger

NewLogger creates a new logger with optional disabled services.

func (l *Logger) Debug(service string, format string, args ...any)

Debug logs a debug message.

func (l *Logger) Disable(services ...string)

Disable suppresses logging for the given service names.

func (l *Logger) Enable(services ...string)

Enable re-enables logging for the given service names.

func (l *Logger) Error(service string, format string, args ...any)

Error logs an error message.

func (l *Logger) Info(service string, format string, args ...any)

Info logs an info message.

func (l *Logger) Warn(service string, format string, args ...any)

Warn logs a warning message.

LoggerInterface is the interface that the runtime logger must implement. This allows services to log without depending on the concrete logger implementation.

type LoggerInterface interface {
Debug(service string, format string, args ...any)
Info(service string, format string, args ...any)
Warn(service string, format string, args ...any)
Error(service string, format string, args ...any)
}

MQTTPerNaraStage broadcasts to a per-nara topic.

type MQTTPerNaraStage struct {
TopicPattern string // e.g., "nara/newspaper/%s"
}

func (s *MQTTPerNaraStage) Process(msg *Message, ctx *PipelineContext) StageResult

MQTTStage broadcasts to a fixed MQTT topic.

type MQTTStage struct {
Topic string
}

func (s *MQTTStage) Process(msg *Message, ctx *PipelineContext) StageResult

MeshOnlyStage sends the message directly via mesh to a specific target.

Fails if the target is unreachable. Used by stash for direct peer communication.

type MeshOnlyStage struct{}

func (s *MeshOnlyStage) Process(msg *Message, ctx *PipelineContext) StageResult

Message is the universal primitive for all communication in Nara.

Everything that flows through the system is a Message: stored events, ephemeral broadcasts, protocol exchanges, and internal service communication.

type Message struct {
// Core identity (always present)
ID string `json:"id"` // Unique envelope identifier (always unique per message instance)
ContentKey string `json:"content_key,omitempty"` // Semantic identity for dedup (optional, stable across observers)
Kind string `json:"kind"` // Message type: "hey-there", "observation:restart", "checkpoint", etc.
Version int `json:"version"` // Schema version for this kind (default 1, increment on breaking changes)
From types.NaraName `json:"from,omitempty"` // Sender name (for display)
FromID types.NaraID `json:"from_id"` // Sender nara ID (primary identifier)
To types.NaraName `json:"to,omitempty"` // Target name (for direct messages, display only)
ToID types.NaraID `json:"to_id,omitempty"` // Target nara ID (for direct messages, primary identifier)
Timestamp time.Time `json:"timestamp"` // When it was created
// Content
Payload any `json:"payload"` // Kind-specific data (Go struct, runtime handles serialization)
// Cryptographic (attached by runtime)
Signature []byte `json:"signature,omitempty"` // Creator's signature (may be nil for some kinds)
// Correlation (for Call/response pattern - Chapter 3)
InReplyTo string `json:"in_reply_to,omitempty"` // Links response to request (for Call/response pattern)
}

func (m *Message) Marshal() ([]byte, error)

Marshal serializes the message for network transport.

func (m *Message) Reply(kind string, payload any) *Message

Reply creates a response message linked to the original.

Automatically sets InReplyTo, ToID, and swaps the direction. Used for request/response patterns (Chapter 3).

func (m *Message) SignableContent() ([]byte, error)

SignableContent returns the canonical bytes to be signed.

This ensures consistent signing across the network.

func (m *Message) VerifySignature(pubKey []byte) bool

VerifySignature checks if the signature is valid for this message.

MockKeypair is a fake keypair for testing.

type MockKeypair struct {
// contains filtered or unexported fields
}

func NewMockKeypair() *MockKeypair

NewMockKeypair creates a new mock keypair.

func (k *MockKeypair) Open(nonce, ciphertext []byte) ([]byte, error)

Open decrypts ciphertext using XChaCha20-Poly1305.

func (k *MockKeypair) PublicKey() []byte

func (k *MockKeypair) Seal(plaintext []byte) (nonce, ciphertext []byte, err error)

Seal encrypts plaintext using XChaCha20-Poly1305.

func (k *MockKeypair) Sign(data []byte) []byte

MockRuntime implements RuntimeInterface for testing services.

It captures emitted messages, allows delivering messages to services, and provides fake infrastructure without MQTT/mesh dependencies.

type MockRuntime struct {
Emitted []*Message // Captured Emit() calls for assertions
// contains filtered or unexported fields
}

func NewMockRuntime(t *testing.T, name types.NaraName, id types.NaraID) *MockRuntime

NewMockRuntime creates a mock runtime with auto-cleanup via t.Cleanup().

func (m *MockRuntime) Call(msg *Message, timeout time.Duration) <-chan CallResult

Call emits a message and tracks for response correlation.

func (m *MockRuntime) Clear()

Clear clears all captured messages.

func (m *MockRuntime) Deliver(msg *Message) error

Deliver simulates receiving a message (calls behavior handlers).

If the message has InReplyTo set, it first checks if there’s a pending Call waiting for that response (simulating how the real runtime works). If the call is resolved, the handler is NOT invoked.

Returns the error from the handler (if any) so tests can assert on it. Use this to test how a service reacts to incoming messages.

func (m *MockRuntime) Emit(msg *Message) error

Emit captures messages for test assertions.

func (m *MockRuntime) EmittedCount() int

EmittedCount returns the number of emitted messages.

func (m *MockRuntime) EmittedOfKind(kind string) []*Message

EmittedOfKind returns all emitted messages of a given kind.

func (m *MockRuntime) Env() Environment

func (m *MockRuntime) Identity() IdentityInterface

Identity returns the identity interface (MockRuntime implements it).

func (m *MockRuntime) InitService(svc Service) error

InitService initializes a service the same way the real runtime does. It auto-populates ServiceBase (if embedded) and calls Init().

func (m *MockRuntime) Keypair() KeypairInterface

Keypair returns the keypair interface.

func (m *MockRuntime) LastEmitted() *Message

LastEmitted returns the most recently emitted message.

func (m *MockRuntime) Log(service string) *ServiceLog

func (m *MockRuntime) LookupPublicKey(id types.NaraID) []byte

func (m *MockRuntime) Me() *Nara

func (m *MockRuntime) MeID() types.NaraID

func (m *MockRuntime) MemoryMode() string

func (m *MockRuntime) OnlinePeers() []*PeerInfo

func (m *MockRuntime) RegisterBehavior(b *Behavior)

RegisterBehavior allows tests to register behaviors manually.

func (m *MockRuntime) RegisterPublicKey(id types.NaraID, key []byte)

func (m *MockRuntime) ResolveCall(inReplyTo string, response *Message) bool

ResolveCall allows tests to simulate a response arriving. Call this after checking Emitted to simulate the response.

func (m *MockRuntime) SetMemoryMode(mode string)

SetMemoryMode sets the memory mode for testing (low/medium/high).

func (m *MockRuntime) Stop()

Stop cleans up the mock runtime.

func (m *MockRuntime) Subscribe(kind string, handler func(*Message))

Subscribe registers a handler for a message kind.

func (m *MockRuntime) WaitForEmittedCount(count int, timeout time.Duration) bool

WaitForEmittedCount waits until at least count messages have been emitted. Returns true if the count was reached, false on timeout.

Nara represents a network participant.

Notice there’s also Nara struct in nara.go The full Nara struct will be migrated in Chapter 2.

type Nara struct {
ID types.NaraID
Name types.NaraName
}

NetworkInfoInterface provides network and peer information.

type NetworkInfoInterface interface {
OnlinePeers() []*PeerInfo
MemoryMode() string
}

NoContentKeyStage is a no-op (most messages don’t need content keys).

type NoContentKeyStage struct{}

func (s *NoContentKeyStage) Process(msg *Message, ctx *PipelineContext) StageResult

NoFilterStage is a no-op filter (all messages pass).

type NoFilterStage struct{}

func (s *NoFilterStage) Process(msg *Message, ctx *PipelineContext) StageResult

NoGossipStage skips gossip (message won’t be included in zines).

Stash uses this because stash messages are direct mesh only.

type NoGossipStage struct{}

func (s *NoGossipStage) Process(msg *Message, ctx *PipelineContext) StageResult

NoSignStage skips signing (for messages where signature is in payload or not needed).

type NoSignStage struct{}

func (s *NoSignStage) Process(msg *Message, ctx *PipelineContext) StageResult

NoStoreStage skips storage (for ephemeral messages).

Stash uses this because stash messages don’t need to be stored in the ledger.

type NoStoreStage struct{}

func (s *NoStoreStage) Process(msg *Message, ctx *PipelineContext) StageResult

NoTransportStage skips network transport (local-only messages).

Used for service-to-service communication within a single nara.

type NoTransportStage struct{}

func (s *NoTransportStage) Process(msg *Message, ctx *PipelineContext) StageResult

NoVerifyStage skips verification (for unverified protocol messages or local messages).

type NoVerifyStage struct{}

func (s *NoVerifyStage) Process(msg *Message, ctx *PipelineContext) StageResult

NotifyStage always runs last in the emit pipeline.

It notifies local subscribers that a message was emitted. This is how services can react to their own emitted messages.

type NotifyStage struct{}

func (s *NotifyStage) Process(msg *Message, ctx *PipelineContext) StageResult

PeerInfo contains information about a network peer.

type PeerInfo struct {
ID types.NaraID
Name types.NaraName
Uptime time.Duration
}

Personality affects filtering behavior.

type Personality struct {
Agreeableness int // 0-100
Sociability int // 0-100
Chill int // 0-100
}

Pipeline chains multiple stages together.

Messages flow through stages sequentially. If any stage returns an error or drop, the pipeline stops and returns that result.

type Pipeline []Stage

func (p Pipeline) Run(msg *Message, ctx *PipelineContext) StageResult

Run executes the pipeline on a message.

Each stage processes the message in sequence. The pipeline stops early if any stage drops the message or encounters an error.

PipelineContext carries runtime dependencies that stages need.

Stages receive this instead of importing the full runtime, which helps avoid circular dependencies and makes testing easier.

type PipelineContext struct {
Runtime RuntimeInterface // For accessing runtime methods
Ledger LedgerInterface // For storage stages
Transport TransportInterface // For transport stages
GossipQueue GossipQueueInterface // For gossip stages
Keypair KeypairInterface // For signing stages
Identity IdentityInterface // For verification stages
Personality *Personality // For filtering stages
EventBus EventBusInterface // For notification stages
}

RateLimitStage throttles incoming messages based on a key function.

type RateLimitStage struct {
Window time.Duration
Max int
KeyFunc func(msg *Message) string
}

func (s *RateLimitStage) Process(msg *Message, ctx *PipelineContext) StageResult

ReceiveBehavior defines how incoming messages are processed.

type ReceiveBehavior struct {
Verify Stage // How to verify signature (default: DefaultVerify)
Dedupe Stage // How to deduplicate (default: IDDedupe)
RateLimit Stage // Rate limiting (optional)
Filter Stage // Personality filter (optional)
Store Stage // How to store (can differ from emit!)
OnError ErrorStrategy // What to do on pipeline or handler failure
}

Runtime is the core execution environment for services.

It manages message pipelines, service lifecycle, and provides primitives like logging and transport.

type Runtime struct {
// contains filtered or unexported fields
}

func NewRuntime(cfg RuntimeConfig) *Runtime

NewRuntime creates a new runtime with the given configuration.

func (rt *Runtime) AddService(svc Service) error

AddService registers a service with the runtime.

func (rt *Runtime) Call(msg *Message, timeout time.Duration) <-chan CallResult

Call emits a message and waits for a reply.

This is the request/response primitive. The message is emitted, and the runtime tracks it so that when a response arrives (via InReplyTo), the caller gets notified.

func (rt *Runtime) Emit(msg *Message) error

Emit sends a message through the emit pipeline.

func (rt *Runtime) Env() Environment

Env returns the runtime environment.

func (rt *Runtime) Identity() IdentityInterface

Identity returns the identity interface. Runtime guarantees this is always non-nil.

func (rt *Runtime) Keypair() KeypairInterface

Keypair returns the keypair interface. Runtime guarantees this is always non-nil.

func (rt *Runtime) Log(service string) *ServiceLog

Log returns a logger scoped to the given service.

func (rt *Runtime) LookupBehavior(kind string) *Behavior

LookupBehavior looks up a behavior in this runtime’s local registry.

func (rt *Runtime) Me() *Nara

Me returns the local nara.

func (rt *Runtime) MeID() types.NaraID

MeID returns the local nara’s ID.

func (rt *Runtime) MemoryMode() string

MemoryMode returns the current memory mode (low/medium/high).

func (rt *Runtime) OnlinePeers() []*PeerInfo

OnlinePeers returns a list of currently online peers.

func (rt *Runtime) Receive(raw []byte) error

Receive processes an incoming message through the receive pipeline.

func (rt *Runtime) RegisterBehavior(b *Behavior)

RegisterBehavior registers a behavior locally for this runtime. This allows each runtime to have its own handlers, avoiding conflicts in multi-nara tests where services register handlers with their own state.

func (rt *Runtime) Start() error

Start starts all services.

func (rt *Runtime) Stop() error

Stop stops all services.

RuntimeConfig is passed to NewRuntime.

type RuntimeConfig struct {
Me *Nara
Keypair KeypairInterface
Ledger LedgerInterface // Optional (nil for stash-only)
Transport TransportInterface // Required
EventBus EventBusInterface // Optional
GossipQueue GossipQueueInterface // Optional (nil for stash-only)
Identity IdentityInterface // Optional (for public key lookups)
Personality *Personality // Optional
NetworkInfo NetworkInfoInterface // Optional (for peer/memory info)
Logger LoggerInterface // Optional (defaults to simple logger)
Environment Environment // Default: EnvProduction
}

RuntimeInterface is what services and stages can access.

This interface prevents circular dependencies and makes it easy to create mocks for testing.

type RuntimeInterface interface {
// Identity
Me() *Nara
MeID() types.NaraID
// Messaging
Emit(msg *Message) error
// Logging (runtime primitive, not a service)
Log(service string) *ServiceLog
// Environment
Env() Environment
// Network information (for automatic confidant selection)
OnlinePeers() []*PeerInfo
MemoryMode() string
// Object access - callers use these directly instead of pass-through methods.
// Runtime guarantees these are always non-nil.
Keypair() KeypairInterface
Identity() IdentityInterface
// Request/response (Call emits and waits for a reply)
Call(msg *Message, timeout time.Duration) <-chan CallResult
// Behavior registration (for services to register their message handlers)
RegisterBehavior(b *Behavior)
}

SelfAttestingVerifyStage uses public key embedded in the payload.

Used for identity announcements where the public key is in the payload itself.

type SelfAttestingVerifyStage struct {
ExtractKey func(payload any) []byte
}

func (s *SelfAttestingVerifyStage) Process(msg *Message, ctx *PipelineContext) StageResult

Service is what all services implement.

Services register with the runtime and get lifecycle callbacks. Services should embed ServiceBase to get RT and Log automatically.

type Service interface {
// Identity
Name() string
// Lifecycle
// Init is called after ServiceBase is populated. Use s.RT and s.Log directly.
Init() error
Start() error
Stop() error
}

ServiceBase provides common fields that all services need.

Embed this in service structs to get RT and Log automatically:

type MyService struct {
runtime.ServiceBase
// service-specific fields...
}
func (s *MyService) Init() error {
// RT and Log are already set by the runtime!
s.Log.Info("initializing...")
s.keypair = s.RT.Keypair()
return nil
}

The runtime automatically populates RT and Log before calling Init().

type ServiceBase struct {
RT RuntimeInterface
Log *ServiceLog
}

func (b *ServiceBase) SetBase(rt RuntimeInterface, log *ServiceLog)

SetBase is called by the runtime to populate RT and Log. Services should NOT call this directly.

ServiceBaseAccessor is implemented by services that embed ServiceBase. The runtime uses this to auto-populate RT and Log before Init().

type ServiceBaseAccessor interface {
SetBase(rt RuntimeInterface, log *ServiceLog)
}

ServiceLog is a logger scoped to a specific service.

Services get this from rt.Log(“service_name”).

type ServiceLog struct {
// contains filtered or unexported fields
}

func (l *ServiceLog) Debug(format string, args ...any)

Logger methods forward to the logger with service name prefix

func (l *ServiceLog) Error(format string, args ...any)

func (l *ServiceLog) Info(format string, args ...any)

func (l *ServiceLog) Warn(format string, args ...any)

Stage processes a message and returns an explicit result.

Stages are the building blocks of message pipelines. They can:

  • Transform the message (signing, ID assignment)
  • Store it (ledger, gossip queue)
  • Filter it (deduplication, rate limiting, personality)
  • Transport it (MQTT, mesh)
  • Drop it with a reason
  • Fail with an error
type Stage interface {
Process(msg *Message, ctx *PipelineContext) StageResult
}

func Casual(filterFunc func(*Message, *Personality) bool) Stage

Casual returns a filter for casual importance messages (level 1) with a custom filter function.

func ContentKey(keyFunc func(any) string) Stage

ContentKey returns a stage that computes semantic identity for dedup.

func ContentKeyDedupe() Stage

ContentKeyDedupe returns a stage that deduplicates by ContentKey.

func ContentKeyStore(priority int) Stage

ContentKeyStore returns a store stage with ContentKey-based deduplication.

func Critical() Stage

Critical returns a filter that never drops messages (importance level 3).

func CustomVerify(verifyFunc func(*Message, *PipelineContext) StageResult) Stage

CustomVerify returns a verification stage with a custom verification function.

func DefaultSign() Stage

DefaultSign returns the default signing stage.

func DefaultStore(priority int) Stage

DefaultStore returns a store stage with the given GC priority.

Priority values:

  • 0: Never prune (checkpoints, critical observations)
  • 1: Important (hey-there, chau)
  • 2: Normal (social events)
  • 3: Low priority (seen events)
  • 4: Expendable (pings)

func DefaultVerify() Stage

DefaultVerify returns the default signature verification stage.

func Gossip() Stage

Gossip returns a stage that adds the message to the gossip queue.

func IDDedupe() Stage

IDDedupe returns a stage that deduplicates by message ID.

func MQTT(topic string) Stage

MQTT returns a stage that broadcasts to a fixed MQTT topic.

func MQTTPerNara(pattern string) Stage

MQTTPerNara returns a stage that broadcasts to a per-nara topic.

func MeshOnly() Stage

MeshOnly returns a stage that sends directly via mesh to ToID.

func NoContentKey() Stage

NoContentKey returns a no-op content key stage.

func NoFilter() Stage

NoFilter returns a no-op filter stage.

func NoGossip() Stage

NoGossip returns a no-op gossip stage.

func NoSign() Stage

NoSign returns a no-op signing stage.

func NoStore() Stage

NoStore returns a no-op store stage (ephemeral messages).

func NoTransport() Stage

NoTransport returns a no-op transport stage (local-only messages).

func NoVerify() Stage

NoVerify returns a no-op verification stage.

func Normal() Stage

Normal returns a filter for normal importance messages (level 2).

func RateLimit(window time.Duration, max int, keyFunc func(*Message) string) Stage

RateLimit returns a rate limiting stage.

func SelfAttesting(extractKey func(any) []byte) Stage

SelfAttesting returns a verification stage that extracts the public key from the payload.

StageResult represents the outcome of a pipeline stage.

Every stage returns an explicit result - no silent failures.

type StageResult struct {
Message *Message // The message to continue with (nil = dropped)
Error error // Set if stage failed (transport error, validation failure, etc.)
Reason string // Human-readable reason for drop ("rate_limited", "duplicate", "invalid_signature")
}

func Continue(msg *Message) StageResult

Continue indicates the message should proceed to the next stage.

func Drop(reason string) StageResult

Drop indicates the message was intentionally filtered/rejected. Use this for deduplication, rate limiting, personality filtering, etc.

func Fail(err error) StageResult

Fail indicates something went wrong (transport failure, crypto error, etc.).

func (r StageResult) IsContinue() bool

IsContinue returns true if the result indicates continuation.

func (r StageResult) IsDrop() bool

IsDrop returns true if the result indicates an intentional drop.

func (r StageResult) IsError() bool

IsError returns true if the result indicates an error.

TransportInterface is what transport stages use.

type TransportInterface interface {
PublishMQTT(topic string, data []byte) error
TrySendDirect(targetID types.NaraID, msg *Message) error
}

Generated by gomarkdoc