Kafka

The Kafka driver has been supported since RoadRunner v2.11.0 and was reworked in v2023.1.0. Apache Kafka is a distributed streaming system used for event stream processing, real-time data pipelines, and large-scale stream processing. Originally developed and open-sourced at LinkedIn in 2011, Kafka has evolved from a messaging queue into a full-fledged event streaming platform. Now used by 80% of the Fortune 500, it brings numerous benefits to virtually every industry and opens up countless new use cases, large and small.

Version 2023.2.0 update:

  • Added new SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms.
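
For example, enabling one of the SCRAM mechanisms only requires selecting it in the `sasl` section. A minimal sketch, where the broker address and credentials are placeholders:

.rr.yaml
kafka:
  brokers: [ "127.0.0.1:9092" ]

  sasl:
    mechanism: SCRAM-SHA-512
    username: foo
    password: bar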

Configuration

.rr.yaml
# Kafka jobs driver
#
# This section is required to use the Kafka driver. `brokers` can contain any
# number of addresses separated by commas (127.0.0.1:9092,127.0.0.1:9093,...)
kafka:

  # Kafka brokers addresses
  #
  # Required to use Kafka driver
  brokers: [ "127.0.0.1:9092", "127.0.0.1:9002" ]

  # Ping to test connection to Kafka
  #
  # Examples: "2s", "5m"
  # Optional, default: "10s"
  ping:
    timeout: "10s"

  # SSL/TLS configuration
  #
  # Optional, default: empty
  tls:
    # Secure connection timeout
    #
    # Examples: "2s", "5m"
    # Optional, default: "10s"
    timeout: "10s"
    
    # Path to the key file
    #
    # This option is required
    key: ""

    # Path to the certificate
    #
    # This option is required
    cert: ""

    # Path to the CA certificate, defines the set of root certificate authorities
    # that servers use if required to verify a client certificate. Used with the
    # `client_auth_type` option.
    #
    # This option is optional
    root_ca: ""

    # Client auth type.
    #
    # This option is optional. Default value: no_client_certs. Possible values: request_client_cert, require_any_client_cert, verify_client_cert_if_given, require_and_verify_client_cert, no_client_certs
    client_auth_type: no_client_certs

  # SASL authentication options to use for all connections. Depending on the
  # chosen mechanism, only one of the sections below (PLAIN/SCRAM or
  # AWS_MSK_IAM) applies; the unused one can be omitted.
  #
  # Optional, default: empty
  sasl:

    # ----------- 1. PLAIN and SCRAM auth section ---------------

    # Mechanism used for the authentication
    #
    # Required for the section. Possible values: 'aws_msk_iam', 'plain', 'SCRAM-SHA-256', 'SCRAM-SHA-512'
    mechanism: plain

    # Username to use for authentication.
    #
    # Required for the plain auth mechanism.
    username: foo

    # Password to use for authentication.
    #
    # Required for the plain auth mechanism.
    password: bar

    # Nonce.
    #
    # Optional for the SHA auth types. Empty by default.
    nonce: "foo"

    # If true, suffixes the "tokenauth=true" extra attribute to the initial authentication message.
    # Set this to true if the user and pass are from a delegation token.
    # Optional for the SHA auth types. Empty by default.
    is_token: false

    # Zid is an optional authorization ID to use in authenticating.
    #
    # Optional, default: empty.
    zid: "foo"

    # -------------- 2. AWS_MSK_IAM auth section ------------------

    # AWS Access key ID.
    #
    # Required
    access_key: foo

    # AWS Secret Access Key.
    #
    # Required
    secret_key: bar

    # SessionToken, if non-empty, is a session / security token to use for authentication.
    # See the following link for more details:
    #
    # https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
    session_token: bar

    # UserAgent is the user agent for the client to use when connecting
    # to Kafka, overriding the default "franz-go/<runtime.Version()>/<hostname>".
    # Setting a UserAgent allows authorizing based on the aws:UserAgent
    # condition key; see the following link for more details:
    #     https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent
    user_agent: baz

jobs:
  num_pollers: 10
  pipeline_size: 100000
  pool:
    num_workers: 10
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s

  pipelines:
    test-local-6:
      # Driver name
      #
      # This option is required
      driver: kafka

      # Driver's configuration
      #
      # Should not be empty
      config:

        # Pipeline priority
        #
        # If the job has priority set to 0, it will inherit the pipeline's priority. Default: 10.
        priority: 1


        # Auto create topic for the consumer/producer
        #
        # Optional, default: false
        auto_create_topics_enable: false

        # Kafka producer options
        #
        # Optional, required only if Push/PushBatch is used.
        producer_options:

          # disable_idempotent disables idempotent produce requests, opting out of
          # Kafka server-side deduplication in the face of reissued requests due to
          # transient network problems.
          # Idempotent production is strictly a win, but does require the IDEMPOTENT_WRITE permission on CLUSTER
          # (pre Kafka 3.0), and not all clients can have that permission.
          #
          # Optional, default: false
          disable_idempotent: false

          # required_acks sets the required acks for produced records.
          #
          # Optional, default: AllISRAck. Possible values: NoAck, LeaderAck, AllISRAck
          required_acks: AllISRAck

          # max_message_bytes upper bounds the size of a record batch, overriding the default 1,000,012 bytes.
          # This mirrors Kafka's max.message.bytes.
          #
          # Optional, default: 1000012
          max_message_bytes: 1000012

          # request_timeout sets how long Kafka brokers are allowed to take to respond to produce requests, overriding the default 10s.
          # If a broker exceeds this duration, it replies with a request timeout error.
          #
          # Optional, default: 10s. Possible values: 10s, 10m.
          request_timeout: 10s

          # delivery_timeout sets a rough time of how long a record can sit around in a batch before timing out,
          # overriding the unlimited default. If idempotency is enabled (as it is by default), this option is only
          # enforced if it is safe to do so without creating invalid sequence numbers.
          #
          # Optional, default: delivery.timeout.ms Kafka option. Possible values: 10s, 10m.
          delivery_timeout: 100s

          # transaction_timeout sets the allowed time for a transaction, overriding the default 40s. It is a good idea to
          # keep this less than a group's session timeout.
          #
          # Optional, default: 40s. Possible values: 10s, 10m.
          transaction_timeout: 100s

          # compression_codec sets the compression codec to use for producing records.
          #
          # Optional, default is chosen in the order preferred based on broker support. Possible values: gzip, snappy, lz4, zstd.
          compression_codec: gzip

        # Kafka Consumer options. Needed to consume messages from the Kafka cluster.
        #
        # Optional, needed only if `consume` is used.
        consumer_options:

          # topics: adds topics to use for consuming
          #
          # Default: empty (will produce an error), possible to use regexp if `consume_regexp` is set to true.
          topics: [ "foo", "bar", "^[a-zA-Z0-9._-]+$" ]

          # consume_regexp sets the client to parse all topics passed to `topics` as regular expressions.
          # When consuming via regex, every metadata request loads *all* topics, so that all topics can be passed to
          # any regular expressions. Every topic is evaluated only once ever across all regular expressions; either it
          # permanently is known to match, or is permanently known to not match.
          #
          # Optional, default: false.
          consume_regexp: true

          # max_fetch_message_size sets the maximum amount of bytes a broker will try to send during a fetch, overriding the default 50MiB.
          # Note that brokers may not obey this limit if they have records larger than this limit.
          # Also note that this client sends a fetch to each broker concurrently, meaning the client will
          # buffer up to <brokers * max bytes> worth of memory. This corresponds to the Java fetch.max.bytes setting.
          #
          # Optional, default 50000
          max_fetch_message_size: 50000

          # min_fetch_message_size sets the minimum amount of bytes a broker will try to send during a fetch,
          # overriding the default 1 byte. With the default of 1, data is sent as soon as it is available.
          # This corresponds to the Java fetch.min.bytes setting.
          #
          # Optional, default: 1.
          min_fetch_message_size: 1

          # consume_partitions sets partitions to consume from directly and the offsets to start consuming those partitions from.
          # This option is basically a way to explicitly consume from subsets of partitions in topics, or to consume at exact offsets.
          #
          # NOTE: This option is not compatible with group consuming and regex consuming.
          #
          # Optional, default: null
          consume_partitions:

            # Topic for the consume_partitions
            #
            # At least one topic is required.
            foo:

              # Partition for the topic.
              #
              # At least one partition is required.
              0:

                # Partition offset.
                #
                # Required if this option is used. No default; an empty value produces an error.
                # Possible values: AtEnd, At, AfterMilli, AtStart, Relative, WithEpoch
                type: AtStart

                # Value for the: At, AfterMilli, Relative and WithEpoch offsets.
                #
                # Optional, default: 0.
                value: 1

          # consumer_offset sets the offset to start consuming from, or if OffsetOutOfRange is seen while fetching,
          # to restart consuming from.
          #
          # Optional, default: AtStart
          consumer_offset:

            # Partition offset.
            #
            # Optional, default: AtStart. Possible values: AtEnd, At, AfterMilli, AtStart, Relative, WithEpoch
            type: AtStart

            # Value for the: At, AfterMilli, Relative and WithEpoch offsets.
            #
            # Optional, default: 0.
            value: 1

        # group_options sets the consumer group for the client to join and consume in.
        # This option is required when using the group consumer.
        #
        # Default: empty.
        group_options:

          # group_id sets the group to consume.
          #
          # Required if using group consumer.
          group_id: foo

          # block_rebalance_on_poll switches the client to block rebalances whenever you poll.
          #
          # Optional, default: false.
          block_rebalance_on_poll: true
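
With a pipeline such as `test-local-6` configured, jobs can be produced and consumed from PHP using the spiral/roadrunner-jobs package. Below is a minimal sketch of the producing side, assuming the RPC plugin listens on tcp://127.0.0.1:6001; the task name `ping` and its payload are placeholders:

push.php
<?php

declare(strict_types=1);

use Spiral\Goridge\RPC\RPC;
use Spiral\RoadRunner\Jobs\Jobs;

require __DIR__ . '/vendor/autoload.php';

// Connect to the RoadRunner RPC endpoint (address is an assumption; it must
// match the `rpc` section of your .rr.yaml).
$jobs = new Jobs(RPC::create('tcp://127.0.0.1:6001'));

// Attach to the Kafka pipeline declared above.
$queue = $jobs->connect('test-local-6');

// Create and dispatch a task; the name and payload are placeholders.
$task = $queue->create(name: 'ping', payload: '{"site": "https://example.com"}');
$queue->dispatch($task);

On the consuming side, the PHP worker started by the `server` plugin receives Kafka records as jobs. A minimal worker sketch:

worker.php
<?php

declare(strict_types=1);

use Spiral\RoadRunner\Jobs\Consumer;

require __DIR__ . '/vendor/autoload.php';

$consumer = new Consumer();

// waitTask() blocks until RoadRunner hands this worker a job from the pipeline.
while ($task = $consumer->waitTask()) {
    try {
        // Process the payload here.
        $payload = $task->getPayload();

        // Acknowledge successful processing.
        $task->complete();
    } catch (\Throwable $e) {
        // Report the failure back to RoadRunner.
        $task->fail($e);
    }
}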