Example: NATS event sink¶
Apply separately
config/samples/kollect_v1alpha1_kollecteventsink_nats.yaml is not in the default kustomization.
Apply it explicitly after creating a NATS JetStream endpoint and credentials Secret.
config/samples/kollect_v1alpha1_kollecteventsink_nats.yaml — not in default kustomization.
Event + state pairing
Event emitters complement relational sinks — pair NATS with Postgres via databaseSinkRefs and
eventSinkRefs when portals need queryable state and downstream systems need change notifications.
Sample manifest¶
apiVersion: kollect.dev/v1alpha1
kind: KollectEventSink
metadata:
name: nats-inventory-demo
namespace: default
spec:
type: nats
cluster: prod-west
connectionTest: false
secretRef:
name: nats-credentials
nats:
url: nats://nats.kollect-system.svc:4222
subject: inventory.events
stream: kollect_events
| Field | Purpose |
|---|---|
spec.nats.url |
NATS server (nats://host:4222); falls back to spec.endpoint |
spec.nats.subject |
JetStream publish subject (required) |
spec.nats.stream |
Stream name (default kollect_events; dots sanitized to _) |
spec.secretRef |
Optional token or username/password keys |
spec.cluster |
Cluster label embedded in each event envelope |
Event emitter role (ADR-0401). Pair with Postgres via
databaseSinkRefs + eventSinkRefs on the same KollectInventory.
Message contract¶
Each export publishes one JSON envelope to JetStream:
{
"schemaVersion": "kollect.dev/v1alpha1",
"timestamp": "2026-01-15T12:00:00Z",
"cluster": "prod-west",
"namespace": "team-a",
"payload": [/* canonical Item rows */]
}
Delivery is at-least-once. The backend sets Nats-Msg-Id to a content hash of
{cluster}/{namespace}/{payload} so JetStream deduplicates identical replays within the
stream's duplicate-detection window.
Golden fixture: test/schema/golden/nats-event-envelope.json.
Consumer walkthrough¶
- Ensure a JetStream-enabled NATS server is reachable from the operator.
- Create a Secret with credentials when auth is required:
apiVersion: v1
kind: Secret
metadata:
name: nats-credentials
namespace: default
stringData:
token: "<nats-token>"
- Apply the
KollectEventSinksample, then reference it from aKollectInventory:
spec:
eventSinkRefs:
- nats-inventory-demo
- Consume with the NATS CLI (replace stream/subject names to match your spec):
nats stream add kollect_events --subjects "inventory.events" --storage file --retention limits
nats consumer add kollect_events inventory-reader --filter inventory.events --ack explicit
nats consumer next kollect_events inventory-reader
Or with nats.go:
js, _ := jetstream.New(nc)
cons, _ := js.CreateOrUpdateConsumer(ctx, "kollect_events", jetstream.ConsumerConfig{
FilterSubject: "inventory.events",
AckPolicy: jetstream.AckExplicitPolicy,
})
cons.Consume(func(msg jetstream.Msg) {
// msg.Data() is the EventEnvelope JSON
_ = msg.Ack()
})
Kafka alternative: Kafka event sink ·
config/samples/kollect_v1alpha1_kollecteventsink_kafka.yaml.