Skip to content

NATS Protocol Reference

This document describes the NATS messaging protocol used by the Server Interpreter component of falcon-instrument-hub. Message schemas are aligned with falcon-api/embedded/commands/v1/ specifications.

Overview

The Server Interpreter uses NATS for: - Receiving measurement requests from falcon - Coordinating with instrument daemons - Uploading measurement results

JetStream is used for large data transfers to ensure reliability and persistence.

Channel Naming Convention

Channels follow the pattern: {CHANNEL_NAME}.{suffix}

Examples: - LOG.interpreter - Log messages from interpreter - PROCESS_REQUEST - Measurement request channel - measurement.data.{process_id} - JetStream data channel

Message Schemas

LOG

Logging messages from components.

# falcon-api/embedded/commands/v1/log.yaml
channel: LOG
parameters:
  hash: int (optional)     # Process identifier
  message: string          # Log message text
  timestamp: int           # Unix timestamp

Go Type:

type LogMessage struct {
    Hash      int64  `json:"hash,omitempty"`
    Message   string `json:"message"`
    Timestamp int64  `json:"timestamp"`
}

PROCESS_REQUEST

Request to process a measurement.

# falcon-api/embedded/commands/v1/process_request.yaml
channel: PROCESS_REQUEST
parameters:
  process_id: int          # Unique process identifier
  request: jsonable        # MeasurementRequest object
  configurations: json     # Instrument configurations
  data_path: string        # Path to store data

Go Type:

type ProcessRequestMessage struct {
    ProcessID      int64       `json:"process_id"`
    Request        interface{} `json:"request"`
    Configurations interface{} `json:"configurations"`
    DataPath       string      `json:"data_path"`
}

MEASUREMENT_READY

Signal that measurement is ready for instrument daemon.

# falcon-api/embedded/commands/v1/measurement_ready.yaml
channel: MEASUREMENT_READY
parameters:
  timestamp: int
  getters: list[string]    # InstrumentPort JSONs
  setters: list[string]    # InstrumentPort JSONs
  requirements: list[string]
  has_set: boolean
  has_trigger: boolean
  is_buffered: boolean
  process_id: int
  chunk_id: int

Go Type:

type MeasurementReadyMessage struct {
    Timestamp    int64    `json:"timestamp"`
    Getters      []string `json:"getters"`
    Setters      []string `json:"setters"`
    Requirements []string `json:"requirements"`
    HasSet       bool     `json:"has_set"`
    HasTrigger   bool     `json:"has_trigger"`
    IsBuffered   bool     `json:"is_buffered"`
    ProcessID    int64    `json:"process_id"`
    ChunkID      int64    `json:"chunk_id"`
}

PROCESS_DATA

Data collected from instruments.

# falcon-api/embedded/commands/v1/process_data.yaml
channel: PROCESS_DATA
parameters:
  chunk_id: int
  timestamp: int
  data: string             # JSON-serialized measurement data
  process_id: int

Go Type:

type ProcessDataMessage struct {
    ChunkID   int64  `json:"chunk_id"`
    Timestamp int64  `json:"timestamp"`
    Data      string `json:"data"`
    ProcessID int64  `json:"process_id"`
}

UPLOAD_DATA

Notification of uploaded measurement results.

# falcon-api/embedded/commands/v1/upload_data.yaml
channel: UPLOAD_DATA
parameters:
  timestamp: int
  process_id: int
  unit_hash: int           # Algorithmic unit hash
  channel: string          # NATS channel for data retrieval
  stream: string           # JetStream stream name

Go Type:

type UploadDataMessage struct {
    Timestamp int64  `json:"timestamp"`
    ProcessID int64  `json:"process_id"`
    UnitHash  int64  `json:"unit_hash"`
    Channel   string `json:"channel"`
    Stream    string `json:"stream"`
}

UPDATE_DAEMON_PROPERTY

Update instrument daemon property.

# falcon-api/embedded/commands/v1/update_daemon_property.yaml
channel: UPDATE_DAEMON_PROPERTY
parameters:
  timestamp: int
  property: string         # Property name
  name: string             # InstrumentPort JSON
  value: any               # Value to set

Go Type:

type UpdateDaemonPropertyMessage struct {
    Timestamp int64       `json:"timestamp"`
    Property  string      `json:"property"`
    Name      string      `json:"name"`
    Value     interface{} `json:"value"`
}

STATUS

Daemon status heartbeat.

# falcon-api/embedded/commands/v1/status.yaml
channel: STATUS
parameters:
  timestamp: int
  status: boolean          # Active status

Go Type:

type StatusMessage struct {
    Timestamp int64 `json:"timestamp"`
    Status    bool  `json:"status"`
}

Instrument Coordination Channels

SET

Execute a set instruction on an instrument.

# falcon-api/embedded/commands/v1/set.yaml
channel: SET
parameters:
  timestamp: int
  process_id: int
  chunk_id: int
  property: string
  index: int
  value: any

