NAME
    EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV

SYNOPSIS
        use EV;
        use EV::Kafka;

        my $kafka = EV::Kafka->new(
            brokers  => '127.0.0.1:9092',
            acks     => -1,
            on_error => sub { warn "kafka: @_" },
            on_message => sub {
                my ($topic, $partition, $offset, $key, $value, $headers) = @_;
                print "$topic:$partition @ $offset  $key = $value\n";
            },
        );

        # Producer
        $kafka->connect(sub {
            $kafka->produce('my-topic', 'key', 'value', sub {
                my ($result, $err) = @_;
                return warn "produce failed: $err\n" if $err;
                my $off = $result->{topics}[0]{partitions}[0]{base_offset};
                print "produced at offset $off\n";
            });
        });

        # Consumer (manual assignment)
        $kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
        my $poll = EV::timer 0, 0.1, sub { $kafka->poll };

        # Consumer group
        $kafka->subscribe('my-topic',
            group_id  => 'my-group',
            on_assign => sub { ... },
            on_revoke => sub { ... },
        );

        EV::run;

DESCRIPTION
    EV::Kafka is a high-performance asynchronous Kafka client that
    implements the Kafka binary protocol in XS with EV event loop
    integration. It targets Redpanda and Apache Kafka 0.11 or later.

    Two-layer architecture:

    *   EV::Kafka::Conn (XS) -- single broker TCP connection with protocol
        encoding/decoding, correlation ID matching, pipelining, optional TLS
        and SASL (PLAIN, SCRAM-SHA-256/512) authentication.

    *   EV::Kafka::Client (Perl) -- cluster management with metadata
        discovery, broker connection pooling, partition leader routing,
        producer with key-based partitioning, consumer with manual
        assignment or consumer groups.

    Features:

    *   Binary protocol implemented in pure XS (no librdkafka dependency)

    *   Automatic request pipelining per broker connection

    *   Metadata-driven partition leader routing

    *   Producer: acks modes (-1/0/1), key-based partitioning (murmur2),
        headers, fire-and-forget (acks=0), idempotent producer with
        epoch-bump recovery, transactional / exactly-once stream processing

    *   Consumer: manual partition assignment, offset tracking, poll-based
        message delivery; consumer groups with
        JoinGroup/SyncGroup/Heartbeat, sticky partition assignment, offset
        commit/fetch, automatic rebalancing, session-expiry recovery

    *   Compression: lz4, gzip, zstd, snappy (each gated by build-time
        library detection)

    *   TLS (OpenSSL) and SASL/PLAIN, SCRAM-SHA-256/512 (with full RFC 5802
        server-signature verification)

    *   Automatic reconnection at the connection layer; bootstrap-broker
        failover; periodic metadata refresh

ANYEVENT INTEGRATION
    AnyEvent can use EV as its event-loop backend, so EV::Kafka works
    inside AnyEvent applications as long as AnyEvent is running on EV.

NO UTF-8 SUPPORT
    This module handles all values as bytes. Encode your UTF-8 strings
    before passing them:

        use Encode;

        $kafka->produce($topic, $key, encode_utf8($val), sub { ... });
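
    The consuming side needs the reverse step, since "on_message" also
    receives bytes. A minimal sketch of a matching pair of helpers follows;
    the names to_wire and from_wire are made up for this example and are
    not part of EV::Kafka:

```perl
use strict;
use warnings;
use Encode qw(encode_utf8 decode_utf8);

# Characters -> UTF-8 bytes, for the key/value arguments of produce().
# undef passes through so null keys and null values stay null.
sub to_wire {
    my ($chars) = @_;
    return defined $chars ? encode_utf8($chars) : undef;
}

# UTF-8 bytes -> characters, for the $key/$value seen in on_message.
sub from_wire {
    my ($bytes) = @_;
    return defined $bytes ? decode_utf8($bytes) : undef;
}

my $bytes = to_wire("caf\x{e9}");    # 5 bytes: the e-acute takes two
my $chars = from_wire($bytes);       # 4 characters again
```

    In a consumer this would be called as from_wire($value) at the top of
    the "on_message" callback.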

CLUSTER CLIENT METHODS
  new(%options)
    Create a new EV::Kafka client. Returns a blessed "EV::Kafka::Client"
    object.

        my $kafka = EV::Kafka->new(
            brokers  => '10.0.0.1:9092,10.0.0.2:9092',
            acks     => -1,
            on_error => sub { warn @_ },
        );

    Options:

    brokers => 'Str'
        Comma-separated list of bootstrap broker addresses (host:port).
        Default: "127.0.0.1:9092".

    client_id => 'Str' (default 'ev-kafka')
        Client identifier sent to brokers.

    tls => Bool
        Enable TLS encryption.

    tls_ca_file => 'Str'
        Path to CA certificate file for TLS verification.

    tls_skip_verify => Bool
        Skip TLS certificate verification.

    sasl => \%opts
        Enable SASL authentication. Supported mechanisms: "PLAIN",
        "SCRAM-SHA-256", "SCRAM-SHA-512".

            sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }

    acks => Int (default -1)
        Producer acknowledgment mode. -1 = all in-sync replicas, 0 = no
        acknowledgment (fire-and-forget), 1 = leader only.

    linger_ms => Int (default 5)
        Time in milliseconds to accumulate records before flushing a batch.
        Lower values reduce latency; higher values improve throughput.

    batch_size => Int (default 16384)
        Maximum batch size in bytes before a batch is flushed immediately.

    compression => 'Str'
        Compression type for produce batches: 'lz4' (requires liblz4),
        'gzip' (requires zlib), 'zstd' (requires libzstd), 'snappy'
        (requires libsnappy), or "undef" for none.

    idempotent => Bool (default 0)
        Enable idempotent producer. Calls "InitProducerId" on connect and
        sets producer_id/epoch/sequence in each RecordBatch for exactly-once
        delivery (broker-side deduplication). Only one batch per (topic,
        partition) is in-flight at a time when this is enabled, to prevent
        sequence-number aliasing on retry.

    transactional_id => 'Str'
        Enable transactional producer. Implies idempotent. Required for
        "begin_transaction"/"commit_transaction"/"abort_transaction" and
        "send_offsets_to_transaction" (full EOS).

    partitioner => $cb->($topic, $key, $num_partitions)
        Custom partition selection function. Default: murmur2 hash of key,
        or round-robin for null keys.

    on_error => $cb->($errstr)
        Error callback. Default: "die".

    on_connect => $cb->()
        Called once after initial metadata fetch completes.

    on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)
        Message delivery callback for consumer operations.

    fetch_max_wait_ms => Int (default 500)
        Maximum time the broker waits to accumulate "fetch_min_bytes" of
        data before returning a fetch response.

    fetch_max_bytes => Int (default 1048576)
        Maximum bytes per fetch response.

    fetch_min_bytes => Int (default 1)
        Minimum bytes before the broker responds to a fetch.

    metadata_refresh => Int (default 300)
        Periodic metadata refresh interval in seconds. Set to 0 to disable.
        Refreshes happen in the background, so consumers and producers pick
        up leader changes without waiting for a request to fail first.

    loop => $ev_loop
        EV loop object to use. Default: "EV::default_loop".
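
    As an illustration of the "partitioner" option, the sketch below pins
    keys with a given prefix to partition 0 and hashes everything else. It
    assumes the callback's return value is used as the partition index,
    which the option description implies but does not state. The "vip:"
    prefix convention and the name my_partitioner are invented for the
    example, and the hash is MD5-based rather than murmur2, so keys will
    not land on the same partitions as with the default partitioner.

```perl
use strict;
use warnings;
use Digest::MD5 qw(md5);

my $next = 0;    # round-robin cursor for messages without a key

sub my_partitioner {
    my ($topic, $key, $num_partitions) = @_;

    # Null keys: rotate through the partitions.
    return $next++ % $num_partitions unless defined $key;

    # Hypothetical convention: "vip:" keys always go to partition 0.
    return 0 if $key =~ /^vip:/;

    # Everything else: first 32 bits of the MD5 digest, reduced to an index.
    return unpack('N', md5($key)) % $num_partitions;
}
```

    It would be passed to the constructor as partitioner => \&my_partitioner.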

  connect([$cb])
    Connect to the cluster. Connects to the first available bootstrap
    broker, fetches cluster metadata, then fires $cb->($metadata). On
    bootstrap-broker failure the next address is tried; if all fail, the
    "on_error" handler fires.

        $kafka->connect(sub {
            my $meta = shift;
            # $meta->{brokers}, $meta->{topics}
        });

  produce($topic, $key, $value, [\%opts,] [$cb])
    Produce a message. Routes to the correct partition leader automatically.

        # with callback (acks=1 or acks=-1)
        $kafka->produce('topic', 'key', 'value', sub {
            my ($result, $err) = @_;
        });

        # with headers
        $kafka->produce('topic', 'key', 'value',
            { headers => { 'h1' => 'v1' } }, sub { ... });

        # fire-and-forget (acks=0)
        $kafka->produce('topic', 'key', 'value');

        # explicit partition
        $kafka->produce('topic', 'key', 'value',
            { partition => 3 }, sub { ... });

  produce_many(\@messages, $cb)
    Produce multiple messages with a single completion callback. Each
    message is an arrayref "[$topic, $key, $value]" or a hashref "{topic,
    key, value}". $cb fires when all messages are acknowledged.

        $kafka->produce_many([
            ['my-topic', 'k1', 'v1'],
            ['my-topic', 'k2', 'v2'],
        ], sub {
            my $errors = shift;
            warn "some failed: @$errors" if $errors;
        });

  flush([$cb])
    Flush all accumulated produce batches and wait for all in-flight
    requests to complete. $cb fires when all pending responses have been
    received.

  assign(\@partitions)
    Manually assign partitions for consuming.

        $kafka->assign([
            { topic => 'my-topic', partition => 0, offset => 0 },
            { topic => 'my-topic', partition => 1, offset => 100 },
        ]);

  seek($topic, $partition, $offset, [$cb])
    Seek a partition to a specific offset. Use -2 for earliest, -1 for
    latest. Updates the assignment in-place.

        $kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });

  offsets_for($topic, $cb)
    Get earliest and latest offsets for all partitions of a topic.

        $kafka->offsets_for('my-topic', sub {
            my $offsets = shift;
            # { 0 => { earliest => 0, latest => 42 }, 1 => ... }
        });

  lag($cb)
    Get consumer lag for all assigned partitions.

        $kafka->lag(sub {
            my $lag = shift;
            # { "topic:0" => { current => 10, latest => 42, lag => 32 } }
        });
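
    The hash passed to the callback is easy to post-process. The following
    sketch totals the lag and lists partitions that are further behind than
    a threshold; lag_report and the sample data are illustrative only, with
    the data following the shape shown above:

```perl
use strict;
use warnings;

# $lag: { "topic:partition" => { current => N, latest => N, lag => N } }
# Returns the summed lag and an arrayref of keys whose lag exceeds $threshold.
sub lag_report {
    my ($lag, $threshold) = @_;
    my $total = 0;
    my @behind;
    for my $tp (sort keys %$lag) {
        $total += $lag->{$tp}{lag};
        push @behind, $tp if $lag->{$tp}{lag} > $threshold;
    }
    return ($total, \@behind);
}

my ($total, $behind) = lag_report({
    'jobs:0' => { current => 10, latest => 42, lag => 32 },
    'jobs:1' => { current => 40, latest => 42, lag => 2  },
}, 10);
# $total is 34 and @$behind is ('jobs:0')
```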

  error_name($code)
    Convert a Kafka numeric error code to its name. Callable as a method or
    class function.

        $kafka->error_name(3);                  # "UNKNOWN_TOPIC_OR_PARTITION"
        EV::Kafka::Client::error_name(3);       # same

  poll([$cb])
    Fetch messages from assigned partitions. Calls "on_message" for each
    received record. $cb fires when all fetch responses have arrived.

        my $timer = EV::timer 0, 0.1, sub { $kafka->poll };

  subscribe(@topics, %opts)
    Join a consumer group and subscribe to one or more topics. The list of
    topic names comes first, followed by option key/value pairs. The group
    protocol handles partition assignment automatically.

        $kafka->subscribe('topic-a', 'topic-b',
            group_id           => 'my-group',
            session_timeout    => 30000,      # ms
            rebalance_timeout  => 60000,      # ms
            heartbeat_interval => 3,          # seconds
            auto_commit        => 1,          # commit on unsubscribe (default)
            auto_offset_reset  => 'earliest', # or 'latest'
            group_instance_id  => 'pod-abc', # KIP-345 static membership
            on_assign => sub {
                my $partitions = shift;
                # [{topic, partition, offset}, ...]
            },
            on_revoke => sub {
                my $partitions = shift;
            },
        );

  commit([$cb])
    Commit current consumer offsets to the group coordinator.

        $kafka->commit(sub {
            my $err = shift;
            warn "commit failed: $err" if $err;
        });

  unsubscribe([$cb])
    Leave the consumer group (sends LeaveGroup for fast rebalance), stop
    heartbeat and fetch loop. If "auto_commit" is enabled, commits offsets
    before leaving.

  begin_transaction
    Start a transaction. Requires "transactional_id" in constructor.

  send_offsets_to_transaction($group_id, [$cb])
    Commit consumer offsets within the current transaction via
    "TxnOffsetCommit". This is the key step for exactly-once
    consume-process-produce pipelines.

        $kafka->send_offsets_to_transaction('my-group', sub {
            my ($result, $err) = @_;
        });

  commit_transaction([$cb])
    Commit the current transaction. All produced messages and offset commits
    within the transaction become visible atomically.

  abort_transaction([$cb])
    Abort the current transaction. All produced messages are discarded and
    offset commits are rolled back.

  close([$cb])
    Graceful shutdown: stop timers, disconnect all broker connections.

        $kafka->close(sub { EV::break });

LOW-LEVEL CONNECTION METHODS
    "EV::Kafka::Conn" provides direct access to a single broker connection.
    Useful for custom protocols, debugging, or when cluster-level routing is
    not needed.

        my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
        $conn->on_error(sub { warn @_ });
        $conn->on_connect(sub { ... });
        $conn->connect('127.0.0.1', 9092, 5.0);

  connect($host, $port, [$timeout])
    Connect to a broker. Timeout in seconds (0 = no timeout).

  disconnect
    Disconnect from broker.

  connected
    Returns true if the connection is ready (ApiVersions handshake
    complete).

  metadata(\@topics, $cb)
    Request cluster metadata. Pass "undef" for all topics.

        $conn->metadata(['my-topic'], sub {
            my ($result, $err) = @_;
            # $result->{brokers}, $result->{topics}
        });

  produce($topic, $partition, $key, $value, [\%opts,] [$cb])
    Produce a message to a specific partition.

        $conn->produce('topic', 0, 'key', 'value', sub {
            my ($result, $err) = @_;
        });

    Options: "acks" (default 1), "headers" (hashref), "timestamp" (epoch ms,
    default now), "compression" ('none', 'lz4', 'gzip', 'zstd', 'snappy';
    each requires its respective library at build time).

  produce_batch($topic, $partition, \@records, [\%opts,] [$cb])
    Produce multiple records in a single RecordBatch. Each record is "{key,
    value, headers}". Options: "acks", "compression", "producer_id",
    "producer_epoch", "base_sequence".

        $conn->produce_batch('topic', 0, [
            { key => 'k1', value => 'v1' },
            { key => 'k2', value => 'v2' },
        ], sub { my ($result, $err) = @_ });

  fetch($topic, $partition, $offset, [\%opts,] $cb)
    Fetch messages from a partition starting at $offset. %opts may set
    "max_bytes" (per-partition cap, default 1 MiB), "max_wait_ms" (broker
    block-time, default 500), "min_bytes" (default 1).

        $conn->fetch('topic', 0, 0, sub {
            my ($result, $err) = @_;
            for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
                printf "offset=%d key=%s value=%s\n",
                    $rec->{offset}, $rec->{key}, $rec->{value};
            }
        });

  fetch_multi(\%topics, [\%opts,] $cb)
    Multi-partition fetch in a single request. Groups multiple
    topic-partitions into one Fetch call to the broker. %opts accepts the
    same keys as "fetch".

        $conn->fetch_multi({
            'topic-a' => [{ partition => 0, offset => 10 },
                           { partition => 1, offset => 20 }],
            'topic-b' => [{ partition => 0, offset => 0 }],
        }, sub { my ($result, $err) = @_ });

    Used internally by poll() to batch fetches by broker leader, with
    "max_bytes"/"max_wait_ms"/"min_bytes" taken from the cluster client
    config.

  list_offsets($topic, $partition, $timestamp, $cb)
    Get offsets by timestamp. Use -2 for earliest, -1 for latest.

  find_coordinator($key, $cb, [$key_type])
    Find the coordinator broker. $key_type: 0=group (default),
    1=transaction.

  join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])
    Join a consumer group. Pass $group_instance_id for KIP-345 static
    membership.

  sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])
    Synchronize group state after join.

  heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])
    Send heartbeat to group coordinator.

  offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)
    Commit consumer offsets.

  offset_fetch($group_id, \@topics, $cb)
    Fetch committed offsets for a consumer group.

  api_versions
    Returns a hashref of supported API keys to max versions, or undef if not
    yet negotiated.

        my $vers = $conn->api_versions;
        # { 0 => 7, 1 => 11, 3 => 8, ... }
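
    A small guard built on that hashref can decide whether a request is
    worth sending. This is a sketch only: supports_api is a hypothetical
    helper, and the sample hash stands in for a real api_versions result.

```perl
use strict;
use warnings;

# True if the broker advertised $api_key with a max version >= $min_version.
# $vers may be undef when the handshake has not completed yet.
sub supports_api {
    my ($vers, $api_key, $min_version) = @_;
    return 0 unless $vers && exists $vers->{$api_key};
    return $vers->{$api_key} >= $min_version ? 1 : 0;
}

my $vers = { 0 => 7, 1 => 11, 3 => 8 };    # Produce, Fetch, Metadata
my $fetch_ok = supports_api($vers, 1, 4);  # Fetch v4 or newer available?
```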

  on_error([$cb]), on_connect([$cb]), on_disconnect([$cb])
    Set connection-level handler callbacks. Call with no argument or "undef"
    to clear.

  client_id($id)
    Set the client identifier.

  tls($enable, [$ca_file, $skip_verify])
    Configure TLS.

  sasl($mechanism, [$username, $password])
    Configure SASL authentication.

  auto_reconnect($enable, [$delay_ms])
    Enable automatic reconnection with delay in milliseconds (default 1000).

  leave_group($group_id, $member_id, $cb)
    Send LeaveGroup to coordinator for fast partition rebalance.

  create_topics(\@topics, $timeout_ms, $cb)
    Create topics. Each element: "{name, num_partitions,
    replication_factor}".

        $conn->create_topics(
            [{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
            5000, sub { my ($res, $err) = @_ }
        );

  delete_topics(\@topic_names, $timeout_ms, $cb)
    Delete topics by name.

  init_producer_id($transactional_id, $txn_timeout_ms, $cb)
    Initialize a producer ID for idempotent/transactional produce. Pass
    "undef" for non-transactional idempotent producer.

  add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)
    Register partitions with the transaction coordinator.

  end_txn($txn_id, $producer_id, $epoch, $committed, $cb)
    Commit ("$committed=1") or abort ("$committed=0") a transaction.

  txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)
    Commit consumer offsets within a transaction (API 28).

  pending
    Number of requests awaiting broker response.

  state
    Connection state as integer (0=disconnected, 6=ready).

UTILITY FUNCTIONS
  EV::Kafka::_murmur2($key)
    Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.

  EV::Kafka::_crc32c($data)
    CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity.

  EV::Kafka::_error_name($code)
    Convert Kafka error code to string name.

RESULT STRUCTURES
  Produce result
        $result = {
            topics => [{
                topic      => 'name',
                partitions => [{
                    partition   => 0,
                    error_code  => 0,
                    base_offset => 42,
                }],
            }],
        };
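
    A produce callback can succeed at the request level while individual
    partitions still report an "error_code". The sketch below walks the
    structure above and collects the failed partitions; produce_failures is
    a hypothetical helper name, not part of the module.

```perl
use strict;
use warnings;

# Returns a list of { topic, partition, error_code } for non-zero codes.
sub produce_failures {
    my ($result) = @_;
    my @failed;
    for my $t (@{ $result->{topics} || [] }) {
        for my $p (@{ $t->{partitions} || [] }) {
            next unless $p->{error_code};
            push @failed, {
                topic      => $t->{topic},
                partition  => $p->{partition},
                error_code => $p->{error_code},
            };
        }
    }
    return @failed;
}

my @failed = produce_failures({
    topics => [{
        topic      => 'name',
        partitions => [
            { partition => 0, error_code => 0, base_offset => 42 },
            { partition => 1, error_code => 6, base_offset => -1 },
        ],
    }],
});
# one failure: partition 1 with error code 6
```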

  Fetch result
        $result = {
            topics => [{
                topic      => 'name',
                partitions => [{
                    partition      => 0,
                    error_code     => 0,
                    high_watermark => 100,
                    records => [{
                        offset    => 42,
                        timestamp => 1712345678000,
                        key       => 'key',      # or undef
                        value     => 'value',     # or undef
                        headers   => { h => 'v' },  # if present
                    }],
                }],
            }],
        };
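
    When driving "fetch" by hand at the connection level, the caller has to
    work out where the next fetch should start. A sketch of that step,
    assuming records arrive in offset order within a partition (as Kafka
    delivers them); next_offsets is an illustrative name:

```perl
use strict;
use warnings;

# Returns { "topic:partition" => offset to request next } for every
# partition that returned at least one record without an error.
sub next_offsets {
    my ($result) = @_;
    my %next;
    for my $t (@{ $result->{topics} || [] }) {
        for my $p (@{ $t->{partitions} || [] }) {
            next if $p->{error_code};
            my $recs = $p->{records} || [];
            next unless @$recs;
            $next{"$t->{topic}:$p->{partition}"} = $recs->[-1]{offset} + 1;
        }
    }
    return \%next;
}

my $next = next_offsets({
    topics => [{
        topic      => 'name',
        partitions => [{
            partition => 0, error_code => 0, high_watermark => 100,
            records   => [ { offset => 42 }, { offset => 43 } ],
        }],
    }],
});
# $next->{'name:0'} is 44
```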

  Metadata result
        $result = {
            controller_id => 0,
            brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
            topics  => [{
                name       => 'topic',
                error_code => 0,
                partitions => [{
                    partition  => 0,
                    leader     => 0,
                    error_code => 0,
                }],
            }],
        };
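
    The metadata result is enough to work out which broker leads each
    partition. The following sketch flattens it into a lookup table;
    leader_map is a hypothetical helper, and the cluster client performs
    its own routing internally, so this is for inspection or for use with
    the low-level connection API.

```perl
use strict;
use warnings;

# Build { topic => { partition => "host:port" } } from a metadata result,
# skipping topics and partitions that carry an error code.
sub leader_map {
    my ($meta) = @_;
    my %addr;
    for my $b (@{ $meta->{brokers} || [] }) {
        $addr{ $b->{node_id} } = "$b->{host}:$b->{port}";
    }
    my %leaders;
    for my $t (@{ $meta->{topics} || [] }) {
        next if $t->{error_code};
        for my $p (@{ $t->{partitions} || [] }) {
            next if $p->{error_code};
            $leaders{ $t->{name} }{ $p->{partition} } = $addr{ $p->{leader} };
        }
    }
    return \%leaders;
}

my $leaders = leader_map({
    controller_id => 0,
    brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
    topics  => [{
        name       => 'topic',
        error_code => 0,
        partitions => [{ partition => 0, leader => 0, error_code => 0 }],
    }],
});
# $leaders->{topic}{0} is '10.0.0.1:9092'
```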

ERROR HANDLING
    Errors are delivered through two channels:

    *   Connection-level errors fire the "on_error" callback (or "croak" if
        none set). These include connection refused, DNS failure, TLS
        errors, SASL auth failure, and protocol violations.

    *   Request-level errors are delivered as the second argument to the
        request callback: "$cb->($result, $error)". If $error is defined,
        $result may be undef.

    Within result structures, per-partition "error_code" fields use Kafka
    numeric codes:

        0   No error
        1   OFFSET_OUT_OF_RANGE
        3   UNKNOWN_TOPIC_OR_PARTITION
        6   NOT_LEADER_OR_FOLLOWER       (retried by the producer)
        15  COORDINATOR_NOT_AVAILABLE    (retried)
        16  NOT_COORDINATOR              (retried)
        22  ILLEGAL_GENERATION           (group rejoin)
        25  UNKNOWN_MEMBER_ID            (group rejoin)
        27  REBALANCE_IN_PROGRESS        (group rejoin)
        36  TOPIC_ALREADY_EXISTS
        45  OUT_OF_ORDER_SEQUENCE_NUMBER (idempotent: epoch bump)
        46  DUPLICATE_SEQUENCE_NUMBER    (idempotent: epoch bump)
        79  MEMBER_ID_REQUIRED           (group rejoin with assigned id)

    Use EV::Kafka::Client::error_name($code) for the full list.
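
    Application code that inspects "error_code" itself can mirror the
    annotations in the table with a small lookup. This is a sketch: the
    action labels ('retry', 'rejoin', 'epoch_bump', 'fail') and the name
    error_action are invented here, and the module applies its own
    handling internally regardless.

```perl
use strict;
use warnings;

# Codes and groupings taken from the table above.
my %ACTION = (
    6  => 'retry',         # NOT_LEADER_OR_FOLLOWER
    15 => 'retry',         # COORDINATOR_NOT_AVAILABLE
    16 => 'retry',         # NOT_COORDINATOR
    22 => 'rejoin',        # ILLEGAL_GENERATION
    25 => 'rejoin',        # UNKNOWN_MEMBER_ID
    27 => 'rejoin',        # REBALANCE_IN_PROGRESS
    79 => 'rejoin',        # MEMBER_ID_REQUIRED
    45 => 'epoch_bump',    # OUT_OF_ORDER_SEQUENCE_NUMBER
    46 => 'epoch_bump',    # DUPLICATE_SEQUENCE_NUMBER
);

# Map a numeric error code to one of the labels; unknown codes are 'fail'.
sub error_action {
    my ($code) = @_;
    return 'ok' unless $code;
    return $ACTION{$code} // 'fail';
}
```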

    When a broker disconnects mid-flight, all pending callbacks receive
    "(undef, "connection closed by broker")" or "(undef, "disconnected")".

ENVIRONMENT VARIABLES
    These are used by tests and examples (not by the module itself):

        TEST_KAFKA_BROKER    broker address for tests (host:port)
        KAFKA_BROKER         broker address for examples
        KAFKA_HOST           broker hostname for low-level examples
        KAFKA_PORT           broker port for low-level examples
        KAFKA_TOPIC          topic name for examples
        KAFKA_GROUP_ID       consumer group for examples
        KAFKA_LIMIT          message limit for consume example
        KAFKA_COUNT          message count for fire-and-forget
        BENCH_BROKER         broker for benchmarks
        BENCH_MESSAGES       message count for benchmarks
        BENCH_VALUE_SIZE     value size in bytes for benchmarks
        BENCH_TOPIC          topic name for benchmarks

QUICK START
    Minimal producer + consumer lifecycle:

        use EV;
        use EV::Kafka;

        my $kafka = EV::Kafka->new(
            brokers    => '127.0.0.1:9092',
            acks       => 1,
            on_error   => sub { warn "kafka: @_\n" },
            on_message => sub {
                my ($topic, $part, $offset, $key, $value) = @_;
                print "got: $key=$value\n";
            },
        );

        my $poll_timer;    # held here so the watcher outlives the callback

        $kafka->connect(sub {
            # produce
            $kafka->produce('test', 'k1', 'hello', sub {
                my ($result, $err) = @_;
                return warn "produce failed: $err\n" if $err;
                print "produced\n";

                # consume from the beginning
                $kafka->assign([{topic=>'test', partition=>0, offset=>0}]);
                $kafka->seek('test', 0, -2, sub {
                    $poll_timer = EV::timer 0, 0.1, sub { $kafka->poll };
                });
            });
        });

        EV::run;

COOKBOOK
  Produce JSON with headers
        use JSON::PP;
        my $json = JSON::PP->new->utf8;

        $kafka->produce('events', 'user-42',
            $json->encode({ action => 'click', page => '/home' }),
            { headers => { 'content-type' => 'application/json' } },
            sub { ... }
        );

  Consume from latest offset only
        $kafka->subscribe('live-feed',
            group_id          => 'realtime',
            auto_offset_reset => 'latest',
            on_assign         => sub { print "ready\n" },
        );

  Graceful shutdown
        # An EV signal watcher runs inside the event loop; keep $sigint alive.
        my $sigint = EV::signal 'INT', sub {
            $kafka->commit(sub {
                $kafka->unsubscribe(sub {
                    $kafka->close(sub { EV::break });
                });
            });
        };

  At-least-once processing
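        my $count = 0;
        my $kafka;    # declared first so the closure below can see it
        $kafka = EV::Kafka->new(
            brokers    => '127.0.0.1:9092',
            # process first, then commit every 100 messages
            on_message => sub {
                process($_[4]);    # process() is your own handler
                $kafka->commit if ++$count % 100 == 0;
            },
        );

        $kafka->subscribe('jobs',
            group_id    => 'workers',
            auto_commit => 0,
        );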

  Batch produce
        $kafka->produce_many([
            ['events', 'k1', 'v1'],
            ['events', 'k2', 'v2'],
            ['events', 'k3', 'v3'],
        ], sub {
            my $errs = shift;
            print $errs ? "some failed\n" : "all done\n";
        });

  Exactly-once stream processing (EOS)
        my $kafka;    # declared first so the on_message closure can see it
        $kafka = EV::Kafka->new(
            brokers          => '...',
            transactional_id => 'my-eos-app',
            acks             => -1,
            on_message => sub {
                my ($t, $p, $off, $key, $value) = @_;
                my $result = process($value);
                $kafka->produce('output-topic', $key, $result);
            },
        );

        # consume-process-produce loop:
        $kafka->begin_transaction;
        $kafka->poll(sub {
            $kafka->send_offsets_to_transaction('my-group', sub {
                $kafka->commit_transaction(sub {
                    $kafka->begin_transaction;  # next transaction
                });
            });
        });

  Topic administration
        my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
        $conn->on_error(sub { warn @_ });
        $conn->on_connect(sub {
            $conn->create_topics(
                [{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
                10000, sub { ... }
            );
        });
        $conn->connect('127.0.0.1', 9092, 5.0);

BENCHMARKS
    Measured on Linux with TCP loopback to Redpanda, 100-byte values, Perl
    5.40.2, 20K messages ("bench/benchmark.pl"):

        Pipeline produce (acks=1)   100K msg/sec    11.0 MB/s
        Fire-and-forget (acks=0)    120K msg/sec    13.2 MB/s
        Sequential round-trip        24K msg/sec    42 us avg latency
        Metadata request             21K req/sec    47 us avg latency

    Throughput by value size (pipelined, acks=1):

           10 bytes   105K msg/sec      1.5 MB/s
          100 bytes   100K msg/sec     10.5 MB/s
         1000 bytes    70K msg/sec     70.7 MB/s
        10000 bytes    20K msg/sec    202.0 MB/s

    Latency histogram (20K round-trips, acks=1, "bench/latency.pl"):

        median: 39 us    p90: 59 us    p95: 75 us    p99: 122 us

    Pipeline produce throughput is limited by Perl callback overhead per
    message. Fire-and-forget mode ("acks=0") skips the response cycle
    entirely, reaching ~120K msg/sec. Sequential round-trip (one produce,
    wait for ack, repeat) measures raw broker latency around 39us median.

    The fetch path is sequential (fetch, process, fetch again), which
    introduces one round-trip per batch. With a larger "max_bytes" on
    dense topics, each round-trip carries more records, so fetch
    throughput rises accordingly.

    Run "perl bench/benchmark.pl" for throughput results. Set
    "BENCH_BROKER", "BENCH_MESSAGES", "BENCH_VALUE_SIZE", and "BENCH_TOPIC"
    to customize.

    Run "perl bench/latency.pl" for a latency histogram with percentiles
    (min, avg, median, p90, p95, p99, max).

KAFKA PROTOCOL
    This module implements the Kafka binary protocol directly in XS. All
    integers are big-endian. Requests use a 4-byte size prefix followed by a
    header (API key, version, correlation ID, client ID) and a
    version-specific body.
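
    The framing described above can be reproduced with "pack". The sketch
    below is not the module's XS code, only an illustration of the byte
    layout for a non-flexible request header; frame_request is a
    hypothetical name.

```perl
use strict;
use warnings;

# Build one framed request: int32 size prefix, then header, then body.
# Header: int16 API key, int16 API version, int32 correlation ID,
# and the client ID as an int16-length-prefixed string. All big-endian.
sub frame_request {
    my ($api_key, $api_version, $correlation_id, $client_id, $body) = @_;
    my $header = pack('n n N n/a*',
        $api_key, $api_version, $correlation_id, $client_id);
    return pack('N/a*', $header . $body);
}

# ApiVersions is API key 18; its v0 request has an empty body.
my $frame = frame_request(18, 0, 1, 'ev-kafka', '');
# 4-byte size prefix + 18 bytes of header = 22 bytes in total
```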

    Responses are matched to requests by correlation ID. The broker
    guarantees FIFO ordering per connection, so the response queue is a
    simple FIFO.

    RecordBatch encoding (magic=2) is used for produce. CRC32C covers the
    batch from attributes through the last record. Records use
    ZigZag-encoded varints for lengths and deltas.
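
    For reference, ZigZag varint encoding can be sketched in a few lines of
    Perl. This is an illustration of the wire format rather than the
    module's XS implementation, and the function names are made up for the
    example. ZigZag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ... so that small
    negative numbers stay short; the result is then written seven bits at a
    time, low bits first, with the high bit marking continuation.

```perl
use strict;
use warnings;

sub encode_varint {
    my ($n) = @_;
    my $z = $n >= 0 ? 2 * $n : -2 * $n - 1;    # ZigZag mapping
    my $out = '';
    while ($z >= 0x80) {
        $out .= chr(($z & 0x7f) | 0x80);       # low 7 bits, continuation set
        $z >>= 7;
    }
    return $out . chr($z);                     # final byte, continuation clear
}

sub decode_varint {
    my ($bytes) = @_;
    my ($z, $shift) = (0, 0);
    for my $b (unpack 'C*', $bytes) {
        $z |= ($b & 0x7f) << $shift;
        $shift += 7;
        last unless $b & 0x80;                 # stop at the terminating byte
    }
    return ($z & 1) ? -(($z + 1) >> 1) : $z >> 1;    # undo ZigZag
}
```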

    The connection handshake sends ApiVersions (v0) on connect to discover
    supported protocol versions. SASL authentication uses SaslHandshake (v1)
    + SaslAuthenticate (v2) with the PLAIN or SCRAM-SHA-256/512 mechanisms.

    Consumer group protocol uses sticky partition assignment with
    MEMBER_ID_REQUIRED (error 79) retry per KIP-394.

    Non-flexible API versions are used throughout (capped below the
    flexible-version threshold for each API) to avoid the compact encoding
    complexity.

LIMITATIONS
    *   Blocking DNS for hostnames -- numeric IPv4/IPv6 literals take a fast
        path ("AI_NUMERICHOST") and never block. Non-literal hostnames call
        "getaddrinfo" synchronously, blocking the EV loop until the resolver
        responds. For fully non-blocking operation against named brokers,
        pre-resolve in Perl-land.

    *   No GSSAPI/OAUTHBEARER -- only SASL/PLAIN and SCRAM-SHA-256/512 are
        implemented.

    *   No flexible API versions -- all API versions are capped below the
        flexible-version threshold to avoid compact string/array encoding.
        Works with Kafka 0.11+ and Redpanda; loses access to a few newer
        protocol features.

    *   Producer retry policy -- transient errors (NOT_LEADER,
        COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3
        retries with backoff. Hard idempotent errors (OUT_OF_ORDER_SEQUENCE,
        DUPLICATE_SEQUENCE) trigger one InitProducerId-with-fresh-epoch
        recovery attempt. Other broker errors are surfaced to the callback
        immediately.
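
    For the blocking-DNS limitation, one way to pre-resolve in Perl-land is
    to turn the broker list into numeric addresses at startup, before the
    event loop runs. The sketch below uses the core Socket module;
    resolve_brokers is a hypothetical helper, it is restricted to IPv4 as a
    simplifying assumption, and the lookup itself still blocks, so it
    belongs before "EV::run" rather than inside a callback.

```perl
use strict;
use warnings;
use Socket qw(getaddrinfo getnameinfo AF_INET SOCK_STREAM NI_NUMERICHOST);

# 'host1:9092,host2:9092' -> 'a.b.c.d:9092,e.f.g.h:9092' (first IPv4 each)
sub resolve_brokers {
    my ($brokers) = @_;
    my @out;
    for my $hp (split /,/, $brokers) {
        my ($host, $port) = $hp =~ /^(.*):(\d+)$/
            or die "bad broker address '$hp'\n";
        my ($err, @res) = getaddrinfo($host, $port,
            { family => AF_INET, socktype => SOCK_STREAM });
        die "cannot resolve $host: $err\n" if $err;
        my ($nerr, $ip) = getnameinfo($res[0]{addr}, NI_NUMERICHOST);
        die "cannot format address for $host: $nerr\n" if $nerr;
        push @out, "$ip:$port";
    }
    return join ',', @out;
}
```

    The result would be passed as the "brokers" option to the constructor.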

AUTHOR
    vividsnow

LICENSE
    This library is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.

