Kafka

The Kafka driver has been supported since RoadRunner version 2.11.0 and was reworked in v2023.1.0. Apache Kafka is a distributed streaming system used for event stream processing, real-time data pipelines, and large-scale stream processing. Originally developed and open-sourced at LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a full-fledged event streaming platform. Now used by 80% of the Fortune 500, Kafka brings numerous benefits to virtually every industry and opens up countless new use cases, large and small.

Version 2023.2.0 update:

  • Added new SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms.

Configuration

.rr.yaml
# Kafka jobs driver
#
# This section is required to use the Kafka driver. The `brokers` option can contain any number of
# addresses, e.g. [ "127.0.0.1:9092", "127.0.0.1:9093", ... ]
kafka:

  # Kafka brokers addresses
  #
  # Required to use Kafka driver
  brokers: [ "127.0.0.1:9092", "127.0.0.1:9002" ]

  # Ping to test connection to Kafka
  #
  # Examples: "2s", "5m"
  # Optional, default: "10s"
  ping:
    timeout: "10s"

  # SSL/TLS configuration
  #
  # Optional, default: empty
  tls:
    # Secure connection timeout
    #
    # Examples: "2s", "5m"
    # Optional, default: "10s"
    timeout: "10s"
    
    # Path to the key file
    #
    # This option is required
    key: ""

    # Path to the certificate
    #
    # This option is required
    cert: ""

    # Path to the CA certificate, defines the set of root certificate authorities that servers use if required to verify a client certificate. Used with the `client_auth_type` option.
    #
    # This option is optional
    root_ca: ""

    # Client auth type.
    #
    # This option is optional. Default value: no_client_certs. Possible values: request_client_cert, require_any_client_cert, verify_client_cert_if_given, require_and_verify_client_cert, no_client_certs
    client_auth_type: no_client_certs

  # SASL authentication options to use for all connections. Depending on the auth type, the plain/SCRAM or the aws_msk_iam section can be removed.
  #
  # Optional, default: empty
  sasl:

    # ----------- 1. PLAIN and SCRAM auth section ---------------

    # Mechanism used for the authentication
    #
    # Required for the section. Might be: 'aws_msk_iam', 'plain', 'SCRAM-SHA-256', 'SCRAM-SHA-512'
    mechanism: plain

    # Username to use for authentication.
    #
    # Required for the plain auth mechanism.
    username: foo

    # Password to use for authentication.
    #
    # Required for the plain auth mechanism.
    password: bar

    # Nonce.
    #
    # Optional for the SHA auth types. Empty by default.
    nonce: "foo"

    # If true, suffixes the "tokenauth=true" extra attribute to the initial authentication message.
    # Set this to true if the user and pass are from a delegation token.
    # Optional for the SHA auth types. Default: false.
    is_token: false

    # Zid is an optional authorization ID to use in authenticating.
    #
    # Optional, default: empty.
    zid: "foo"

    # -------------- 2. AWS_MSK_IAM auth section ------------------

    # AWS Access key ID.
    #
    # Required
    access_key: foo

    # AWS Secret Access Key.
    #
    #
    secret_key: bar

    # SessionToken, if non-empty, is a session / security token to use for authentication.
    # See the following link for more details:
    #
    # https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html
    session_token: bar

    # UserAgent is the user agent for the client to use when connecting
    # to Kafka, overriding the default "franz-go/<runtime.Version()>/<hostname>".
    # Setting a UserAgent allows authorizing based on the aws:UserAgent
    # condition key; see the following link for more details:
    #     https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent
    user_agent: baz

