Kafka

The Kafka driver has been supported since RoadRunner v2.11.0 and was reworked in v2023.1.0. Apache Kafka is a distributed streaming system used for event stream processing, real-time data pipelines, and large-scale stream processing. Originally developed and open-sourced at LinkedIn in 2011, Kafka has evolved from a messaging queue into a full-fledged event streaming platform and is now used by 80% of the Fortune 500 across virtually every industry.

Version 2023.2.0 update:

  • Added the SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms (see the sketch below).
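
A minimal SASL block using one of the new SCRAM mechanisms might look like the following sketch (the broker address and credentials are placeholders):

kafka:
  brokers: [ "127.0.0.1:9092" ]
  sasl:
    mechanism: SCRAM-SHA-256
    username: foo
    password: bar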

Configuration

.rr.yaml
# Kafka jobs driver
#
# This section is required to use the Kafka driver. The `brokers` option can contain any number of addresses (127.0.0.1:9092, 127.0.0.1:9093, ...).
kafka:

  # Kafka broker addresses
  #
  # Required to use the Kafka driver
  brokers: [ "127.0.0.1:9092", "127.0.0.1:9002" ]

  # Ping to test the connection to Kafka
  #
  # Examples: "2s", "5m"
  # Optional, default: "10s"
  ping:
    timeout: "10s"

  # SSL/TLS configuration
  #
  # Optional, default: empty
  tls:
    # Secure connection timeout
    #
    # Examples: "2s", "5m"
    # Optional, default: "10s"
    timeout: "10s"
    
    # Path to the key file
    #
    # This option is required
    key: ""

    # Path to the certificate
    #
    # This option is required
    cert: ""

    # Path to the CA certificate, which defines the set of root certificate authorities that servers use, if required, to verify a client certificate. Used with the `client_auth_type` option.
    #
    # This option is optional
    root_ca: ""

    # Client auth type.
    #
    # This option is optional. Default value: no_client_certs. Possible values: request_client_cert, require_any_client_cert, verify_client_cert_if_given, require_and_verify_client_cert, no_client_certs
    client_auth_type: no_client_certs
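
    # For reference, a filled-in mutual TLS block might look like this sketch
    # (the paths are placeholders):
    #
    #   tls:
    #     key: "/ssl/server.key"
    #     cert: "/ssl/server.crt"
    #     root_ca: "/ssl/root.crt"
    #     client_auth_type: require_and_verify_client_cert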

  # SASL authentication options to use for all connections. Depending on the chosen mechanism,
  # either the PLAIN/SCRAM or the AWS_MSK_IAM section below may be omitted.
  #
  # Optional, default: empty
  sasl:

    # ----------- 1. PLAIN and SCRAM auth section ---------------

    # Mechanism used for authentication
    #
    # Required for this section. Can be: 'aws_msk_iam', 'plain', 'SCRAM-SHA-256', 'SCRAM-SHA-512'
    mechanism: plain

    # Username to use for authentication.
    #
    # Required for the plain and SCRAM mechanisms.
    username: foo

    # Password to use for authentication.
    #
    # Required for the plain and SCRAM mechanisms.
    password: bar

    # Nonce.
    #
    # Optional for the SCRAM mechanisms. Empty by default.
    nonce: "foo"

    # If true, suffixes the "tokenauth=true" extra attribute to the initial authentication message.
    # Set this to true if the username and password are from a delegation token.
    # Optional for the SCRAM mechanisms. Default: false.
    is_token: false

    # Zid is an optional authorization ID to use for authentication.
    #
    # Optional, default: empty.
    zid: "foo"

    # -------------- 2. AWS_MSK_IAM auth section ------------------

    # AWS access key ID.
    #
    # Required
    access_key: foo

    # AWS secret access key.
    #
    # Required
    secret_key: bar

    # SessionToken, if non-empty, is a session/security token to use for authentication.
    # See the following link for more details:
    #
    # https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
    session_token: bar

    # UserAgent is the user agent for the client to use when connecting
    # to Kafka, overriding the default "franz-go/<runtime.Version()>/<hostname>".
    # Setting a UserAgent allows authorizing based on the aws:UserAgent
    # condition key; see the following link for more details:
    #     https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent
    user_agent: baz
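
    # For reference, a minimal AWS_MSK_IAM block combines the options above with the
    # matching mechanism (the values here are placeholders):
    #
    #   sasl:
    #     mechanism: aws_msk_iam
    #     access_key: foo
    #     secret_key: bar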

jobs:
  num_pollers: 10
  pipeline_size: 100000
  pool:
    num_workers: 10
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s

  pipelines:
    test-local-6:
      # Driver name
      #
      # This option is required
      driver: kafka

      # Driver's configuration
      #
      # Should not be empty
      config:

        # Pipeline priority
        #
        # If the job has priority set to 0, it will inherit the pipeline's priority. Default: 10.
        priority: 1

        # Auto-create topics for the consumer/producer
        #
        # Optional, default: false
        auto_create_topics_enable: false

        # Kafka producer options
        #
        # Optional; required only if Push/PushBatch is used.
        producer_options:

          # Kafka partitioning strategy
          # One of: Manual | Uniform | RoundRobin | LeastBackup | Sticky
          # Applies to records without a key; if a key is set, Kafka
          # uses key hashing for partition selection regardless of strategy.
          # Default: Uniform
          #
          # Manual: use an explicitly provided partition (e.g., message options);
          #         producing without a partition will fail.
          # Uniform: random, even distribution across partitions.
          # RoundRobin: sequentially cycle through partitions.
          # LeastBackup: choose the partition with the fewest buffered records.
          # Sticky: stick to a single partition for a short time to maximize batching.
          partitioning_strategy: Uniform

          # disable_idempotent disables idempotent produce requests, opting out of
          # Kafka server-side deduplication in the face of reissued requests due to
          # transient network problems.
          # Idempotent production is strictly a win, but does require the IDEMPOTENT_WRITE permission on CLUSTER
          # (pre-Kafka 3.0), and not all clients can have that permission.
          #
          # Optional, default: false
          disable_idempotent: false

          # required_acks sets the required acks for produced records.
          #
          # Optional, default: AllISRAck. Possible values: NoAck, LeaderAck, AllISRAck
          required_acks: AllISRAck

          # max_message_bytes sets an upper bound on the size of a record batch, overriding the default 1,000,012 bytes.
          # This mirrors Kafka's max.message.bytes.
          #
          # Optional, default: 1000012
          max_message_bytes: 1000012

          # request_timeout sets how long Kafka brokers are allowed to respond to produce requests, overriding the default 10s.
          # If a broker exceeds this duration, it will reply with a request timeout error.
          #
          # Optional, default: 10s. Examples: 10s, 10m.
          request_timeout: 10s

          # delivery_timeout sets a rough time of how long a record can sit around in a batch before timing out,
          # overriding the unlimited default. If idempotency is enabled (as it is by default), this option is only
          # enforced if it is safe to do so without creating invalid sequence numbers.
          #
          # Optional, default: unlimited. Roughly equivalent to Kafka's delivery.timeout.ms. Examples: 10s, 10m.
          delivery_timeout: 100s

          # transaction_timeout sets the timeout for a transaction, overriding the default 40s. It is a good idea to
          # keep this less than a group's session timeout.
          #
          # Optional, default: 40s. Examples: 10s, 10m.
          transaction_timeout: 100s

          # compression_codec sets the compression codec to use for producing records.
          #
          # Optional; the default codec is chosen in preference order based on broker support. Possible values: gzip, snappy, lz4, zstd.
          compression_codec: gzip

        # Kafka consumer options. Needed to consume messages from the Kafka cluster.
        #
        # Optional, needed only if `consume` is used.
        consumer_options:

          # topics: adds topics to consume
          #
          # Required; an empty list produces an error. Entries may be regular expressions when `consume_regexp` is set to true.
          topics: [ "foo", "bar", "^[a-zA-Z0-9._-]+$" ]

          # consume_regexp sets the client to parse all topics passed to `topics` as regular expressions.
          # When consuming via regex, every metadata request loads *all* topics, so that they can be matched
          # against the regular expressions. Every topic is evaluated only once across all regular expressions:
          # it is permanently marked as either matching or not matching.
          #
          # Optional, default: false.
          consume_regexp: true

          # max_fetch_message_size sets the maximum number of bytes a broker will try to send during a fetch, overriding the default 50MiB.
          # Note that brokers may not obey this limit if they have records larger than this limit.
          # Also note that this client sends a fetch to each broker concurrently, meaning the client will
          # buffer up to <brokers * max bytes> in memory. This corresponds to the Java fetch.max.bytes setting.
          #
          # Optional, default: 50000
          max_fetch_message_size: 50000

          # min_fetch_message_size sets the minimum number of bytes a broker will try to send during a fetch,
          # overriding the default 1 byte. With the default of 1, data is sent as soon as it is available.
          # This corresponds to the Java fetch.min.bytes setting.
          #
          # Optional, default: 1.
          min_fetch_message_size: 1

          # consume_partitions sets partitions to consume from directly and the offsets to start consuming those partitions from.
          # This option is basically a way to explicitly consume from subsets of partitions in topics, or to consume at exact offsets.
          #
          # NOTE: This option is not compatible with group consuming and regex consuming.
          #
          # Optional, default: null
          consume_partitions:

            # Topic for consume_partitions
            #
            # At least one topic is required.
            foo:

              # Partition for the topic.
              #
              # At least one partition is required.
              0:

                # Offset type for the partition.
                #
                # Required when consume_partitions is used; there is no default, and an empty value produces an error.
                # Possible values: AtEnd, At, AfterMilli, AtStart, Relative, WithEpoch
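                # Roughly, following franz-go offset semantics: AtStart/AtEnd consume from the
                # first/latest offset, At consumes from the exact offset given in `value`,
                # AfterMilli starts at the first offset after the Unix timestamp in milliseconds
                # given in `value`, Relative adds `value` to the base offset, and WithEpoch
                # attaches a leader epoch to the offset.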
                type: AtStart

                # Value for the At, AfterMilli, Relative, and WithEpoch offsets.
                #
                # Optional, default: 0.
                value: 1

          # consumer_offset sets the offset to start consuming from, or, if OffsetOutOfRange is seen while fetching,
          # to restart consuming from.
          #
          # Optional, default: AtStart
          consumer_offset:

            # Partition offset.
            #
            # Optional, default: AtStart. Possible values: AtEnd, At, AfterMilli, AtStart, Relative, WithEpoch
            type: AtStart

            # Value for the At, AfterMilli, Relative, and WithEpoch offsets.
            #
            # Optional, default: 0.
            value: 1

        # group_options sets the consumer group for the client to join and consume within.
        # This option is required if using any other group options.
        #
        # Default: empty.
        group_options:

          # group_id sets the group to consume in.
          #
          # Required if using group consumer.
          group_id: foo

          # block_rebalance_on_poll switches the client to block rebalances whenever you poll.
          #
          # Optional, default: false.
          block_rebalance_on_poll: true
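
Putting it together, a minimal working configuration, assuming a single local broker and one group-consuming pipeline, might look like this sketch (the pipeline, topic, and group names are placeholders):

.rr.yaml
kafka:
  brokers: [ "127.0.0.1:9092" ]

jobs:
  pool:
    num_workers: 4
  pipelines:
    kafka-pipeline:
      driver: kafka
      config:
        priority: 1
        consumer_options:
          topics: [ "foo" ]
        group_options:
          group_id: foo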