GET

Execute a get instruction on an instrument.

# falcon-api/embedded/commands/v1/get.yaml
channel: GET
parameters:
  timestamp: int
  process_id: int
  chunk_id: int
  property: string
  index: int

TRIGGER

Trigger buffered instruments.

# falcon-api/embedded/commands/v1/trigger.yaml
channel: TRIGGER
parameters:
  timestamp: int
  process_id: int
  chunk_id: int
  is_setter: boolean

ARMED

Instrument armed and ready notification.

# falcon-api/embedded/commands/v1/armed.yaml
channel: ARMED
parameters:
  timestamp: int
  process_id: int
  chunk_id: int

EXECUTING

Instrument currently executing notification.

# falcon-api/embedded/commands/v1/executing.yaml
channel: EXECUTING
parameters:
  timestamp: int
  process_id: int
  chunk_id: int

RETURN_DATA

Measurement data response from instrument.

# falcon-api/embedded/commands/v1/return_data.yaml
channel: RETURN_DATA
parameters:
  timestamp: int
  process_id: int
  chunk_id: int
  data: any

RETURN_GET

Get operation response from instrument.

# falcon-api/embedded/commands/v1/return_get.yaml
channel: RETURN_GET
parameters:
  timestamp: int
  process_id: int
  chunk_id: int
  value: any

JetStream Configuration

The Server Interpreter uses JetStream for reliable data transfer:

streamConfig := &nats.StreamConfig{
    Name:      "FALCON_MEASUREMENTS",
    Subjects:  []string{"measurement.result.>", "measurement.data.>"},
    Retention: nats.LimitsPolicy,
    MaxAge:    24 * time.Hour,
    MaxMsgs:   10000,
    MaxBytes:  1024 * 1024 * 1024, // 1GB
    Storage:   nats.FileStorage,
}

Measurement Completion Notification

When the hub completes an averaged measurement, it publishes a notification to JetStream:

Subject: measurement.result.{measurement_id}

{
  "type": "measurement_complete",
  "measurement_id": "jetstream-test-001",
  "process_id": 42,
  "status": "success",
  "data_location": {
    "stream": "FALCON_MEASUREMENTS",
    "subject": "measurement.result.jetstream-test-001",
    "file_path": "/data/measurements/sweep_jetstream-test-001.json",
    "num_points": 101,
    "num_sweeps": 10
  },
  "timestamp": "2026-02-09T10:30:00Z"
}

Go Type:

type FalconMeasurementNotification struct {
    Type          string             `json:"type"`
    MeasurementID string             `json:"measurement_id"`
    ProcessID     int64              `json:"process_id"`
    Status        string             `json:"status"`
    DataLocation  FalconDataLocation `json:"data_location"`
    Timestamp     time.Time          `json:"timestamp"`
}

type FalconDataLocation struct {
    Stream    string `json:"stream"`
    Subject   string `json:"subject"`
    FilePath  string `json:"file_path"`
    NumPoints int    `json:"num_points"`
    NumSweeps int    `json:"num_sweeps"`
}

Falcon Subscription Example

Falcon can subscribe to measurement completions:

import nats

async def handle_measurement_complete(msg):
    data = json.loads(msg.data)
    if data["status"] == "success":
        file_path = data["data_location"]["file_path"]
        # Load and process measurement data

js = nc.jetstream()
await js.subscribe("measurement.result.>", cb=handle_measurement_complete)

Message Flow Example

Falcon                 Server Interpreter           Instrument Daemon
  │                           │                           │
  │──PROCESS_REQUEST────────>│                           │
  │                           │                           │
  │                           │──UPDATE_DAEMON_PROPERTY─>│
  │                           │──UPDATE_DAEMON_PROPERTY─>│
  │                           │──MEASUREMENT_READY──────>│
  │                           │                           │
  │                           │<──────────ARMED──────────│
  │                           │<────────EXECUTING────────│
  │                           │<──────RETURN_DATA────────│
  │                           │                           │
  │                           │<──────PROCESS_DATA───────│
  │                           │                           │
  │<────────UPLOAD_DATA───────│                           │
  │                           │                           │

Error Handling

Errors are communicated through: 1. LOG channel messages 2. Empty/error responses on request channels 3. NATS subscription errors

Channel Constants

In Go code, use RuntimeChannels:

serverinterpreter.RuntimeChannels.ProcessRequest  // "PROCESS_REQUEST"
serverinterpreter.RuntimeChannels.ProcessData     // "PROCESS_DATA"
serverinterpreter.RuntimeChannels.MeasurementReady // "MEASUREMENT_READY"
serverinterpreter.RuntimeChannels.UploadData      // "UPLOAD_DATA"
serverinterpreter.RuntimeChannels.Log             // "LOG"
serverinterpreter.RuntimeChannels.Status          // "STATUS"

See Also