jobs:
  num_pollers: 10
  pipeline_size: 100000
  pool:
    num_workers: 10
    max_jobs: 0
    allocate_timeout: 60s
    destroy_timeout: 60s

  pipelines:
    test-local-6:
      # Driver name
      #
      # This option is required
      driver: kafka

      # Driver's configuration
      #
      # Should not be empty
      config:

        # Pipeline priority
        #
        # If the job has priority set to 0, it will inherit the pipeline's priority. Default: 10.
        priority: 1


        # Auto create topic for the consumer/producer
        #
        # Optional, default: false
        auto_create_topics_enable: false

        # Kafka producer options
        #
        # Optional, required only if Push/PushBatch is used.
        producer_options:

          # disable_idempotent disables idempotent produce requests, opting out of
          # Kafka server-side deduplication in the face of reissued requests due to
          # transient network problems.
          # Idempotent production is strictly a win, but does require the IDEMPOTENT_WRITE permission on CLUSTER
          # (pre Kafka 3.0), and not all clients can have that permission.
          #
          # Optional, default: false
          disable_idempotent: false

          # required_acks sets the required acks for produced records.
          #
          # Optional, default: AllISRAck. Possible values: NoAck, LeaderAck, AllISRAck
          required_acks: AllISRAck

          # max_message_bytes upper bounds the size of a record batch, overriding the default 1,000,012 bytes.
          # This mirrors Kafka's max.message.bytes.
          #
          # Optional, default: 1000012
          max_message_bytes: 1000012

          # request_timeout sets how long Kafka brokers are allowed to take to respond to produce requests, overriding the default 10s.
          # If a broker exceeds this duration, it will reply with a request timeout error.
          #
          # Optional, default: 10s. Possible values: 10s, 10m.
          request_timeout: 10s

          # delivery_timeout sets a rough time of how long a record can sit around in a batch before timing out,
          # overriding the unlimited default. If idempotency is enabled (as it is by default), this option is only
          # enforced if it is safe to do so without creating invalid sequence numbers.
          #
          # Optional, default: delivery.timeout.ms Kafka option. Possible values: 10s, 10m.
          delivery_timeout: 100s

          # transaction_timeout sets the allowed time for a transaction, overriding the default 40s. It is a good idea to
          # keep this less than a group's session timeout.
          #
          # Optional, default: 40s. The value is a duration string with a unit suffix. Possible values: 10s, 10m.
          transaction_timeout: 100s

          # compression_codec sets the compression codec to use for producing records.
          #
          # Optional, default is chosen in the order preferred based on broker support. Possible values: gzip, snappy, lz4, zstd.
          compression_codec: gzip

        # Kafka Consumer options. Needed to consume messages from the Kafka cluster.
        #
        # Optional, needed only if `consume` is used.
        consumer_options:

          # topics: adds topics to use for consuming
          #
          # Default: empty (will produce an error), possible to use regexp if `consume_regexp` is set to true.
          topics: [ "foo", "bar", "^[a-zA-Z0-9._-]+$" ]

          # consume_regexp sets the client to parse all topics passed to `topics` as regular expressions.
          # When consuming via regex, every metadata request loads *all* topics, so that all topics can be passed to
          # any regular expressions. Every topic is evaluated only once ever across all regular expressions; either it
          # permanently is known to match, or is permanently known to not match.
          #
          # Optional, default: false.
          consume_regexp: true

          # max_fetch_message_size sets the maximum amount of bytes a broker will try to send during a fetch, overriding the default 50MiB.
          # Note that brokers may not obey this limit if they have records larger than this limit.
          # Also note that this client sends a fetch to each broker concurrently, meaning the client will
          # buffer up to <brokers * max bytes> worth of memory. This corresponds to the Java fetch.max.bytes setting.
          #
          # Optional, default 50000
          max_fetch_message_size: 50000

          # min_fetch_message_size sets the minimum amount of bytes a broker will try to send during a fetch,
          # overriding the default 1 byte. With the default of 1, data is sent as soon as it is available.
          # This corresponds to the Java fetch.min.bytes setting.
          #
          # Optional, default: 1.
          min_fetch_message_size: 1

          # consume_partitions sets partitions to consume from directly and the offsets to start consuming those partitions from.
          # This option is basically a way to explicitly consume from subsets of partitions in topics, or to consume at exact offsets.
          #
          # NOTE: This option is not compatible with group consuming and regex consuming.
          #
          # Optional, default: null
          consume_partitions:

            # Topic for the consume_partitions
            #
            # Required at least one topic.
            foo:

              # Partition for the topic.
              #
              # Required at least one partition.
              0:

                # Partition offset.
                #
                # Required if this option is used. No default, error on empty.
                # Possible values: AtEnd, At, AfterMilli, AtStart, Relative, WithEpoch
                type: AtStart

                # Value for the: At, AfterMilli, Relative and WithEpoch offsets.
                #
                # Optional, default: 0.
                value: 1

          # consumer_offset sets the offset to start consuming from, or if OffsetOutOfRange is seen while fetching,
          # to restart consuming from.
          #
          # Optional, default: AtStart
          consumer_offset:

            # Partition offset.
            #
            # Optional, default: AtStart. Possible values: AtEnd, At, AfterMilli, AtStart, Relative, WithEpoch
            type: AtStart

            # Value for the: At, AfterMilli, Relative and WithEpoch offsets.
            #
            # Optional, default: 0.
            value: 1

        # group_options sets the consumer group for the client to join and consume in.
        # This option is required if using any other group options.
        #
        # Default: empty.
        group_options:

          # group_id sets the group to consume.
          #
          # Required if using group consumer.
          group_id: foo

          # block_rebalance_on_poll switches the client to block rebalances whenever you poll.
          #
          # Optional, default: false.
          block_rebalance_on_poll: true

Last updated