NATS

The NATS driver has been supported in RR since v2.5.0. It works only with NATS JetStream.

Configuration

.rr.yaml
version: "3"

nats:
  addr: "demo.nats.io"

jobs:
  num_pollers: 10
  pipeline_size: 100000
  pool:
    num_workers: 10
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s

  pipelines:
    test-1:
      driver: nats
      config:
        # Pipeline priority
        # If the job has priority set to 0, it will inherit the pipeline's priority. Default: 10.
        priority: 2

        # NATS prefetch
        # Messages to read into the channel
        prefetch: 100

        # NATS subject
        # Default: default
        subject: default

        # NATS stream
        # Default: default-stream
        stream: foo

        # If true, the consumer receives only messages created after the consumer itself was created
        # Default: false (deliver all messages from the beginning of the stream)
        deliver_new: true

        # Consumer rate-limiter in bytes https://docs.nats.io/jetstream/concepts/consumers#ratelimit
        # Default: 1000
        rate_limit: 100

        # Delete the stream after the pipeline is stopped
        # Default: false
        delete_stream_on_stop: false

        # Delete the message from the stream after it is successfully acknowledged
        # Default: false
        delete_after_ack: false

Configuration options

Here is a detailed description of each of the NATS-specific options:

Subject

subject - the NATS subject.

Stream

stream - the name of the JetStream stream.

To prevent duplicate message consumption, ensure that each pipeline is configured with a unique NATS stream. Using the same stream for multiple pipelines will result in the same message being processed multiple times.
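
A minimal sketch of two pipelines that do not share a stream is shown below. The pipeline names (emails, reports) and the subject and stream names are hypothetical and chosen only for illustration; the option keys are the ones from the configuration above.

```yaml
jobs:
  pipelines:
    # Hypothetical pipeline name
    emails:
      driver: nats
      config:
        subject: emails
        # Used by this pipeline only
        stream: emails-stream

    # Hypothetical pipeline name
    reports:
      driver: nats
      config:
        subject: reports
        # A different stream, so messages are not consumed twice
        stream: reports-stream
```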

Deliver new

deliver_new - when enabled, the consumer receives only messages that were created after the consumer itself was created. When disabled (the default), all messages are delivered from the beginning of the stream.

Rate limit

rate_limit - the NATS consumer rate limit, which throttles the delivery of messages to the consumer.

Delete stream on stop

delete_stream_on_stop - delete the whole stream when the pipeline is stopped.

Delete after ack

delete_after_ack - delete the message from the stream after it is successfully acknowledged.
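
As an illustration of how the delivery and cleanup options can be combined, a pipeline intended for a throwaway test environment might look like the fragment below. The pipeline, subject, and stream names are hypothetical; only the option keys come from the configuration above.

```yaml
jobs:
  pipelines:
    # Hypothetical pipeline name
    scratch:
      driver: nats
      config:
        subject: scratch
        stream: scratch-stream
        # Skip messages created before the consumer was created
        deliver_new: true
        # Remove each message from the stream once it is acknowledged
        delete_after_ack: true
        # Remove the whole stream when the pipeline is stopped
        delete_stream_on_stop: true
```

Because delete_stream_on_stop removes the whole stream, a combination like this is better suited to disposable environments than to streams whose messages must be kept.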
