diff options
author | Dmitry Lazurkin <dilaz03@gmail.com> | 2016-11-25 22:45:02 +0300 |
---|---|---|
committer | Martin Mathieson <martin.r.mathieson@googlemail.com> | 2016-12-05 21:45:18 +0000 |
commit | 548b9febb3c915b3693af0f5e8fe338447ca0f11 (patch) | |
tree | 982ecb5e5a5e5f3a9ddc51be0f586b2558741f7e | |
parent | a275e3fd0aecd98b5118ca08a3622e8cb1468b2d (diff) |
kafka: add dissection for rest of api keys
- support rest of api keys
- dissect kafka.required_acks with constants
- dissect kafka.message_timestamp_type
- add expert info about missing request
Change-Id: I3d18936adac6702a61f545385bdec1b75b564bd9
Reviewed-on: https://code.wireshark.org/review/18954
Petri-Dish: Alexis La Goutte <alexis.lagoutte@gmail.com>
Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org>
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
-rw-r--r-- | epan/dissectors/packet-kafka.c | 2190 |
1 files changed, 2061 insertions, 129 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index 589ba1477b..9d8bc3f60e 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -1,5 +1,5 @@ /* packet-kafka.c - * Routines for Kafka Protocol dissection (version 0.8 and later) + * Routines for Kafka Protocol dissection (version 0.8 - 0.10.1.0) * Copyright 2013, Evan Huus <eapache@gmail.com> * * https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol @@ -38,69 +38,109 @@ void proto_register_kafka(void); void proto_reg_handoff_kafka(void); -static int proto_kafka = -1; -static int hf_kafka_len = -1; -static int hf_kafka_request_api_key = -1; -static int hf_kafka_response_api_key = -1; +static int proto_kafka = -1; +static int hf_kafka_len = -1; +static int hf_kafka_request_api_key = -1; +static int hf_kafka_response_api_key = -1; static int hf_kafka_request_api_version = -1; static int hf_kafka_response_api_version = -1; -static int hf_kafka_correlation_id = -1; -static int hf_kafka_client_id = -1; -static int hf_kafka_string_len = -1; -static int hf_kafka_bytes_len = -1; -static int hf_kafka_array_count = -1; -static int hf_kafka_required_acks = -1; -static int hf_kafka_timeout = -1; -static int hf_kafka_topic_name = -1; -static int hf_kafka_partition_id = -1; -static int hf_kafka_replica = -1; -static int hf_kafka_isr = -1; -static int hf_kafka_partition_leader = -1; -static int hf_kafka_message_set_size = -1; -static int hf_kafka_message_size = -1; -static int hf_kafka_message_crc = -1; -static int hf_kafka_message_magic = -1; -static int hf_kafka_message_codec = -1; -static int hf_kafka_message_timestamp = -1; -static int hf_kafka_message_key = -1; -static int hf_kafka_message_value = -1; -static int hf_kafka_request_frame = -1; -static int hf_kafka_response_frame = -1; -static int hf_kafka_consumer_group = -1; -static int hf_kafka_offset = -1; -static int hf_kafka_offset_time = -1; -static int hf_kafka_max_offsets = -1; -static int hf_kafka_metadata = -1; -static int hf_kafka_error = -1; -static int hf_kafka_broker_nodeid = -1; -static int hf_kafka_broker_host = -1; -static int hf_kafka_broker_port = -1; -static int hf_kafka_broker_rack = -1; -static int hf_kafka_cluster_id = -1; -static int hf_kafka_controller_id = -1; -static int hf_kafka_is_internal = -1; -static int hf_kafka_min_bytes = -1; -static int hf_kafka_max_bytes = -1; -static int hf_kafka_max_wait_time = -1; -static int hf_kafka_throttle_time = -1; -static int hf_kafka_api_versions_api_key = -1; +static int hf_kafka_correlation_id = -1; +static int hf_kafka_client_id = -1; +static int hf_kafka_client_host = -1; +static int hf_kafka_string_len = -1; +static int hf_kafka_bytes_len = -1; +static int hf_kafka_array_count = -1; +static int hf_kafka_required_acks = -1; +static int hf_kafka_timeout = -1; +static int hf_kafka_topic_name = -1; +static int hf_kafka_partition_id = -1; +static int hf_kafka_replica = -1; +static int hf_kafka_replication_factor = -1; +static int hf_kafka_isr = -1; +static int hf_kafka_partition_leader = -1; +static int hf_kafka_message_set_size = -1; +static int hf_kafka_message_size = -1; +static int hf_kafka_message_crc = -1; +static int hf_kafka_message_magic = -1; +static int hf_kafka_message_codec = -1; +static int hf_kafka_message_timestamp_type = -1; +static int hf_kafka_message_timestamp = -1; +static int hf_kafka_message_key = -1; +static int hf_kafka_message_value = -1; +static int hf_kafka_request_frame = -1; +static int hf_kafka_response_frame = -1; +static int hf_kafka_consumer_group = -1; +static int hf_kafka_group_state = -1; +static int hf_kafka_offset = -1; +static int hf_kafka_offset_time = -1; +static int hf_kafka_max_offsets = -1; +static int hf_kafka_metadata = -1; +static int hf_kafka_error = -1; +static int hf_kafka_broker_nodeid = -1; +static int hf_kafka_broker_host = -1; +static int hf_kafka_broker_port = -1; +static int hf_kafka_broker_rack = -1; +static int hf_kafka_broker_security_protocol_type = -1; +static int hf_kafka_cluster_id = -1; +static int hf_kafka_controller_id = -1; +static int hf_kafka_controller_epoch = -1; +static int hf_kafka_delete_partitions = -1; +static int hf_kafka_leader_id = -1; +static int hf_kafka_group_leader_id = -1; +static int hf_kafka_leader_epoch = -1; +static int hf_kafka_is_internal = -1; +static int hf_kafka_min_bytes = -1; +static int hf_kafka_max_bytes = -1; +static int hf_kafka_max_wait_time = -1; +static int hf_kafka_throttle_time = -1; +static int hf_kafka_api_versions_api_key = -1; static int hf_kafka_api_versions_min_version = -1; static int hf_kafka_api_versions_max_version = -1; - -static gint ett_kafka = -1; -static gint ett_kafka_message = -1; -static gint ett_kafka_message_set = -1; -static gint ett_kafka_metadata_replicas = -1; -static gint ett_kafka_metadata_isr = -1; -static gint ett_kafka_metadata_broker = -1; -static gint ett_kafka_metadata_brokers = -1; -static gint ett_kafka_metadata_topics = -1; -static gint ett_kafka_request_topic = -1; -static gint ett_kafka_request_partition = -1; -static gint ett_kafka_response_topic = -1; -static gint ett_kafka_response_partition = -1; -static gint ett_kafka_api_versions = -1; - +static int hf_kafka_session_timeout = -1; +static int hf_kafka_rebalance_timeout = -1; +static int hf_kafka_member_id = -1; +static int hf_kafka_protocol_type = -1; +static int hf_kafka_protocol_name = -1; +static int hf_kafka_protocol_metadata = -1; +static int hf_kafka_member_metadata = -1; +static int hf_kafka_generation_id = -1; +static int hf_kafka_member_assignment = -1; +static int hf_kafka_sasl_mechanism = -1; +static int hf_kafka_num_partitions = -1; +static int hf_kafka_zk_version = -1; +static int hf_kafka_config_key = -1; +static int hf_kafka_config_value = -1; +static int hf_kafka_commit_timestamp = -1; +static int hf_kafka_retention_time = -1; + +static int ett_kafka = -1; +static int ett_kafka_message = -1; +static int ett_kafka_message_set = -1; +static int ett_kafka_replicas = -1; +static int ett_kafka_isrs = -1; +static int ett_kafka_broker = -1; +static int ett_kafka_brokers = -1; +static int ett_kafka_broker_end_point = -1; +static int ett_kafka_topics = -1; +static int ett_kafka_topic = -1; +static int ett_kafka_request_topic = -1; +static int ett_kafka_request_partition = -1; +static int ett_kafka_response_topic = -1; +static int ett_kafka_response_partition = -1; +static int ett_kafka_api_version = -1; +static int ett_kafka_group_protocols = -1; +static int ett_kafka_group_protocol = -1; +static int ett_kafka_group_members = -1; +static int ett_kafka_group_member = -1; +static int ett_kafka_group_assignments = -1; +static int ett_kafka_group_assignment = -1; +static int ett_kafka_group = -1; +static int ett_kafka_sasl_enabled_mechanisms = -1; +static int ett_kafka_replica_assignment = -1; +static int ett_kafka_configs = -1; +static int ett_kafka_config = -1; + +static expert_field ei_kafka_request_missing = EI_INIT; static expert_field ei_kafka_message_decompress = EI_INIT; static expert_field ei_kafka_bad_string_length = EI_INIT; static expert_field ei_kafka_bad_bytes_length = EI_INIT; @@ -197,28 +237,58 @@ static const value_string kafka_errors[] = { { 0, NULL } }; -#define KAFKA_COMPRESSION_NONE 0 -#define KAFKA_COMPRESSION_GZIP 1 -#define KAFKA_COMPRESSION_SNAPPY 2 -#define KAFKA_COMPRESSION_LZ4 3 -static const value_string kafka_codecs[] = { - { KAFKA_COMPRESSION_NONE, "None" }, - { KAFKA_COMPRESSION_GZIP, "Gzip" }, - { KAFKA_COMPRESSION_SNAPPY, "Snappy" }, - { KAFKA_COMPRESSION_LZ4, "LZ4" }, +#define KAFKA_ACK_NOT_REQUIRED 0 +#define KAFKA_ACK_LEADER 1 +#define KAFKA_ACK_FULL_ISR -1 +static const value_string kafka_acks[] = { + { KAFKA_ACK_NOT_REQUIRED, "Not Required" }, + { KAFKA_ACK_LEADER, "Leader" }, + { KAFKA_ACK_FULL_ISR, "Full ISR" }, + { 0, NULL } +}; + +#define KAFKA_MESSAGE_CODEC_MASK 0x07 +#define KAFKA_MESSAGE_CODEC_NONE 0 +#define KAFKA_MESSAGE_CODEC_GZIP 1 +#define KAFKA_MESSAGE_CODEC_SNAPPY 2 +#define KAFKA_MESSAGE_CODEC_LZ4 3 +static const value_string kafka_message_codecs[] = { + { KAFKA_MESSAGE_CODEC_NONE, "None" }, + { KAFKA_MESSAGE_CODEC_GZIP, "Gzip" }, + { KAFKA_MESSAGE_CODEC_SNAPPY, "Snappy" }, + { KAFKA_MESSAGE_CODEC_LZ4, "LZ4" }, { 0, NULL } }; #ifdef HAVE_SNAPPY static const guint8 kafka_xerial_header[8] = {0x82, 0x53, 0x4e, 0x41, 0x50, 0x50, 0x59, 0x00}; #endif +#define KAFKA_MESSAGE_TIMESTAMP_MASK 0x08 +static const value_string kafka_message_timestamp_types[] = { + { 0, "CreateTime" }, + { 1, "LogAppendTime" }, + { 0, NULL } +}; + +static const value_string kafka_security_protocol_types[] = { + { 0, "PLAINTEXT" }, + { 1, "SSL" }, + { 2, "SASL_PLAINTEXT" }, + { 3, "SASL_SSL" }, + { 0, NULL } +}; + /* List/range of TCP ports to register */ static range_t *current_kafka_tcp_range = NULL; -typedef guint16 kafka_api_version_t; +typedef gint16 kafka_api_key_t; +typedef gint16 kafka_api_version_t; +typedef gint16 kafka_error_t; +typedef gint32 kafka_partition_t; +typedef gint64 kafka_offset_t; typedef struct _kafka_query_response_t { - gint16 api_key; + kafka_api_key_t api_key; kafka_api_version_t api_version; guint32 request_frame; guint32 response_frame; @@ -228,8 +298,8 @@ typedef struct _kafka_query_response_t { /* Some values to temporarily remember during dissection */ typedef struct kafka_packet_values_t { - guint32 partition_id; - gint64 offset; + kafka_partition_t partition_id; + kafka_offset_t offset; } kafka_packet_values_t; /* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */ @@ -239,6 +309,18 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i /* HELPERS */ +static const char * +kafka_error_to_str(kafka_error_t error) +{ + return val_to_str_const(error, kafka_errors, "Unknown"); +} + +static const char * +kafka_api_key_to_str(kafka_api_key_t api_key) +{ + return val_to_str_const(api_key, kafka_apis, "Unknown"); +} + static guint get_kafka_pdu_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset, void *data _U_) { @@ -389,7 +471,10 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s /* Codec */ proto_tree_add_item(subtree, hf_kafka_message_codec, tvb, offset, 1, ENC_BIG_ENDIAN); - codec = tvb_get_guint8(tvb, offset) & 0x07; + codec = tvb_get_guint8(tvb, offset) & KAFKA_MESSAGE_CODEC_MASK; + + /* Timestamp Type */ + proto_tree_add_item(subtree, hf_kafka_message_timestamp_type, tvb, offset, 1, ENC_BIG_ENDIAN); offset += 1; if (magic_byte >= 1) { @@ -400,7 +485,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset, NULL, &bytes_length); switch (codec) { - case KAFKA_COMPRESSION_GZIP: + case KAFKA_MESSAGE_CODEC_GZIP: raw = kafka_get_bytes(subtree, tvb, pinfo, offset); offset += 4; @@ -424,7 +509,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s proto_tree_add_bytes(subtree, hf_kafka_message_value, tvb, offset, 0, NULL); } break; - case KAFKA_COMPRESSION_SNAPPY: + case KAFKA_MESSAGE_CODEC_SNAPPY: #ifdef HAVE_SNAPPY raw = kafka_get_bytes(subtree, tvb, pinfo, offset); offset += 4; @@ -485,8 +570,8 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s } break; #endif - case KAFKA_COMPRESSION_LZ4: - case KAFKA_COMPRESSION_NONE: + case KAFKA_MESSAGE_CODEC_LZ4: + case KAFKA_MESSAGE_CODEC_NONE: default: offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length); @@ -630,20 +715,34 @@ dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree return offset; } -static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +static int +dissect_kafka_error_ret(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_error_t *ret) { - guint16 error = tvb_get_ntohs(tvb, offset); + kafka_error_t error = (kafka_error_t) tvb_get_ntohs(tvb, offset); proto_tree_add_item(tree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; + /* Show error in Info column */ if (error != 0) { col_append_fstr(pinfo->cinfo, COL_INFO, - " [%s] ", val_to_str_const(error, kafka_errors, "Unknown")); + " [%s] ", kafka_error_to_str(error)); } - offset += 2; + + if (ret) { + *ret = error; + } + return offset; } static int +dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +{ + return dissect_kafka_error_ret(tvb, pinfo, tree, offset, NULL); +} + +static int dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, kafka_api_version_t api_version) { @@ -725,7 +824,7 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre int host_start, host_len; guint32 broker_port; - subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_metadata_broker, &ti, "Broker"); + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker, &ti, "Broker"); nodeid = tvb_get_ntohl(tvb, offset); proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN); @@ -787,12 +886,12 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree * offset += 4; sub_start_offset = offset; - subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_replicas, &subti, "Replicas"); + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_replicas, &subti, "Replicas"); offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_replica); proto_item_set_len(subti, offset - sub_start_offset); sub_start_offset = offset; - subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_isr, &subti, "Caught-Up Replicas"); + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_isrs, &subti, "Caught-Up Replicas"); offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_isr); proto_item_set_len(subti, offset - sub_start_offset); @@ -839,7 +938,7 @@ dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *t proto_tree *subtree; int offset = start_offset; - subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_brokers, &ti, "Broker Metadata"); + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_brokers, &ti, "Broker Metadata"); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_broker); proto_item_set_len(ti, offset - start_offset); @@ -853,13 +952,321 @@ dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *t } start_offset = offset; - subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_topics, &ti, "Topic Metadata"); + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_topics, &ti, "Topic Metadata"); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_topic); proto_item_set_len(ti, offset - start_offset); return offset; } +/* LEADER_AND_ISR REQUEST/RESPONSE */ + +static int +dissect_kafka_leader_and_isr_request_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* isr */ + proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_leader_and_isr_request_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* replica */ + proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_leader_and_isr_request_partition_state(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version) +{ + proto_tree *subtree, *subsubtree; + proto_item *subti, *subsubti; + int topic_start, topic_len; + kafka_partition_t partition; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_request_partition, + &subti, "Partition State"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, + &topic_start, &topic_len); + + /* partition */ + partition = (kafka_partition_t) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* controller_epoch */ + proto_tree_add_item(subtree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* leader */ + proto_tree_add_item(subtree, hf_kafka_leader_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* leader_epoch */ + proto_tree_add_item(subtree, hf_kafka_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* [isr] */ + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, + ett_kafka_isrs, + &subsubti, "ISRs"); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, + &dissect_kafka_leader_and_isr_request_isr); + proto_item_set_end(subsubti, tvb, offset); + + /* zk_version */ + proto_tree_add_item(subtree, hf_kafka_zk_version, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* [replica] */ + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, + ett_kafka_replicas, + &subsubti, "Replicas"); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, + &dissect_kafka_leader_and_isr_request_replica); + proto_item_set_end(subsubti, tvb, offset); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + partition); + + return offset; +} + +static int +dissect_kafka_leader_and_isr_request_live_leader(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + gint32 nodeid; + int host_start, host_len; + gint32 broker_port; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker, + &subti, "Live Leader"); + + /* id */ + nodeid = (kafka_partition_t) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* host */ + offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len); + + /* port */ + broker_port = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (node %u: %s:%u)", + nodeid, + tvb_get_string_enc(wmem_packet_scope(), tvb, host_start, host_len, ENC_UTF_8|ENC_NA), + broker_port); + + return offset; +} + +static int +dissect_kafka_leader_and_isr_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + gint32 controller_id; + + /* controller_id */ + controller_id = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* controller_epoch */ + proto_tree_add_item(tree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* [partition_state] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_leader_and_isr_request_partition_state); + + /* [live_leader] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_leader_and_isr_request_live_leader); + + col_append_fstr(pinfo->cinfo, COL_INFO, " (Controller-ID=%d)", controller_id); + + return offset; +} + +static int +dissect_kafka_leader_and_isr_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + kafka_partition_t partition; + kafka_error_t error; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_response_partition, + &subti, "Partition"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len); + + /* partition */ + partition = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* error_code */ + offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u, Error=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + partition, + kafka_error_to_str(error)); + + return offset; +} + +static int +dissect_kafka_leader_and_isr_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* [partition] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_leader_and_isr_response_partition); + + return offset; +} + +/* STOP_REPLICA REQUEST/RESPONSE */ + +static int +dissect_kafka_stop_replica_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + kafka_partition_t partition; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_request_partition, + &subti, "Partition"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len); + + /* partition */ + partition = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + partition); + + return offset; +} + +static int +dissect_kafka_stop_replica_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + gint32 controller_id; + + /* controller_id */ + controller_id = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* controller_epoch */ + proto_tree_add_item(tree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* delete_partitions */ + proto_tree_add_item(tree, hf_kafka_delete_partitions, tvb, offset, 1, ENC_BIG_ENDIAN); + offset += 1; + + /* [partition] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_stop_replica_request_partition); + + col_append_fstr(pinfo->cinfo, COL_INFO, " (Controller-ID=%d)", controller_id); + + return offset; +} + +static int +dissect_kafka_stop_replica_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + kafka_error_t error; + kafka_partition_t partition; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_response_partition, + &subti, "Partition"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len); + + /* partition */ + partition = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* error_code */ + offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u, Error=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + partition, + kafka_error_to_str(error)); + + return offset; +} + +static int +dissect_kafka_stop_replica_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* [partition] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_stop_replica_response_partition); + + return offset; +} + /* FETCH REQUEST/RESPONSE */ static int @@ -1227,16 +1634,23 @@ dissect_kafka_offsets_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tr /* API_VERSIONS REQUEST/RESPONSE */ static int -dissect_kafka_api_versions_response_api_versions(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, - int start_offset, kafka_api_version_t api_version _U_) +dissect_kafka_api_versions_request(tvbuff_t *tvb _U_, packet_info *pinfo _U_, proto_tree *tree _U_, + int offset _U_, kafka_api_version_t api_version _U_) +{ + return offset; +} + +static int +dissect_kafka_api_versions_response_api_version(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int start_offset, kafka_api_version_t api_version _U_) { proto_item *ti; proto_tree *subtree; int offset = start_offset; kafka_api_version_t api_key, min_version, max_version; - subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_api_versions, &ti, - "API Versions"); + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_api_version, &ti, + "API Version"); api_key = tvb_get_ntohs(tvb, offset); proto_tree_add_item(subtree, hf_kafka_api_versions_api_key, tvb, offset, 2, ENC_BIG_ENDIAN); @@ -1254,13 +1668,13 @@ dissect_kafka_api_versions_response_api_versions(tvbuff_t *tvb, packet_info *pin if (max_version != min_version) { /* Range of versions supported. */ proto_item_append_text(subtree, " %s (v%d-%d)", - val_to_str_const(api_key, kafka_apis, "unknown"), + kafka_api_key_to_str(api_key), min_version, max_version); } else { /* Only one version. */ proto_item_append_text(subtree, " %s (v%d)", - val_to_str_const(api_key, kafka_apis, "unknown"), + kafka_api_key_to_str(api_key), min_version); } @@ -1271,9 +1685,1290 @@ static int dissect_kafka_api_versions_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, kafka_api_version_t api_version) { + /* error_code */ offset = dissect_kafka_error(tvb, pinfo, tree, offset); - return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, - &dissect_kafka_api_versions_response_api_versions); + + /* [api_version] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_api_versions_response_api_version); + + return offset; +} + +/* UPDATE_METADATA REQUEST/RESPONSE */ + +static int +dissect_kafka_update_metadata_request_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* isr */ + proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_update_metadata_request_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* replica */ + proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_update_metadata_request_partition_state(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version) +{ + proto_tree *subtree, *subsubtree; + proto_item *subti, *subsubti; + int topic_start, topic_len; + kafka_partition_t partition; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_request_partition, + &subti, "Partition State"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, + &topic_start, &topic_len); + + /* partition */ + partition = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* controller_epoch */ + proto_tree_add_item(subtree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* leader */ + proto_tree_add_item(subtree, hf_kafka_leader_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* leader_epoch */ + proto_tree_add_item(subtree, hf_kafka_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* [isr] */ + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, + ett_kafka_isrs, + &subsubti, "ISRs"); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, + &dissect_kafka_update_metadata_request_isr); + proto_item_set_end(subsubti, tvb, offset); + + /* zk_version */ + proto_tree_add_item(subtree, hf_kafka_zk_version, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* [replica] */ + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, + ett_kafka_replicas, + &subsubti, "Replicas"); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, + &dissect_kafka_update_metadata_request_replica); + proto_item_set_end(subsubti, tvb, offset); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + partition); + + return offset; +} + +static int +dissect_kafka_update_metadata_request_end_point(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int host_start, host_len; + gint32 broker_port; + gint16 security_protocol_type; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker_end_point, + &subti, "End Point"); + + /* port */ + broker_port = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* host */ + offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len); + + /* security_protocol_type */ + security_protocol_type = (gint16) tvb_get_ntohs(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_security_protocol_type, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (%s://%s:%d)", + val_to_str_const(security_protocol_type, + kafka_security_protocol_types, "UNKNOWN"), + tvb_get_string_enc(wmem_packet_scope(), tvb, host_start, host_len, + ENC_UTF_8|ENC_NA), + broker_port); + + return offset; +} + +static int +dissect_kafka_update_metadata_request_live_leader(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + gint32 nodeid; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker, + &subti, "Live Leader"); + + /* id */ + nodeid = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + if (api_version == 0) { + int host_start, host_len; + gint32 broker_port; + + /* host */ + offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len); + + /* port */ + broker_port = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + proto_item_append_text(subti, " (node %u: %s:%u)", + nodeid, + tvb_get_string_enc(wmem_packet_scope(), tvb, host_start, host_len, + ENC_UTF_8|ENC_NA), + broker_port); + } else if (api_version >= 1) { + /* [end_point] */ + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_update_metadata_request_end_point); + + if (api_version >= 2) { + /* rack */ + offset = dissect_kafka_string(subtree, hf_kafka_broker_rack, tvb, pinfo, offset, NULL, NULL); + } + + proto_item_append_text(subti, " (node %d)", + nodeid); + } + + proto_item_set_end(subti, tvb, offset); + + return offset; +} + +static int +dissect_kafka_update_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + gint32 controller_id; + + /* controller_id */ + controller_id = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* controller_epoch */ + proto_tree_add_item(tree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* [partition_state] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_update_metadata_request_partition_state); + + /* [live_leader] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_update_metadata_request_live_leader); + + col_append_fstr(pinfo->cinfo, COL_INFO, " (Controller-ID=%d)", controller_id); + + return offset; +} + +static int +dissect_kafka_update_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + return offset; +} + +/* CONTROLLED_SHUTDOWN REQUEST/RESPONSE */ + +static int +dissect_kafka_controlled_shutdown_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + gint32 broker_id; + + /* broker_id */ + broker_id = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(tree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + col_append_fstr(pinfo->cinfo, COL_INFO, " (Broker-ID=%d)", broker_id); + + return offset; +} + +static int +dissect_kafka_controlled_shutdown_response_partition_remaining(tvbuff_t *tvb, packet_info *pinfo, + proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + kafka_partition_t partition; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &subti, + "Partition Remaining"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, + &topic_start, &topic_len); + + /* partition */ + partition = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Partition-ID=%d)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + partition); + + return offset; +} + +static int +dissect_kafka_controlled_shutdown_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* [partition_remaining] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_controlled_shutdown_response_partition_remaining); + + return offset; +} + +/* OFFSET_COMMIT REQUEST/RESPONSE */ + +static int +dissect_kafka_offset_commit_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + kafka_partition_t partition_id; + kafka_offset_t partition_offset; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &subti, + "Partition"); + + /* partition */ + partition_id = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* offset */ + partition_offset = (gint64) tvb_get_ntoh64(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); + offset += 8; + + if (api_version == 1) { + /* timestamp */ + offset = dissect_kafka_timestamp(tvb, pinfo, subtree, hf_kafka_commit_timestamp, offset); + } + + /* metadata */ + offset = dissect_kafka_string(subtree, hf_kafka_metadata, tvb, pinfo, offset, NULL, NULL); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)", + partition_id, partition_offset); + + return offset; +} + +static int +dissect_kafka_offset_commit_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &subti, "Topic"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, + &topic_start, &topic_len); + + /* [partition] */ + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_offset_commit_request_partition); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_offset_commit_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + int group_start, group_len; + + /* group_id */ + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + if (api_version >= 1) { + /* group_generation_id */ + proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* member_id */ + offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset, NULL, NULL); + + if (api_version >= 2) { + /* retention_time */ + proto_tree_add_item(tree, hf_kafka_retention_time, tvb, offset, 8, ENC_BIG_ENDIAN); + offset += 8; + } + } + + /* [topic] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_offset_commit_request_topic); + + col_append_fstr(pinfo->cinfo, COL_INFO, + " (Group=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_offset_commit_response_partition_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + kafka_partition_t partition; + kafka_error_t error; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &subti, + "Partition Response"); + + /* partition */ + partition = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* error_code */ + offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Partition-ID=%d, Error=%s)", + partition, kafka_error_to_str(error)); + + return offset; +} + +static int +dissect_kafka_offset_commit_response_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &subti, "Response"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, + &topic_start, &topic_len); + + /* [partition_response] */ + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_offset_commit_response_partition_response); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_offset_commit_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* [responses] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_offset_commit_response_response); + + return offset; +} + +/* GROUP_COORDINATOR REQUEST/RESPONSE */ + +static int +dissect_kafka_group_coordinator_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + int group_start, group_len; + + /* group_id */ + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + col_append_fstr(pinfo->cinfo, COL_INFO, + " (Group=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_group_coordinator_response_coordinator(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + gint32 node_id; + int host_start, host_len; + gint32 port; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker, &subti, "Coordinator"); + + /* node_id */ + node_id = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* host */ + offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, + &host_start, &host_len); + + /* port */ + port = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (node %u: %s:%u)", + node_id, + tvb_get_string_enc(wmem_packet_scope(), tvb, + host_start, host_len, ENC_UTF_8|ENC_NA), + port); + + return offset; +} + +static int +dissect_kafka_group_coordinator_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* coordinator */ + offset = dissect_kafka_group_coordinator_response_coordinator(tvb, pinfo, tree, offset, api_version); + + return offset; +} + +/* JOIN_GROUP REQUEST/RESPONSE */ + +static int +dissect_kafka_join_group_request_group_protocols(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int protocol_start, protocol_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_protocol, &subti, + "Group Protocol"); + + /* protocol_name */ + offset = dissect_kafka_string(subtree, hf_kafka_protocol_name, tvb, pinfo, offset, + &protocol_start, &protocol_len); + + /* protocol_metadata */ + offset = dissect_kafka_bytes(subtree, hf_kafka_protocol_metadata, tvb, pinfo, offset, NULL, NULL); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Group-ID=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + protocol_start, protocol_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_join_group_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + int group_start, group_len; + int member_start, member_len; + + /* group_id */ + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + /* session_timeout */ + proto_tree_add_item(tree, hf_kafka_session_timeout, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + if (api_version > 0) { + /* rebalance_timeout */ + proto_tree_add_item(tree, hf_kafka_rebalance_timeout, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + } + + /* member_id */ + offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + /* protocol_type */ + offset = dissect_kafka_string(tree, hf_kafka_protocol_type, tvb, pinfo, offset, NULL, NULL); + + /* [group_protocols] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_protocols, &subti, + "Group Protocols"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_join_group_request_group_protocols); + proto_item_set_end(subti, tvb, offset); + + col_append_fstr(pinfo->cinfo, COL_INFO, + " (Group=%s, Member=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA), + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_join_group_response_member(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int member_start, member_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_member, &subti, "Member"); + + /* member_id */ + offset = dissect_kafka_string(subtree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + /* member_metadata */ + offset = dissect_kafka_bytes(subtree, hf_kafka_member_metadata, tvb, pinfo, offset, NULL, NULL); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Member-ID=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_join_group_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + int member_start, member_len; + + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* generation_id */ + proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* group_protocol */ + offset = dissect_kafka_string(tree, hf_kafka_protocol_name, tvb, pinfo, offset, NULL, NULL); + + /* leader_id */ + offset = dissect_kafka_string(tree, hf_kafka_group_leader_id, tvb, pinfo, offset, NULL, NULL); + + /* member_id */ + offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + /* [member] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_members, &subti, "Members"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_join_group_response_member); + proto_item_set_end(subti, tvb, offset); + + col_append_fstr(pinfo->cinfo, COL_INFO, + " (Member=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +/* HEARTBEAT REQUEST/RESPONSE */ + +static int +dissect_kafka_heartbeat_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + int group_start, group_len; + int member_start, member_len; + + /* group_id */ + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + /* group_generation_id */ + proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* member_id */ + offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + col_append_fstr(pinfo->cinfo, COL_INFO, + " (Group=%s, Member=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA), + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_heartbeat_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + return offset; +} + +/* LEAVE_GROUP REQUEST/RESPONSE */ + +static int +dissect_kafka_leave_group_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + int group_start, group_len; + int member_start, member_len; + + /* group_id */ + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + /* member_id */ + offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + col_append_fstr(pinfo->cinfo, COL_INFO, + " (Group=%s, Member=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA), + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_leave_group_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + return offset; +} + +/* SYNC_GROUP REQUEST/RESPONSE */ + +static int +dissect_kafka_sync_group_request_group_assignment(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int member_start, member_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_assignment, &subti, + "Group Assignment"); + + /* member_id */ + offset = dissect_kafka_string(subtree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + /* member_assigment */ + offset = dissect_kafka_bytes(subtree, hf_kafka_member_assignment, tvb, pinfo, offset, NULL, NULL); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Member=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_sync_group_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + int group_start, group_len; + int member_start, member_len; + + /* group_id */ + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + /* generation_id */ + proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* member_id */ + offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + /* [group_assignment] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_assignments, &subti, + "Group Assignments"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_sync_group_request_group_assignment); + proto_item_set_end(subti, tvb, offset); + + col_append_fstr(pinfo->cinfo, COL_INFO, + " (Group=%s, Member=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA), + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_sync_group_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* member_assignment */ + offset = dissect_kafka_bytes(tree, hf_kafka_member_assignment, tvb, pinfo, offset, NULL, NULL); + + return offset; +} + +/* DESCRIBE_GROUPS REQUEST/RESPONSE */ + +static int +dissect_kafka_describe_groups_request_group_id(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* group_id */ + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, NULL, NULL); + + return offset; +} + +static int +dissect_kafka_describe_groups_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* [group_id] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_describe_groups_request_group_id); + + return offset; +} + +static int +dissect_kafka_describe_groups_response_member(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int member_start, member_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_member, &subti, "Member"); + + /* member_id */ + offset = dissect_kafka_string(subtree, hf_kafka_member_id, tvb, pinfo, offset, + &member_start, &member_len); + + /* client_id */ + offset = dissect_kafka_string(subtree, hf_kafka_client_id, tvb, pinfo, offset, NULL, NULL); + + /* client_host */ + offset = dissect_kafka_string(subtree, hf_kafka_client_host, tvb, pinfo, offset, NULL, NULL); + + /* member_metadata */ + offset = dissect_kafka_bytes(subtree, hf_kafka_member_metadata, tvb, pinfo, offset, NULL, NULL); + + /* member_assignment */ + offset = dissect_kafka_bytes(subtree, hf_kafka_member_assignment, tvb, pinfo, offset, NULL, NULL); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Member-ID=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + member_start, member_len, ENC_UTF_8|ENC_NA)); + return offset; +} + +static int +dissect_kafka_describe_groups_response_group(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti, *subsubti; + proto_tree *subtree, *subsubtree; + int group_start, group_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group, &subti, "Group"); + + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, subtree, offset); + + /* group_id */ + offset = dissect_kafka_string(subtree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + /* state */ + offset = dissect_kafka_string(subtree, hf_kafka_group_state, tvb, pinfo, offset, NULL, NULL); + + /* protocol_type */ + offset = dissect_kafka_string(subtree, hf_kafka_protocol_type, tvb, pinfo, offset, NULL, NULL); + + /* protocol */ + offset = dissect_kafka_string(subtree, hf_kafka_protocol_name, tvb, pinfo, offset, NULL, NULL); + + /* [member] */ + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_group_members, + &subsubti, "Members"); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, + &dissect_kafka_describe_groups_response_member); + proto_item_set_end(subsubti, tvb, offset); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Group-ID=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_describe_groups_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* [group] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_describe_groups_response_group); + + return offset; +} + +/* LIST_GROUPS REQUEST/RESPONSE */ + +static int +dissect_kafka_list_groups_request(tvbuff_t *tvb _U_, packet_info *pinfo _U_, proto_tree *tree _U_, int offset, + kafka_api_version_t api_version _U_) +{ + return offset; +} + +static int +dissect_kafka_list_groups_response_group(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int group_start, group_len; + int protocol_type_start, protocol_type_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group, &subti, "Group"); + + /* group_id */ + offset = dissect_kafka_string(subtree, hf_kafka_consumer_group, tvb, pinfo, offset, + &group_start, &group_len); + + /* protocol_type */ + offset = dissect_kafka_string(subtree, hf_kafka_protocol_type, tvb, pinfo, offset, + &protocol_type_start, &protocol_type_len); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Group-ID=%s, Protocol-Type=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + group_start, group_len, ENC_UTF_8|ENC_NA), + tvb_get_string_enc(wmem_packet_scope(), tvb, + protocol_type_start, protocol_type_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_list_groups_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* [group] */ + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_list_groups_response_group); + + return offset; +} + +/* SASL_HANDSHAKE REQUEST/RESPONSE */ + +static int +dissect_kafka_sasl_handshake_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + /* mechanism */ + offset = dissect_kafka_string(tree, hf_kafka_sasl_mechanism, tvb, pinfo, offset, NULL, NULL); + + return offset; +} + +static int +dissect_kafka_sasl_handshake_response_enabled_mechanism(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* enabled_mechanism */ + offset = dissect_kafka_string(tree, hf_kafka_sasl_mechanism, tvb, pinfo, offset, NULL, NULL); + + return offset; +} + +static int +dissect_kafka_sasl_handshake_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + + /* error_code */ + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + + /* [enabled_mechanism] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_sasl_enabled_mechanisms, + &subti, "Enabled SASL Mechanisms"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_sasl_handshake_response_enabled_mechanism); + proto_item_set_end(subti, tvb, offset); + + return offset; +} + +/* CREATE_TOPICS REQUEST/RESPONSE */ + +static int +dissect_kafka_create_topics_request_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* replica */ + proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_create_topics_request_replica_assignment(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + kafka_partition_t partition; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_replica_assignment, + &subti, "Replica Assignment"); + + /* partition_id */ + partition = (gint32) tvb_get_ntohl(tvb, offset); + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* [replica] */ + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_create_topics_request_replica); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Partition-ID=%d)", + partition); + + return offset; +} + +static int +dissect_kafka_create_topics_request_config(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int key_start, key_len; + int val_start, val_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_config, + &subti, "Config"); + + /* key */ + offset = dissect_kafka_string(subtree, hf_kafka_config_key, tvb, pinfo, offset, &key_start, &key_len); + + /* value */ + offset = dissect_kafka_string(subtree, hf_kafka_config_value, tvb, pinfo, offset, &val_start, &val_len); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Key=%s, Value=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + key_start, key_len, ENC_UTF_8|ENC_NA), + tvb_get_string_enc(wmem_packet_scope(), tvb, + val_start, val_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_create_topics_request_create_topic_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version) +{ + proto_item *subti, *subsubti; + proto_tree *subtree, *subsubtree; + int topic_start, topic_len; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_topic, + &subti, "Create Topic Request"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len); + + /* num_partitions */ + proto_tree_add_item(subtree, hf_kafka_num_partitions, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + /* replication_factor */ + proto_tree_add_item(subtree, hf_kafka_replication_factor, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; + + /* [replica_assignment] */ + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, + ett_kafka_replica_assignment, + &subsubti, "Replica Assignments"); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, + &dissect_kafka_create_topics_request_replica_assignment); + proto_item_set_end(subsubti, tvb, offset); + + /* [config] */ + subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, + ett_kafka_config, + &subsubti, "Configs"); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, + &dissect_kafka_create_topics_request_config); + proto_item_set_end(subsubti, tvb, offset); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA)); + + return offset; +} + +static int +dissect_kafka_create_topics_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + + /* [topic] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_topics, + &subti, "Create Topic Requests"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_create_topics_request_create_topic_request); + proto_item_set_end(subti, tvb, offset); + + /* timeout */ + proto_tree_add_item(tree, hf_kafka_timeout, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_create_topics_response_topic_error_code(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + kafka_error_t error; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_topic, + &subti, "Topic Error Code"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len); + + /* error_code */ + offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Error=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + kafka_error_to_str(error)); + + return offset; +} + +static int +dissect_kafka_create_topics_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + + /* [topic_error_code] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_topics, + &subti, "Topic Error Codes"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_create_topics_response_topic_error_code); + proto_item_set_end(subti, tvb, offset); + + return offset; +} + +/* DELETE_TOPICS REQUEST/RESPONSE */ + +static int +dissect_kafka_delete_topics_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + /* topic */ + offset = dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); + + return offset; +} + +static int +dissect_kafka_delete_topics_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + + /* [topic] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_topics, + &subti, "Topics"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_delete_topics_request_topic); + proto_item_set_end(subti, tvb, offset); + + /* timeout */ + proto_tree_add_item(tree, hf_kafka_timeout, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_delete_topics_response_topic_error_code(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int offset, kafka_api_version_t api_version _U_) +{ + proto_item *subti; + proto_tree *subtree; + int topic_start, topic_len; + kafka_error_t error; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_topic, + &subti, "Topic Error Code"); + + /* topic */ + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len); + + /* error_code */ + offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error); + + proto_item_set_end(subti, tvb, offset); + proto_item_append_text(subti, " (Topic=%s, Error=%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + topic_start, topic_len, ENC_UTF_8|ENC_NA), + kafka_error_to_str(error)); + + return offset; +} + +static int +dissect_kafka_delete_topics_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + proto_item *subti; + proto_tree *subtree; + + /* [topic_error_code] */ + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, + ett_kafka_topics, + &subti, "Topic Error Codes"); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_delete_topics_response_topic_error_code); + proto_item_set_end(subti, tvb, offset); + + return offset; } /* MAIN */ @@ -1334,11 +3029,11 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U } col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Request", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + kafka_api_key_to_str(matcher->api_key), matcher->api_version); /* Also add to protocol root */ proto_item_append_text(root_ti, " (%s v%d Request)", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + kafka_api_key_to_str(matcher->api_key), matcher->api_version); if (matcher->response_found) { @@ -1359,28 +3054,73 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U offset = dissect_kafka_string(kafka_tree, hf_kafka_client_id, tvb, pinfo, offset, NULL, NULL); switch (matcher->api_key) { - /* TODO: decode other request types */ case KAFKA_PRODUCE: /* Produce requests may need delayed queueing, see the more * detailed comment above. */ - if (tvb_get_ntohs(tvb, offset) != 0 && !PINFO_FD_VISITED(pinfo)) { + if (tvb_get_ntohs(tvb, offset) != KAFKA_ACK_NOT_REQUIRED && !PINFO_FD_VISITED(pinfo)) { wmem_queue_push(match_queue, matcher); } /*offset =*/ dissect_kafka_produce_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; - case KAFKA_OFFSET_FETCH: - /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); - break; - case KAFKA_METADATA: - /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); - break; case KAFKA_FETCH: /*offset =*/ dissect_kafka_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_OFFSETS: /*offset =*/ dissect_kafka_offsets_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; + case KAFKA_METADATA: + /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_LEADER_AND_ISR: + /*offset =*/ dissect_kafka_leader_and_isr_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_STOP_REPLICA: + /*offset =*/ dissect_kafka_stop_replica_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_UPDATE_METADATA: + /*offset =*/ dissect_kafka_update_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_CONTROLLED_SHUTDOWN: + /*offset =*/ dissect_kafka_controlled_shutdown_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_OFFSET_COMMIT: + /*offset =*/ dissect_kafka_offset_commit_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_OFFSET_FETCH: + /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_GROUP_COORDINATOR: + /*offset =*/ dissect_kafka_group_coordinator_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_JOIN_GROUP: + /*offset =*/ dissect_kafka_join_group_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_HEARTBEAT: + /*offset =*/ dissect_kafka_heartbeat_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_LEAVE_GROUP: + /*offset =*/ dissect_kafka_leave_group_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_SYNC_GROUP: + /*offset =*/ dissect_kafka_sync_group_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_DESCRIBE_GROUPS: + /*offset =*/ dissect_kafka_describe_groups_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_LIST_GROUPS: + /*offset =*/ dissect_kafka_list_groups_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_SASL_HANDSHAKE: + /*offset =*/ dissect_kafka_sasl_handshake_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; case KAFKA_API_VERSIONS: + /*offset =*/ dissect_kafka_api_versions_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_CREATE_TOPICS: + /*offset =*/ dissect_kafka_create_topics_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_DELETE_TOPICS: + /*offset =*/ dissect_kafka_delete_topics_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; } } @@ -1395,8 +3135,8 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U matcher = (kafka_query_response_t *) wmem_queue_peek(match_queue); } if (matcher == NULL || matcher->request_frame >= pinfo->num) { - col_set_str(pinfo->cinfo, COL_INFO, "Kafka Response (Unknown API, Missing Request)"); - /* TODO: expert info, don't have request, can't dissect */ + col_set_str(pinfo->cinfo, COL_INFO, "Kafka Response (Undecoded, Request Missing)"); + expert_add_info(pinfo, root_ti, &ei_kafka_request_missing); return tvb_captured_length(tvb); } @@ -1409,11 +3149,11 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U } col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Response", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + kafka_api_key_to_str(matcher->api_key), matcher->api_version); /* Also add to protocol root */ proto_item_append_text(root_ti, " (%s v%d Response)", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + kafka_api_key_to_str(matcher->api_key), matcher->api_version); @@ -1434,25 +3174,69 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U switch (matcher->api_key) { - /* TODO: decode other response types */ case KAFKA_PRODUCE: /*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; - case KAFKA_OFFSET_FETCH: - /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); - break; - case KAFKA_METADATA: - /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); - break; case KAFKA_FETCH: /*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_OFFSETS: /*offset =*/ dissect_kafka_offsets_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; + case KAFKA_METADATA: + /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_LEADER_AND_ISR: + /*offset =*/ dissect_kafka_leader_and_isr_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_STOP_REPLICA: + /*offset =*/ dissect_kafka_stop_replica_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_UPDATE_METADATA: + /*offset =*/ dissect_kafka_update_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_CONTROLLED_SHUTDOWN: + /*offset =*/ dissect_kafka_controlled_shutdown_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_OFFSET_COMMIT: + /*offset =*/ dissect_kafka_offset_commit_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_OFFSET_FETCH: + /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_GROUP_COORDINATOR: + /*offset =*/ dissect_kafka_group_coordinator_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_JOIN_GROUP: + /*offset =*/ dissect_kafka_join_group_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_HEARTBEAT: + /*offset =*/ dissect_kafka_heartbeat_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_LEAVE_GROUP: + /*offset =*/ dissect_kafka_leave_group_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_SYNC_GROUP: + /*offset =*/ dissect_kafka_sync_group_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_DESCRIBE_GROUPS: + /*offset =*/ dissect_kafka_describe_groups_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_LIST_GROUPS: + /*offset =*/ dissect_kafka_list_groups_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_SASL_HANDSHAKE: + /*offset =*/ dissect_kafka_sasl_handshake_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; case KAFKA_API_VERSIONS: /*offset =*/ dissect_kafka_api_versions_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; + case KAFKA_CREATE_TOPICS: + /*offset =*/ dissect_kafka_create_topics_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_DELETE_TOPICS: + /*offset =*/ dissect_kafka_delete_topics_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; } } @@ -1541,6 +3325,11 @@ proto_register_kafka(void) FT_STRING, BASE_NONE, 0, 0, "The ID of the sending client.", HFILL } }, + { &hf_kafka_client_host, + { "Client Host", "kafka.client_host", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, { &hf_kafka_string_len, { "String Length", "kafka.string_len", FT_INT16, BASE_DEC, 0, 0, @@ -1558,7 +3347,7 @@ proto_register_kafka(void) }, { &hf_kafka_required_acks, { "Required Acks", "kafka.required_acks", - FT_INT16, BASE_DEC, 0, 0, + FT_INT16, BASE_DEC, VALS(kafka_acks), 0, NULL, HFILL } }, { &hf_kafka_timeout, @@ -1581,6 +3370,11 @@ proto_register_kafka(void) FT_INT32, BASE_DEC, 0, 0, NULL, HFILL } }, + { &hf_kafka_replication_factor, + { "Replication Factor", "kafka.replication_factor", + FT_INT16, BASE_DEC, 0, 0, + NULL, HFILL } + }, { &hf_kafka_isr, { "Caught-Up Replica ID", "kafka.isr_id", FT_INT32, BASE_DEC, 0, 0, @@ -1613,7 +3407,12 @@ proto_register_kafka(void) }, { &hf_kafka_message_codec, { "Compression Codec", "kafka.message_codec", - FT_UINT8, BASE_DEC, VALS(kafka_codecs), 0x03, + FT_UINT8, BASE_DEC, VALS(kafka_message_codecs), KAFKA_MESSAGE_CODEC_MASK, + NULL, HFILL } + }, + { &hf_kafka_message_timestamp_type, + { "Timestamp Type", "kafka.message_timestamp_type", + FT_UINT8, BASE_DEC, VALS(kafka_message_timestamp_types), KAFKA_MESSAGE_TIMESTAMP_MASK, NULL, HFILL } }, { &hf_kafka_message_timestamp, @@ -1661,6 +3460,11 @@ proto_register_kafka(void) FT_STRING, BASE_NONE, 0, 0, NULL, HFILL } }, + { &hf_kafka_broker_security_protocol_type, + { "Security Protocol Type", "kafka.broker_security_protocol_type", + FT_INT16, BASE_DEC, VALS(kafka_security_protocol_types), 0, + NULL, HFILL } + }, { &hf_kafka_cluster_id, { "Cluster ID", "kafka.cluster_id", FT_STRING, BASE_NONE, 0, 0, @@ -1671,6 +3475,31 @@ proto_register_kafka(void) FT_INT32, BASE_DEC, 0, 0, NULL, HFILL } }, + { &hf_kafka_controller_epoch, + { "Controller Epoch", "kafka.controller_epoch", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_delete_partitions, + { "Delete Partitions", "kafka.delete_partitions", + FT_BOOLEAN, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_group_leader_id, + { "Leader ID", "kafka.group_leader_id", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_leader_id, + { "Leader ID", "kafka.leader_id", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_leader_epoch, + { "Leader Epoch", "kafka.leader_epoch", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, { &hf_kafka_is_internal, { "Is Internal", "kafka.is_internal", FT_BOOLEAN, BASE_NONE, 0, 0, @@ -1725,29 +3554,132 @@ proto_register_kafka(void) { "Max Version", "kafka.api_versions.max_version", FT_INT16, BASE_DEC, 0, 0, "Maximal version which supports api key.", HFILL } - } + }, + { &hf_kafka_session_timeout, + { "Session Timeout", "kafka.session_timeout", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_rebalance_timeout, + { "Rebalance Timeout", "kafka.rebalance_timeout", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_group_state, + { "State", "kafka.group_state", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_member_id, + { "Consumer Group Member ID", "kafka.member_id", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_protocol_type, + { "Protocol Type", "kafka.protocol_type", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_protocol_name, + { "Protocol Name", "kafka.protocol_name", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_protocol_metadata, + { "Protocol Metadata", "kafka.protocol_metadata", + FT_BYTES, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_member_metadata, + { "Member Metadata", "kafka.member_metadata", + FT_BYTES, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_generation_id, + { "Generation ID", "kafka.generation_id", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_member_assignment, + { "Member Assignment", "kafka.member_assignment", + FT_BYTES, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_sasl_mechanism, + { "SASL Mechanism", "kafka.sasl_mechanism", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_num_partitions, + { "Number of Partitions", "kafka.num_partitions", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_zk_version, + { "Zookeeper Version", "kafka.zk_version", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_config_key, + { "Key", "kafka.config_key", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_config_value, + { "Key", "kafka.config_value", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_commit_timestamp, + { "Timestamp", "kafka.commit_timestamp", + FT_ABSOLUTE_TIME, ABSOLUTE_TIME_UTC, NULL, 0, + NULL, HFILL } + }, + { &hf_kafka_retention_time, + { "Retention Time", "kafka.retention_time", + FT_INT64, BASE_DEC, 0, 0, + NULL, HFILL } + }, }; - static gint *ett[] = { + static int *ett[] = { &ett_kafka, &ett_kafka_message, &ett_kafka_message_set, - &ett_kafka_metadata_isr, - &ett_kafka_metadata_replicas, - &ett_kafka_metadata_broker, - &ett_kafka_metadata_brokers, - &ett_kafka_metadata_topics, + &ett_kafka_isrs, + &ett_kafka_replicas, + &ett_kafka_broker, + &ett_kafka_brokers, + &ett_kafka_broker_end_point, + &ett_kafka_topics, + &ett_kafka_topic, &ett_kafka_request_topic, &ett_kafka_request_partition, &ett_kafka_response_topic, &ett_kafka_response_partition, - &ett_kafka_api_versions + &ett_kafka_api_version, + &ett_kafka_group_protocols, + &ett_kafka_group_protocol, + &ett_kafka_group_members, + &ett_kafka_group_member, + &ett_kafka_group_assignments, + &ett_kafka_group_assignment, + &ett_kafka_group, + &ett_kafka_sasl_enabled_mechanisms, + &ett_kafka_replica_assignment, + &ett_kafka_configs, + &ett_kafka_config, }; static ei_register_info ei[] = { - { &ei_kafka_message_decompress, { "kafka.decompress_failed", PI_UNDECODED, PI_WARN, "Failed to decompress message", EXPFILL }}, - { &ei_kafka_bad_string_length, { "kafka.bad_string_length", PI_MALFORMED, PI_WARN, "Invalid string length field", EXPFILL }}, - { &ei_kafka_bad_bytes_length, { "kafka.bad_bytes_length", PI_MALFORMED, PI_WARN, "Invalid byte length field", EXPFILL }}, + { &ei_kafka_request_missing, + { "kafka.request_missing", PI_UNDECODED, PI_WARN, "Request missing", EXPFILL }}, + { &ei_kafka_message_decompress, + { "kafka.decompress_failed", PI_UNDECODED, PI_WARN, "Failed to decompress message", EXPFILL }}, + { &ei_kafka_bad_string_length, + { "kafka.bad_string_length", PI_MALFORMED, PI_WARN, "Invalid string length field", EXPFILL }}, + { &ei_kafka_bad_bytes_length, + { "kafka.bad_bytes_length", PI_MALFORMED, PI_WARN, "Invalid byte length field", EXPFILL }}, }; expert_module_t* expert_kafka; |