aboutsummaryrefslogtreecommitdiffstats
path: root/epan/dissectors/packet-kafka.c
diff options
context:
space:
mode:
authorDmitry Lazurkin <dilaz03@gmail.com>2016-11-25 22:45:02 +0300
committerMartin Mathieson <martin.r.mathieson@googlemail.com>2016-12-05 21:45:18 +0000
commit548b9febb3c915b3693af0f5e8fe338447ca0f11 (patch)
tree982ecb5e5a5e5f3a9ddc51be0f586b2558741f7e /epan/dissectors/packet-kafka.c
parenta275e3fd0aecd98b5118ca08a3622e8cb1468b2d (diff)
kafka: add dissection for rest of api keys
- support rest of api keys
- dissect kafka.required_acks with constants
- dissect kafka.message_timestamp_type
- add expert info about missing request

Change-Id: I3d18936adac6702a61f545385bdec1b75b564bd9
Reviewed-on: https://code.wireshark.org/review/18954
Petri-Dish: Alexis La Goutte <alexis.lagoutte@gmail.com>
Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org>
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r--epan/dissectors/packet-kafka.c2190
1 file changed, 2061 insertions, 129 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index 589ba1477b..9d8bc3f60e 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -1,5 +1,5 @@
/* packet-kafka.c
- * Routines for Kafka Protocol dissection (version 0.8 and later)
+ * Routines for Kafka Protocol dissection (version 0.8 - 0.10.1.0)
* Copyright 2013, Evan Huus <eapache@gmail.com>
*
* https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
@@ -38,69 +38,109 @@
void proto_register_kafka(void);
void proto_reg_handoff_kafka(void);
-static int proto_kafka = -1;
-static int hf_kafka_len = -1;
-static int hf_kafka_request_api_key = -1;
-static int hf_kafka_response_api_key = -1;
+static int proto_kafka = -1;
+static int hf_kafka_len = -1;
+static int hf_kafka_request_api_key = -1;
+static int hf_kafka_response_api_key = -1;
static int hf_kafka_request_api_version = -1;
static int hf_kafka_response_api_version = -1;
-static int hf_kafka_correlation_id = -1;
-static int hf_kafka_client_id = -1;
-static int hf_kafka_string_len = -1;
-static int hf_kafka_bytes_len = -1;
-static int hf_kafka_array_count = -1;
-static int hf_kafka_required_acks = -1;
-static int hf_kafka_timeout = -1;
-static int hf_kafka_topic_name = -1;
-static int hf_kafka_partition_id = -1;
-static int hf_kafka_replica = -1;
-static int hf_kafka_isr = -1;
-static int hf_kafka_partition_leader = -1;
-static int hf_kafka_message_set_size = -1;
-static int hf_kafka_message_size = -1;
-static int hf_kafka_message_crc = -1;
-static int hf_kafka_message_magic = -1;
-static int hf_kafka_message_codec = -1;
-static int hf_kafka_message_timestamp = -1;
-static int hf_kafka_message_key = -1;
-static int hf_kafka_message_value = -1;
-static int hf_kafka_request_frame = -1;
-static int hf_kafka_response_frame = -1;
-static int hf_kafka_consumer_group = -1;
-static int hf_kafka_offset = -1;
-static int hf_kafka_offset_time = -1;
-static int hf_kafka_max_offsets = -1;
-static int hf_kafka_metadata = -1;
-static int hf_kafka_error = -1;
-static int hf_kafka_broker_nodeid = -1;
-static int hf_kafka_broker_host = -1;
-static int hf_kafka_broker_port = -1;
-static int hf_kafka_broker_rack = -1;
-static int hf_kafka_cluster_id = -1;
-static int hf_kafka_controller_id = -1;
-static int hf_kafka_is_internal = -1;
-static int hf_kafka_min_bytes = -1;
-static int hf_kafka_max_bytes = -1;
-static int hf_kafka_max_wait_time = -1;
-static int hf_kafka_throttle_time = -1;
-static int hf_kafka_api_versions_api_key = -1;
+static int hf_kafka_correlation_id = -1;
+static int hf_kafka_client_id = -1;
+static int hf_kafka_client_host = -1;
+static int hf_kafka_string_len = -1;
+static int hf_kafka_bytes_len = -1;
+static int hf_kafka_array_count = -1;
+static int hf_kafka_required_acks = -1;
+static int hf_kafka_timeout = -1;
+static int hf_kafka_topic_name = -1;
+static int hf_kafka_partition_id = -1;
+static int hf_kafka_replica = -1;
+static int hf_kafka_replication_factor = -1;
+static int hf_kafka_isr = -1;
+static int hf_kafka_partition_leader = -1;
+static int hf_kafka_message_set_size = -1;
+static int hf_kafka_message_size = -1;
+static int hf_kafka_message_crc = -1;
+static int hf_kafka_message_magic = -1;
+static int hf_kafka_message_codec = -1;
+static int hf_kafka_message_timestamp_type = -1;
+static int hf_kafka_message_timestamp = -1;
+static int hf_kafka_message_key = -1;
+static int hf_kafka_message_value = -1;
+static int hf_kafka_request_frame = -1;
+static int hf_kafka_response_frame = -1;
+static int hf_kafka_consumer_group = -1;
+static int hf_kafka_group_state = -1;
+static int hf_kafka_offset = -1;
+static int hf_kafka_offset_time = -1;
+static int hf_kafka_max_offsets = -1;
+static int hf_kafka_metadata = -1;
+static int hf_kafka_error = -1;
+static int hf_kafka_broker_nodeid = -1;
+static int hf_kafka_broker_host = -1;
+static int hf_kafka_broker_port = -1;
+static int hf_kafka_broker_rack = -1;
+static int hf_kafka_broker_security_protocol_type = -1;
+static int hf_kafka_cluster_id = -1;
+static int hf_kafka_controller_id = -1;
+static int hf_kafka_controller_epoch = -1;
+static int hf_kafka_delete_partitions = -1;
+static int hf_kafka_leader_id = -1;
+static int hf_kafka_group_leader_id = -1;
+static int hf_kafka_leader_epoch = -1;
+static int hf_kafka_is_internal = -1;
+static int hf_kafka_min_bytes = -1;
+static int hf_kafka_max_bytes = -1;
+static int hf_kafka_max_wait_time = -1;
+static int hf_kafka_throttle_time = -1;
+static int hf_kafka_api_versions_api_key = -1;
static int hf_kafka_api_versions_min_version = -1;
static int hf_kafka_api_versions_max_version = -1;
-
-static gint ett_kafka = -1;
-static gint ett_kafka_message = -1;
-static gint ett_kafka_message_set = -1;
-static gint ett_kafka_metadata_replicas = -1;
-static gint ett_kafka_metadata_isr = -1;
-static gint ett_kafka_metadata_broker = -1;
-static gint ett_kafka_metadata_brokers = -1;
-static gint ett_kafka_metadata_topics = -1;
-static gint ett_kafka_request_topic = -1;
-static gint ett_kafka_request_partition = -1;
-static gint ett_kafka_response_topic = -1;
-static gint ett_kafka_response_partition = -1;
-static gint ett_kafka_api_versions = -1;
-
+static int hf_kafka_session_timeout = -1;
+static int hf_kafka_rebalance_timeout = -1;
+static int hf_kafka_member_id = -1;
+static int hf_kafka_protocol_type = -1;
+static int hf_kafka_protocol_name = -1;
+static int hf_kafka_protocol_metadata = -1;
+static int hf_kafka_member_metadata = -1;
+static int hf_kafka_generation_id = -1;
+static int hf_kafka_member_assignment = -1;
+static int hf_kafka_sasl_mechanism = -1;
+static int hf_kafka_num_partitions = -1;
+static int hf_kafka_zk_version = -1;
+static int hf_kafka_config_key = -1;
+static int hf_kafka_config_value = -1;
+static int hf_kafka_commit_timestamp = -1;
+static int hf_kafka_retention_time = -1;
+
+static int ett_kafka = -1;
+static int ett_kafka_message = -1;
+static int ett_kafka_message_set = -1;
+static int ett_kafka_replicas = -1;
+static int ett_kafka_isrs = -1;
+static int ett_kafka_broker = -1;
+static int ett_kafka_brokers = -1;
+static int ett_kafka_broker_end_point = -1;
+static int ett_kafka_topics = -1;
+static int ett_kafka_topic = -1;
+static int ett_kafka_request_topic = -1;
+static int ett_kafka_request_partition = -1;
+static int ett_kafka_response_topic = -1;
+static int ett_kafka_response_partition = -1;
+static int ett_kafka_api_version = -1;
+static int ett_kafka_group_protocols = -1;
+static int ett_kafka_group_protocol = -1;
+static int ett_kafka_group_members = -1;
+static int ett_kafka_group_member = -1;
+static int ett_kafka_group_assignments = -1;
+static int ett_kafka_group_assignment = -1;
+static int ett_kafka_group = -1;
+static int ett_kafka_sasl_enabled_mechanisms = -1;
+static int ett_kafka_replica_assignment = -1;
+static int ett_kafka_configs = -1;
+static int ett_kafka_config = -1;
+
+static expert_field ei_kafka_request_missing = EI_INIT;
static expert_field ei_kafka_message_decompress = EI_INIT;
static expert_field ei_kafka_bad_string_length = EI_INIT;
static expert_field ei_kafka_bad_bytes_length = EI_INIT;
@@ -197,28 +237,58 @@ static const value_string kafka_errors[] = {
{ 0, NULL }
};
-#define KAFKA_COMPRESSION_NONE 0
-#define KAFKA_COMPRESSION_GZIP 1
-#define KAFKA_COMPRESSION_SNAPPY 2
-#define KAFKA_COMPRESSION_LZ4 3
-static const value_string kafka_codecs[] = {
- { KAFKA_COMPRESSION_NONE, "None" },
- { KAFKA_COMPRESSION_GZIP, "Gzip" },
- { KAFKA_COMPRESSION_SNAPPY, "Snappy" },
- { KAFKA_COMPRESSION_LZ4, "LZ4" },
+#define KAFKA_ACK_NOT_REQUIRED 0
+#define KAFKA_ACK_LEADER 1
+#define KAFKA_ACK_FULL_ISR -1
+static const value_string kafka_acks[] = {
+ { KAFKA_ACK_NOT_REQUIRED, "Not Required" },
+ { KAFKA_ACK_LEADER, "Leader" },
+ { KAFKA_ACK_FULL_ISR, "Full ISR" },
+ { 0, NULL }
+};
+
+#define KAFKA_MESSAGE_CODEC_MASK 0x07
+#define KAFKA_MESSAGE_CODEC_NONE 0
+#define KAFKA_MESSAGE_CODEC_GZIP 1
+#define KAFKA_MESSAGE_CODEC_SNAPPY 2
+#define KAFKA_MESSAGE_CODEC_LZ4 3
+static const value_string kafka_message_codecs[] = {
+ { KAFKA_MESSAGE_CODEC_NONE, "None" },
+ { KAFKA_MESSAGE_CODEC_GZIP, "Gzip" },
+ { KAFKA_MESSAGE_CODEC_SNAPPY, "Snappy" },
+ { KAFKA_MESSAGE_CODEC_LZ4, "LZ4" },
{ 0, NULL }
};
#ifdef HAVE_SNAPPY
static const guint8 kafka_xerial_header[8] = {0x82, 0x53, 0x4e, 0x41, 0x50, 0x50, 0x59, 0x00};
#endif
+#define KAFKA_MESSAGE_TIMESTAMP_MASK 0x08
+static const value_string kafka_message_timestamp_types[] = {
+ { 0, "CreateTime" },
+ { 1, "LogAppendTime" },
+ { 0, NULL }
+};
+
+static const value_string kafka_security_protocol_types[] = {
+ { 0, "PLAINTEXT" },
+ { 1, "SSL" },
+ { 2, "SASL_PLAINTEXT" },
+ { 3, "SASL_SSL" },
+ { 0, NULL }
+};
+
/* List/range of TCP ports to register */
static range_t *current_kafka_tcp_range = NULL;
-typedef guint16 kafka_api_version_t;
+typedef gint16 kafka_api_key_t;
+typedef gint16 kafka_api_version_t;
+typedef gint16 kafka_error_t;
+typedef gint32 kafka_partition_t;
+typedef gint64 kafka_offset_t;
typedef struct _kafka_query_response_t {
- gint16 api_key;
+ kafka_api_key_t api_key;
kafka_api_version_t api_version;
guint32 request_frame;
guint32 response_frame;
@@ -228,8 +298,8 @@ typedef struct _kafka_query_response_t {
/* Some values to temporarily remember during dissection */
typedef struct kafka_packet_values_t {
- guint32 partition_id;
- gint64 offset;
+ kafka_partition_t partition_id;
+ kafka_offset_t offset;
} kafka_packet_values_t;
/* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */
@@ -239,6 +309,18 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
/* HELPERS */
+static const char *
+kafka_error_to_str(kafka_error_t error)
+{
+ return val_to_str_const(error, kafka_errors, "Unknown");
+}
+
+static const char *
+kafka_api_key_to_str(kafka_api_key_t api_key)
+{
+ return val_to_str_const(api_key, kafka_apis, "Unknown");
+}
+
static guint
get_kafka_pdu_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset, void *data _U_)
{
@@ -389,7 +471,10 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
/* Codec */
proto_tree_add_item(subtree, hf_kafka_message_codec, tvb, offset, 1, ENC_BIG_ENDIAN);
- codec = tvb_get_guint8(tvb, offset) & 0x07;
+ codec = tvb_get_guint8(tvb, offset) & KAFKA_MESSAGE_CODEC_MASK;
+
+ /* Timestamp Type */
+ proto_tree_add_item(subtree, hf_kafka_message_timestamp_type, tvb, offset, 1, ENC_BIG_ENDIAN);
offset += 1;
if (magic_byte >= 1) {
@@ -400,7 +485,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset, NULL, &bytes_length);
switch (codec) {
- case KAFKA_COMPRESSION_GZIP:
+ case KAFKA_MESSAGE_CODEC_GZIP:
raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
offset += 4;
@@ -424,7 +509,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
proto_tree_add_bytes(subtree, hf_kafka_message_value, tvb, offset, 0, NULL);
}
break;
- case KAFKA_COMPRESSION_SNAPPY:
+ case KAFKA_MESSAGE_CODEC_SNAPPY:
#ifdef HAVE_SNAPPY
raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
offset += 4;
@@ -485,8 +570,8 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
}
break;
#endif
- case KAFKA_COMPRESSION_LZ4:
- case KAFKA_COMPRESSION_NONE:
+ case KAFKA_MESSAGE_CODEC_LZ4:
+ case KAFKA_MESSAGE_CODEC_NONE:
default:
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);
@@ -630,20 +715,34 @@ dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree
return offset;
}
-static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+static int
+dissect_kafka_error_ret(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_error_t *ret)
{
- guint16 error = tvb_get_ntohs(tvb, offset);
+ kafka_error_t error = (kafka_error_t) tvb_get_ntohs(tvb, offset);
proto_tree_add_item(tree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
+
/* Show error in Info column */
if (error != 0) {
col_append_fstr(pinfo->cinfo, COL_INFO,
- " [%s] ", val_to_str_const(error, kafka_errors, "Unknown"));
+ " [%s] ", kafka_error_to_str(error));
}
- offset += 2;
+
+ if (ret) {
+ *ret = error;
+ }
+
return offset;
}
static int
+dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+{
+ return dissect_kafka_error_ret(tvb, pinfo, tree, offset, NULL);
+}
+
+static int
dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
int start_offset, kafka_api_version_t api_version)
{
@@ -725,7 +824,7 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
int host_start, host_len;
guint32 broker_port;
- subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_metadata_broker, &ti, "Broker");
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker, &ti, "Broker");
nodeid = tvb_get_ntohl(tvb, offset);
proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN);
@@ -787,12 +886,12 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *
offset += 4;
sub_start_offset = offset;
- subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_replicas, &subti, "Replicas");
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_replicas, &subti, "Replicas");
offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_replica);
proto_item_set_len(subti, offset - sub_start_offset);
sub_start_offset = offset;
- subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_isr, &subti, "Caught-Up Replicas");
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_isrs, &subti, "Caught-Up Replicas");
offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_isr);
proto_item_set_len(subti, offset - sub_start_offset);
@@ -839,7 +938,7 @@ dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *t
proto_tree *subtree;
int offset = start_offset;
- subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_brokers, &ti, "Broker Metadata");
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_brokers, &ti, "Broker Metadata");
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_broker);
proto_item_set_len(ti, offset - start_offset);
@@ -853,13 +952,321 @@ dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *t
}
start_offset = offset;
- subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_topics, &ti, "Topic Metadata");
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_topics, &ti, "Topic Metadata");
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_topic);
proto_item_set_len(ti, offset - start_offset);
return offset;
}
+/* LEADER_AND_ISR REQUEST/RESPONSE */
+
+static int
+dissect_kafka_leader_and_isr_request_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* isr */
+ proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+static int
+dissect_kafka_leader_and_isr_request_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* replica */
+ proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+static int
+dissect_kafka_leader_and_isr_request_partition_state(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version)
+{
+ proto_tree *subtree, *subsubtree;
+ proto_item *subti, *subsubti;
+ int topic_start, topic_len;
+ kafka_partition_t partition;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_request_partition,
+ &subti, "Partition State");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset,
+ &topic_start, &topic_len);
+
+ /* partition */
+ partition = (kafka_partition_t) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* controller_epoch */
+ proto_tree_add_item(subtree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* leader */
+ proto_tree_add_item(subtree, hf_kafka_leader_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* leader_epoch */
+ proto_tree_add_item(subtree, hf_kafka_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* [isr] */
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1,
+ ett_kafka_isrs,
+ &subsubti, "ISRs");
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_leader_and_isr_request_isr);
+ proto_item_set_end(subsubti, tvb, offset);
+
+ /* zk_version */
+ proto_tree_add_item(subtree, hf_kafka_zk_version, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* [replica] */
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1,
+ ett_kafka_replicas,
+ &subsubti, "Replicas");
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_leader_and_isr_request_replica);
+ proto_item_set_end(subsubti, tvb, offset);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ partition);
+
+ return offset;
+}
+
+static int
+dissect_kafka_leader_and_isr_request_live_leader(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ gint32 nodeid;
+ int host_start, host_len;
+ gint32 broker_port;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker,
+ &subti, "Live Leader");
+
+ /* id */
+ nodeid = (kafka_partition_t) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* host */
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len);
+
+ /* port */
+ broker_port = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (node %u: %s:%u)",
+ nodeid,
+ tvb_get_string_enc(wmem_packet_scope(), tvb, host_start, host_len, ENC_UTF_8|ENC_NA),
+ broker_port);
+
+ return offset;
+}
+
+static int
+dissect_kafka_leader_and_isr_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ gint32 controller_id;
+
+ /* controller_id */
+ controller_id = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* controller_epoch */
+ proto_tree_add_item(tree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* [partition_state] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_leader_and_isr_request_partition_state);
+
+ /* [live_leader] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_leader_and_isr_request_live_leader);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO, " (Controller-ID=%d)", controller_id);
+
+ return offset;
+}
+
+static int
+dissect_kafka_leader_and_isr_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+ kafka_partition_t partition;
+ kafka_error_t error;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_response_partition,
+ &subti, "Partition");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len);
+
+ /* partition */
+ partition = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* error_code */
+ offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u, Error=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ partition,
+ kafka_error_to_str(error));
+
+ return offset;
+}
+
+static int
+dissect_kafka_leader_and_isr_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* [partition] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_leader_and_isr_response_partition);
+
+ return offset;
+}
+
+/* STOP_REPLICA REQUEST/RESPONSE */
+
+static int
+dissect_kafka_stop_replica_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+ kafka_partition_t partition;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_request_partition,
+ &subti, "Partition");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len);
+
+ /* partition */
+ partition = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ partition);
+
+ return offset;
+}
+
+static int
+dissect_kafka_stop_replica_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ gint32 controller_id;
+
+ /* controller_id */
+ controller_id = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* controller_epoch */
+ proto_tree_add_item(tree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* delete_partitions */
+ proto_tree_add_item(tree, hf_kafka_delete_partitions, tvb, offset, 1, ENC_BIG_ENDIAN);
+ offset += 1;
+
+ /* [partition] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_stop_replica_request_partition);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO, " (Controller-ID=%d)", controller_id);
+
+ return offset;
+}
+
+static int
+dissect_kafka_stop_replica_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+ kafka_error_t error;
+ kafka_partition_t partition;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_response_partition,
+ &subti, "Partition");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len);
+
+ /* partition */
+ partition = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* error_code */
+ offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s, Partition-ID=%u, Error=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ partition,
+ kafka_error_to_str(error));
+
+ return offset;
+}
+
+static int
+dissect_kafka_stop_replica_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* [partition] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_stop_replica_response_partition);
+
+ return offset;
+}
+
/* FETCH REQUEST/RESPONSE */
static int
@@ -1227,16 +1634,23 @@ dissect_kafka_offsets_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tr
/* API_VERSIONS REQUEST/RESPONSE */
static int
-dissect_kafka_api_versions_response_api_versions(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
- int start_offset, kafka_api_version_t api_version _U_)
+dissect_kafka_api_versions_request(tvbuff_t *tvb _U_, packet_info *pinfo _U_, proto_tree *tree _U_,
+ int offset _U_, kafka_api_version_t api_version _U_)
+{
+ return offset;
+}
+
+static int
+dissect_kafka_api_versions_response_api_version(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int start_offset, kafka_api_version_t api_version _U_)
{
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
kafka_api_version_t api_key, min_version, max_version;
- subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_api_versions, &ti,
- "API Versions");
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_api_version, &ti,
+ "API Version");
api_key = tvb_get_ntohs(tvb, offset);
proto_tree_add_item(subtree, hf_kafka_api_versions_api_key, tvb, offset, 2, ENC_BIG_ENDIAN);
@@ -1254,13 +1668,13 @@ dissect_kafka_api_versions_response_api_versions(tvbuff_t *tvb, packet_info *pin
if (max_version != min_version) {
/* Range of versions supported. */
proto_item_append_text(subtree, " %s (v%d-%d)",
- val_to_str_const(api_key, kafka_apis, "unknown"),
+ kafka_api_key_to_str(api_key),
min_version, max_version);
}
else {
/* Only one version. */
proto_item_append_text(subtree, " %s (v%d)",
- val_to_str_const(api_key, kafka_apis, "unknown"),
+ kafka_api_key_to_str(api_key),
min_version);
}
@@ -1271,9 +1685,1290 @@ static int
dissect_kafka_api_versions_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
kafka_api_version_t api_version)
{
+ /* error_code */
offset = dissect_kafka_error(tvb, pinfo, tree, offset);
- return dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
- &dissect_kafka_api_versions_response_api_versions);
+
+ /* [api_version] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_api_versions_response_api_version);
+
+ return offset;
+}
+
+/* UPDATE_METADATA REQUEST/RESPONSE */
+
+/* Dissect a single ISR broker id (int32) inside an UpdateMetadata partition_state. */
+static int
+dissect_kafka_update_metadata_request_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* isr */
+ proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+/* Dissect a single replica broker id (int32) inside an UpdateMetadata partition_state. */
+static int
+dissect_kafka_update_metadata_request_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* replica */
+ proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+/* Dissect one UpdateMetadata partition_state entry: topic, partition,
+ * controller_epoch, leader, leader_epoch, [isr], zk_version, [replica].
+ * Returns the offset just past the entry. */
+static int
+dissect_kafka_update_metadata_request_partition_state(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version)
+{
+ proto_tree *subtree, *subsubtree;
+ proto_item *subti, *subsubti;
+ int topic_start, topic_len;
+ kafka_partition_t partition;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_request_partition,
+ &subti, "Partition State");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset,
+ &topic_start, &topic_len);
+
+ /* partition */
+ partition = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* controller_epoch */
+ proto_tree_add_item(subtree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* leader */
+ proto_tree_add_item(subtree, hf_kafka_leader_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* leader_epoch */
+ proto_tree_add_item(subtree, hf_kafka_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* [isr] */
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1,
+ ett_kafka_isrs,
+ &subsubti, "ISRs");
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_update_metadata_request_isr);
+ proto_item_set_end(subsubti, tvb, offset);
+
+ /* zk_version */
+ proto_tree_add_item(subtree, hf_kafka_zk_version, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* [replica] */
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1,
+ ett_kafka_replicas,
+ &subsubti, "Replicas");
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_update_metadata_request_replica);
+ proto_item_set_end(subsubti, tvb, offset);
+
+ proto_item_set_end(subti, tvb, offset);
+ /* partition is a signed gint32 (kafka_partition_t): print with %d, as the
+ * other dissectors in this file do, instead of %u. */
+ proto_item_append_text(subti, " (Topic=%s, Partition-ID=%d)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ partition);
+
+ return offset;
+}
+
+/* Dissect one broker end_point (UpdateMetadata v1+): port, host and
+ * security_protocol_type, summarized as "proto://host:port" on the item. */
+static int
+dissect_kafka_update_metadata_request_end_point(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int host_start, host_len;
+ gint32 broker_port;
+ gint16 security_protocol_type;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker_end_point,
+ &subti, "End Point");
+
+ /* port */
+ broker_port = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* host */
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len);
+
+ /* security_protocol_type */
+ security_protocol_type = (gint16) tvb_get_ntohs(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_security_protocol_type, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (%s://%s:%d)",
+ val_to_str_const(security_protocol_type,
+ kafka_security_protocol_types, "UNKNOWN"),
+ tvb_get_string_enc(wmem_packet_scope(), tvb, host_start, host_len,
+ ENC_UTF_8|ENC_NA),
+ broker_port);
+
+ return offset;
+}
+
+/* Dissect one UpdateMetadata live_leader broker: id, then (v0) host/port or
+ * (v1+) an [end_point] array plus (v2+) a rack string. */
+static int
+dissect_kafka_update_metadata_request_live_leader(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ gint32 nodeid;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker,
+ &subti, "Live Leader");
+
+ /* id */
+ nodeid = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ if (api_version == 0) {
+ int host_start, host_len;
+ gint32 broker_port;
+
+ /* host */
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len);
+
+ /* port */
+ broker_port = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* nodeid and broker_port are signed gint32: use %d (the v1+ branch below
+ * already prints the node id with %d). */
+ proto_item_append_text(subti, " (node %d: %s:%d)",
+ nodeid,
+ tvb_get_string_enc(wmem_packet_scope(), tvb, host_start, host_len,
+ ENC_UTF_8|ENC_NA),
+ broker_port);
+ } else if (api_version >= 1) {
+ /* [end_point] */
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_update_metadata_request_end_point);
+
+ if (api_version >= 2) {
+ /* rack */
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_rack, tvb, pinfo, offset, NULL, NULL);
+ }
+
+ proto_item_append_text(subti, " (node %d)",
+ nodeid);
+ }
+
+ proto_item_set_end(subti, tvb, offset);
+
+ return offset;
+}
+
+/* Dissect an UpdateMetadata request: controller_id, controller_epoch,
+ * [partition_state], [live_leader]; appends the controller id to COL_INFO. */
+static int
+dissect_kafka_update_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ gint32 controller_id;
+
+ /* controller_id */
+ controller_id = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* controller_epoch */
+ proto_tree_add_item(tree, hf_kafka_controller_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* [partition_state] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_update_metadata_request_partition_state);
+
+ /* [live_leader] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_update_metadata_request_live_leader);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO, " (Controller-ID=%d)", controller_id);
+
+ return offset;
+}
+
+/* Dissect an UpdateMetadata response, which carries only an error_code. */
+static int
+dissect_kafka_update_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ return offset;
+}
+
+/* CONTROLLED_SHUTDOWN REQUEST/RESPONSE */
+
+/* Dissect a ControlledShutdown request (broker_id only); adds it to COL_INFO. */
+static int
+dissect_kafka_controlled_shutdown_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ gint32 broker_id;
+
+ /* broker_id */
+ broker_id = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(tree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ col_append_fstr(pinfo->cinfo, COL_INFO, " (Broker-ID=%d)", broker_id);
+
+ return offset;
+}
+
+/* Dissect one ControlledShutdown partitions_remaining entry (topic + partition). */
+static int
+dissect_kafka_controlled_shutdown_response_partition_remaining(tvbuff_t *tvb, packet_info *pinfo,
+ proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+ kafka_partition_t partition;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &subti,
+ "Partition Remaining");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset,
+ &topic_start, &topic_len);
+
+ /* partition */
+ partition = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s, Partition-ID=%d)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ partition);
+
+ return offset;
+}
+
+/* Dissect a ControlledShutdown response: error_code, [partitions_remaining]. */
+static int
+dissect_kafka_controlled_shutdown_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* [partition_remaining] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_controlled_shutdown_response_partition_remaining);
+
+ return offset;
+}
+
+/* OFFSET_COMMIT REQUEST/RESPONSE */
+
+/* Dissect one OffsetCommit request partition: partition id, offset,
+ * (v1 only) commit timestamp, and metadata string. */
+static int
+dissect_kafka_offset_commit_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ kafka_partition_t partition_id;
+ kafka_offset_t partition_offset;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &subti,
+ "Partition");
+
+ /* partition */
+ partition_id = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* offset */
+ partition_offset = (gint64) tvb_get_ntoh64(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
+ offset += 8;
+
+ /* The timestamp field only exists in v1 of OffsetCommit; api_version is
+ * used here, so it must not carry the _U_ (unused) attribute. */
+ if (api_version == 1) {
+ /* timestamp */
+ offset = dissect_kafka_timestamp(tvb, pinfo, subtree, hf_kafka_commit_timestamp, offset);
+ }
+
+ /* metadata */
+ offset = dissect_kafka_string(subtree, hf_kafka_metadata, tvb, pinfo, offset, NULL, NULL);
+
+ proto_item_set_end(subti, tvb, offset);
+ /* Both values are signed (offset may be the -1 sentinel): print with
+ * signed conversions, not %u. */
+ proto_item_append_text(subti, " (Partition-ID=%d, Offset=%" G_GINT64_MODIFIER "i)",
+ partition_id, partition_offset);
+
+ return offset;
+}
+
+/* Dissect one OffsetCommit request topic: topic name plus its [partition] array. */
+static int
+dissect_kafka_offset_commit_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &subti, "Topic");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset,
+ &topic_start, &topic_len);
+
+ /* [partition] */
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offset_commit_request_partition);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect an OffsetCommit request: group_id, then (v1+) generation id and
+ * member_id, (v2+) retention_time, followed by the [topic] array. */
+static int
+dissect_kafka_offset_commit_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ int group_start, group_len;
+
+ /* group_id */
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ if (api_version >= 1) {
+ /* group_generation_id */
+ proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* member_id */
+ offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset, NULL, NULL);
+
+ if (api_version >= 2) {
+ /* retention_time */
+ proto_tree_add_item(tree, hf_kafka_retention_time, tvb, offset, 8, ENC_BIG_ENDIAN);
+ offset += 8;
+ }
+ }
+
+ /* [topic] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offset_commit_request_topic);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " (Group=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect one OffsetCommit response partition entry (partition id + error_code). */
+static int
+dissect_kafka_offset_commit_response_partition_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ kafka_partition_t partition;
+ kafka_error_t error;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &subti,
+ "Partition Response");
+
+ /* partition */
+ partition = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* error_code */
+ offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Partition-ID=%d, Error=%s)",
+ partition, kafka_error_to_str(error));
+
+ return offset;
+}
+
+/* Dissect one OffsetCommit response topic entry: topic name + [partition_response]. */
+static int
+dissect_kafka_offset_commit_response_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &subti, "Response");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset,
+ &topic_start, &topic_len);
+
+ /* [partition_response] */
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offset_commit_response_partition_response);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect an OffsetCommit response: just the [responses] topic array. */
+static int
+dissect_kafka_offset_commit_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* [responses] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offset_commit_response_response);
+
+ return offset;
+}
+
+/* GROUP_COORDINATOR REQUEST/RESPONSE */
+
+/* Dissect a GroupCoordinator request (group_id only); adds the group to COL_INFO. */
+static int
+dissect_kafka_group_coordinator_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ int group_start, group_len;
+
+ /* group_id */
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " (Group=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect the coordinator broker (node_id, host, port) in a GroupCoordinator
+ * response and summarize it as "node N: host:port" on the item. */
+static int
+dissect_kafka_group_coordinator_response_coordinator(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ gint32 node_id;
+ int host_start, host_len;
+ gint32 port;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_broker, &subti, "Coordinator");
+
+ /* node_id */
+ node_id = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* host */
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset,
+ &host_start, &host_len);
+
+ /* port */
+ port = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ proto_item_set_end(subti, tvb, offset);
+ /* node_id and port are signed gint32 (node_id is -1 when no coordinator
+ * is available): print with %d instead of %u. */
+ proto_item_append_text(subti, " (node %d: %s:%d)",
+ node_id,
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ host_start, host_len, ENC_UTF_8|ENC_NA),
+ port);
+
+ return offset;
+}
+
+/* Dissect a GroupCoordinator response: error_code + coordinator broker. */
+static int
+dissect_kafka_group_coordinator_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* coordinator */
+ offset = dissect_kafka_group_coordinator_response_coordinator(tvb, pinfo, tree, offset, api_version);
+
+ return offset;
+}
+
+/* JOIN_GROUP REQUEST/RESPONSE */
+
+/* Dissect one JoinGroup group_protocols entry (protocol name + opaque metadata). */
+static int
+dissect_kafka_join_group_request_group_protocols(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int protocol_start, protocol_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_protocol, &subti,
+ "Group Protocol");
+
+ /* protocol_name */
+ offset = dissect_kafka_string(subtree, hf_kafka_protocol_name, tvb, pinfo, offset,
+ &protocol_start, &protocol_len);
+
+ /* protocol_metadata */
+ offset = dissect_kafka_bytes(subtree, hf_kafka_protocol_metadata, tvb, pinfo, offset, NULL, NULL);
+
+ proto_item_set_end(subti, tvb, offset);
+ /* The string shown is the protocol name, not a group id: label it
+ * "Protocol" (the original "Group-ID" label was a copy/paste slip). */
+ proto_item_append_text(subti, " (Protocol=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ protocol_start, protocol_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a JoinGroup request: group_id, session_timeout, (v1+)
+ * rebalance_timeout, member_id, protocol_type and the [group_protocols] array. */
+static int
+dissect_kafka_join_group_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int group_start, group_len;
+ int member_start, member_len;
+
+ /* group_id */
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ /* session_timeout */
+ proto_tree_add_item(tree, hf_kafka_session_timeout, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ if (api_version > 0) {
+ /* rebalance_timeout */
+ proto_tree_add_item(tree, hf_kafka_rebalance_timeout, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+ }
+
+ /* member_id */
+ offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ /* protocol_type */
+ offset = dissect_kafka_string(tree, hf_kafka_protocol_type, tvb, pinfo, offset, NULL, NULL);
+
+ /* [group_protocols] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_protocols, &subti,
+ "Group Protocols");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_join_group_request_group_protocols);
+ proto_item_set_end(subti, tvb, offset);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " (Group=%s, Member=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA),
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect one JoinGroup response member (member_id + opaque member_metadata). */
+static int
+dissect_kafka_join_group_response_member(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int member_start, member_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_member, &subti, "Member");
+
+ /* member_id */
+ offset = dissect_kafka_string(subtree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ /* member_metadata */
+ offset = dissect_kafka_bytes(subtree, hf_kafka_member_metadata, tvb, pinfo, offset, NULL, NULL);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Member-ID=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a JoinGroup response: error_code, generation_id, group_protocol,
+ * leader_id, member_id and the [members] array. */
+static int
+dissect_kafka_join_group_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int member_start, member_len;
+
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* generation_id */
+ proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* group_protocol */
+ offset = dissect_kafka_string(tree, hf_kafka_protocol_name, tvb, pinfo, offset, NULL, NULL);
+
+ /* leader_id */
+ offset = dissect_kafka_string(tree, hf_kafka_group_leader_id, tvb, pinfo, offset, NULL, NULL);
+
+ /* member_id */
+ offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ /* [member] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_members, &subti, "Members");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_join_group_response_member);
+ proto_item_set_end(subti, tvb, offset);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " (Member=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* HEARTBEAT REQUEST/RESPONSE */
+
+/* Dissect a Heartbeat request: group_id, generation_id, member_id. */
+static int
+dissect_kafka_heartbeat_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ int group_start, group_len;
+ int member_start, member_len;
+
+ /* group_id */
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ /* group_generation_id */
+ proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* member_id */
+ offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " (Group=%s, Member=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA),
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a Heartbeat response (error_code only). */
+static int
+dissect_kafka_heartbeat_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ return offset;
+}
+
+/* LEAVE_GROUP REQUEST/RESPONSE */
+
+/* Dissect a LeaveGroup request: group_id and member_id. */
+static int
+dissect_kafka_leave_group_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ int group_start, group_len;
+ int member_start, member_len;
+
+ /* group_id */
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ /* member_id */
+ offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " (Group=%s, Member=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA),
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a LeaveGroup response (error_code only). */
+static int
+dissect_kafka_leave_group_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ return offset;
+}
+
+/* SYNC_GROUP REQUEST/RESPONSE */
+
+/* Dissect one SyncGroup group_assignment entry (member_id + opaque assignment). */
+static int
+dissect_kafka_sync_group_request_group_assignment(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int member_start, member_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_assignment, &subti,
+ "Group Assignment");
+
+ /* member_id */
+ offset = dissect_kafka_string(subtree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ /* member_assignment */
+ offset = dissect_kafka_bytes(subtree, hf_kafka_member_assignment, tvb, pinfo, offset, NULL, NULL);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Member=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a SyncGroup request: group_id, generation_id, member_id and the
+ * [group_assignment] array. */
+static int
+dissect_kafka_sync_group_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int group_start, group_len;
+ int member_start, member_len;
+
+ /* group_id */
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ /* generation_id */
+ proto_tree_add_item(tree, hf_kafka_generation_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* member_id */
+ offset = dissect_kafka_string(tree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ /* [group_assignment] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_assignments, &subti,
+ "Group Assignments");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_sync_group_request_group_assignment);
+ proto_item_set_end(subti, tvb, offset);
+
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " (Group=%s, Member=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA),
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a SyncGroup response: error_code + opaque member_assignment bytes. */
+static int
+dissect_kafka_sync_group_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* member_assignment */
+ offset = dissect_kafka_bytes(tree, hf_kafka_member_assignment, tvb, pinfo, offset, NULL, NULL);
+
+ return offset;
+}
+
+/* DESCRIBE_GROUPS REQUEST/RESPONSE */
+
+/* Dissect one group_id string in a DescribeGroups request. */
+static int
+dissect_kafka_describe_groups_request_group_id(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* group_id */
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, NULL, NULL);
+
+ return offset;
+}
+
+/* Dissect a DescribeGroups request: just the [group_id] array. */
+static int
+dissect_kafka_describe_groups_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* [group_id] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_describe_groups_request_group_id);
+
+ return offset;
+}
+
+/* Dissect one DescribeGroups response member: member_id, client_id,
+ * client_host, and the opaque metadata/assignment byte blobs. */
+static int
+dissect_kafka_describe_groups_response_member(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int member_start, member_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group_member, &subti, "Member");
+
+ /* member_id */
+ offset = dissect_kafka_string(subtree, hf_kafka_member_id, tvb, pinfo, offset,
+ &member_start, &member_len);
+
+ /* client_id */
+ offset = dissect_kafka_string(subtree, hf_kafka_client_id, tvb, pinfo, offset, NULL, NULL);
+
+ /* client_host */
+ offset = dissect_kafka_string(subtree, hf_kafka_client_host, tvb, pinfo, offset, NULL, NULL);
+
+ /* member_metadata */
+ offset = dissect_kafka_bytes(subtree, hf_kafka_member_metadata, tvb, pinfo, offset, NULL, NULL);
+
+ /* member_assignment */
+ offset = dissect_kafka_bytes(subtree, hf_kafka_member_assignment, tvb, pinfo, offset, NULL, NULL);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Member-ID=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ member_start, member_len, ENC_UTF_8|ENC_NA));
+ return offset;
+}
+
+/* Dissect one DescribeGroups response group: error_code, group_id, state,
+ * protocol_type, protocol and the [members] array. */
+static int
+dissect_kafka_describe_groups_response_group(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti, *subsubti;
+ proto_tree *subtree, *subsubtree;
+ int group_start, group_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group, &subti, "Group");
+
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
+
+ /* group_id */
+ offset = dissect_kafka_string(subtree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ /* state */
+ offset = dissect_kafka_string(subtree, hf_kafka_group_state, tvb, pinfo, offset, NULL, NULL);
+
+ /* protocol_type */
+ offset = dissect_kafka_string(subtree, hf_kafka_protocol_type, tvb, pinfo, offset, NULL, NULL);
+
+ /* protocol */
+ offset = dissect_kafka_string(subtree, hf_kafka_protocol_name, tvb, pinfo, offset, NULL, NULL);
+
+ /* [member] */
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_group_members,
+ &subsubti, "Members");
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_describe_groups_response_member);
+ proto_item_set_end(subsubti, tvb, offset);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Group-ID=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a DescribeGroups response: just the [group] array. */
+static int
+dissect_kafka_describe_groups_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* [group] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_describe_groups_response_group);
+
+ return offset;
+}
+
+/* LIST_GROUPS REQUEST/RESPONSE */
+
+/* ListGroups request carries no fields; nothing to dissect. */
+static int
+dissect_kafka_list_groups_request(tvbuff_t *tvb _U_, packet_info *pinfo _U_, proto_tree *tree _U_, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ return offset;
+}
+
+/* Dissect one ListGroups response group (group_id + protocol_type). */
+static int
+dissect_kafka_list_groups_response_group(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int group_start, group_len;
+ int protocol_type_start, protocol_type_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_group, &subti, "Group");
+
+ /* group_id */
+ offset = dissect_kafka_string(subtree, hf_kafka_consumer_group, tvb, pinfo, offset,
+ &group_start, &group_len);
+
+ /* protocol_type */
+ offset = dissect_kafka_string(subtree, hf_kafka_protocol_type, tvb, pinfo, offset,
+ &protocol_type_start, &protocol_type_len);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Group-ID=%s, Protocol-Type=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ group_start, group_len, ENC_UTF_8|ENC_NA),
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ protocol_type_start, protocol_type_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+/* Dissect a ListGroups response: error_code + [group] array. */
+static int
+dissect_kafka_list_groups_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* [group] */
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_list_groups_response_group);
+
+ return offset;
+}
+
+/* SASL_HANDSHAKE REQUEST/RESPONSE */
+
+/* Dissect a SaslHandshake request (requested mechanism name only). */
+static int
+dissect_kafka_sasl_handshake_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ /* mechanism */
+ offset = dissect_kafka_string(tree, hf_kafka_sasl_mechanism, tvb, pinfo, offset, NULL, NULL);
+
+ return offset;
+}
+
+/* Dissect one enabled SASL mechanism string in a SaslHandshake response. */
+static int
+dissect_kafka_sasl_handshake_response_enabled_mechanism(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* enabled_mechanism */
+ offset = dissect_kafka_string(tree, hf_kafka_sasl_mechanism, tvb, pinfo, offset, NULL, NULL);
+
+ return offset;
+}
+
+/* Dissect a SaslHandshake response: error_code + [enabled_mechanisms]. */
+static int
+dissect_kafka_sasl_handshake_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+
+ /* error_code */
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+
+ /* [enabled_mechanism] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_sasl_enabled_mechanisms,
+ &subti, "Enabled SASL Mechanisms");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_sasl_handshake_response_enabled_mechanism);
+ proto_item_set_end(subti, tvb, offset);
+
+ return offset;
+}
+
+/* CREATE_TOPICS REQUEST/RESPONSE */
+
+/* Dissect a single replica broker id (int32) in a CreateTopics replica assignment. */
+static int
+dissect_kafka_create_topics_request_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* replica */
+ proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+static int
+dissect_kafka_create_topics_request_replica_assignment(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ kafka_partition_t partition;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_replica_assignment,
+ &subti, "Replica Assignment");
+
+ /* partition_id */
+ partition = (gint32) tvb_get_ntohl(tvb, offset);
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* [replica] */
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_create_topics_request_replica);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Partition-ID=%d)",
+ partition);
+
+ return offset;
+}
+
+static int
+dissect_kafka_create_topics_request_config(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int key_start, key_len;
+ int val_start, val_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_config,
+ &subti, "Config");
+
+ /* key */
+ offset = dissect_kafka_string(subtree, hf_kafka_config_key, tvb, pinfo, offset, &key_start, &key_len);
+
+ /* value */
+ offset = dissect_kafka_string(subtree, hf_kafka_config_value, tvb, pinfo, offset, &val_start, &val_len);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Key=%s, Value=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ key_start, key_len, ENC_UTF_8|ENC_NA),
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ val_start, val_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+static int
+dissect_kafka_create_topics_request_create_topic_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version)
+{
+ proto_item *subti, *subsubti;
+ proto_tree *subtree, *subsubtree;
+ int topic_start, topic_len;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_topic,
+ &subti, "Create Topic Request");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len);
+
+ /* num_partitions */
+ proto_tree_add_item(subtree, hf_kafka_num_partitions, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ /* replication_factor */
+ proto_tree_add_item(subtree, hf_kafka_replication_factor, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
+
+ /* [replica_assignment] */
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1,
+ ett_kafka_replica_assignment,
+ &subsubti, "Replica Assignments");
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_create_topics_request_replica_assignment);
+ proto_item_set_end(subsubti, tvb, offset);
+
+ /* [config] */
+ subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1,
+ ett_kafka_config,
+ &subsubti, "Configs");
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_create_topics_request_config);
+ proto_item_set_end(subsubti, tvb, offset);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA));
+
+ return offset;
+}
+
+static int
+dissect_kafka_create_topics_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+
+ /* [topic] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_topics,
+ &subti, "Create Topic Requests");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_create_topics_request_create_topic_request);
+ proto_item_set_end(subti, tvb, offset);
+
+ /* timeout */
+ proto_tree_add_item(tree, hf_kafka_timeout, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+static int
+dissect_kafka_create_topics_response_topic_error_code(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+ kafka_error_t error;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_topic,
+ &subti, "Topic Error Code");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len);
+
+ /* error_code */
+ offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s, Error=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ kafka_error_to_str(error));
+
+ return offset;
+}
+
+static int
+dissect_kafka_create_topics_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+
+ /* [topic_error_code] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_topics,
+ &subti, "Topic Error Codes");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_create_topics_response_topic_error_code);
+ proto_item_set_end(subti, tvb, offset);
+
+ return offset;
+}
+
+/* DELETE_TOPICS REQUEST/RESPONSE */
+
+static int
+dissect_kafka_delete_topics_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ /* topic */
+ offset = dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
+
+ return offset;
+}
+
+static int
+dissect_kafka_delete_topics_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+
+ /* [topic] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_topics,
+ &subti, "Topics");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_delete_topics_request_topic);
+ proto_item_set_end(subti, tvb, offset);
+
+ /* timeout */
+ proto_tree_add_item(tree, hf_kafka_timeout, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+static int
+dissect_kafka_delete_topics_response_topic_error_code(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+ int topic_start, topic_len;
+ kafka_error_t error;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_topic,
+ &subti, "Topic Error Code");
+
+ /* topic */
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &topic_start, &topic_len);
+
+ /* error_code */
+ offset = dissect_kafka_error_ret(tvb, pinfo, subtree, offset, &error);
+
+ proto_item_set_end(subti, tvb, offset);
+ proto_item_append_text(subti, " (Topic=%s, Error=%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ topic_start, topic_len, ENC_UTF_8|ENC_NA),
+ kafka_error_to_str(error));
+
+ return offset;
+}
+
+static int
+dissect_kafka_delete_topics_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ proto_item *subti;
+ proto_tree *subtree;
+
+ /* [topic_error_code] */
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1,
+ ett_kafka_topics,
+ &subti, "Topic Error Codes");
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_delete_topics_response_topic_error_code);
+ proto_item_set_end(subti, tvb, offset);
+
+ return offset;
}
/* MAIN */
@@ -1334,11 +3029,11 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
}
col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Request",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ kafka_api_key_to_str(matcher->api_key),
matcher->api_version);
/* Also add to protocol root */
proto_item_append_text(root_ti, " (%s v%d Request)",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ kafka_api_key_to_str(matcher->api_key),
matcher->api_version);
if (matcher->response_found) {
@@ -1359,28 +3054,73 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
offset = dissect_kafka_string(kafka_tree, hf_kafka_client_id, tvb, pinfo, offset, NULL, NULL);
switch (matcher->api_key) {
- /* TODO: decode other request types */
case KAFKA_PRODUCE:
/* Produce requests may need delayed queueing, see the more
* detailed comment above. */
- if (tvb_get_ntohs(tvb, offset) != 0 && !PINFO_FD_VISITED(pinfo)) {
+ if (tvb_get_ntohs(tvb, offset) != KAFKA_ACK_NOT_REQUIRED && !PINFO_FD_VISITED(pinfo)) {
wmem_queue_push(match_queue, matcher);
}
/*offset =*/ dissect_kafka_produce_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
- case KAFKA_OFFSET_FETCH:
- /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
- break;
- case KAFKA_METADATA:
- /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
- break;
case KAFKA_FETCH:
/*offset =*/ dissect_kafka_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_OFFSETS:
/*offset =*/ dissect_kafka_offsets_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
+ case KAFKA_METADATA:
+ /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_LEADER_AND_ISR:
+ /*offset =*/ dissect_kafka_leader_and_isr_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_STOP_REPLICA:
+ /*offset =*/ dissect_kafka_stop_replica_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_UPDATE_METADATA:
+ /*offset =*/ dissect_kafka_update_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_CONTROLLED_SHUTDOWN:
+ /*offset =*/ dissect_kafka_controlled_shutdown_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_OFFSET_COMMIT:
+ /*offset =*/ dissect_kafka_offset_commit_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_OFFSET_FETCH:
+ /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_GROUP_COORDINATOR:
+ /*offset =*/ dissect_kafka_group_coordinator_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_JOIN_GROUP:
+ /*offset =*/ dissect_kafka_join_group_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_HEARTBEAT:
+ /*offset =*/ dissect_kafka_heartbeat_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_LEAVE_GROUP:
+ /*offset =*/ dissect_kafka_leave_group_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_SYNC_GROUP:
+ /*offset =*/ dissect_kafka_sync_group_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_DESCRIBE_GROUPS:
+ /*offset =*/ dissect_kafka_describe_groups_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_LIST_GROUPS:
+ /*offset =*/ dissect_kafka_list_groups_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_SASL_HANDSHAKE:
+ /*offset =*/ dissect_kafka_sasl_handshake_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
case KAFKA_API_VERSIONS:
+ /*offset =*/ dissect_kafka_api_versions_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_CREATE_TOPICS:
+ /*offset =*/ dissect_kafka_create_topics_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_DELETE_TOPICS:
+ /*offset =*/ dissect_kafka_delete_topics_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
}
}
@@ -1395,8 +3135,8 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
matcher = (kafka_query_response_t *) wmem_queue_peek(match_queue);
}
if (matcher == NULL || matcher->request_frame >= pinfo->num) {
- col_set_str(pinfo->cinfo, COL_INFO, "Kafka Response (Unknown API, Missing Request)");
- /* TODO: expert info, don't have request, can't dissect */
+ col_set_str(pinfo->cinfo, COL_INFO, "Kafka Response (Undecoded, Request Missing)");
+ expert_add_info(pinfo, root_ti, &ei_kafka_request_missing);
return tvb_captured_length(tvb);
}
@@ -1409,11 +3149,11 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
}
col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Response",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ kafka_api_key_to_str(matcher->api_key),
matcher->api_version);
/* Also add to protocol root */
proto_item_append_text(root_ti, " (%s v%d Response)",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ kafka_api_key_to_str(matcher->api_key),
matcher->api_version);
@@ -1434,25 +3174,69 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
switch (matcher->api_key) {
- /* TODO: decode other response types */
case KAFKA_PRODUCE:
/*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
- case KAFKA_OFFSET_FETCH:
- /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
- break;
- case KAFKA_METADATA:
- /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
- break;
case KAFKA_FETCH:
/*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_OFFSETS:
/*offset =*/ dissect_kafka_offsets_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
+ case KAFKA_METADATA:
+ /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_LEADER_AND_ISR:
+ /*offset =*/ dissect_kafka_leader_and_isr_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_STOP_REPLICA:
+ /*offset =*/ dissect_kafka_stop_replica_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_UPDATE_METADATA:
+ /*offset =*/ dissect_kafka_update_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_CONTROLLED_SHUTDOWN:
+ /*offset =*/ dissect_kafka_controlled_shutdown_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_OFFSET_COMMIT:
+ /*offset =*/ dissect_kafka_offset_commit_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_OFFSET_FETCH:
+ /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_GROUP_COORDINATOR:
+ /*offset =*/ dissect_kafka_group_coordinator_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_JOIN_GROUP:
+ /*offset =*/ dissect_kafka_join_group_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_HEARTBEAT:
+ /*offset =*/ dissect_kafka_heartbeat_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_LEAVE_GROUP:
+ /*offset =*/ dissect_kafka_leave_group_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_SYNC_GROUP:
+ /*offset =*/ dissect_kafka_sync_group_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_DESCRIBE_GROUPS:
+ /*offset =*/ dissect_kafka_describe_groups_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_LIST_GROUPS:
+ /*offset =*/ dissect_kafka_list_groups_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_SASL_HANDSHAKE:
+ /*offset =*/ dissect_kafka_sasl_handshake_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
case KAFKA_API_VERSIONS:
/*offset =*/ dissect_kafka_api_versions_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
+ case KAFKA_CREATE_TOPICS:
+ /*offset =*/ dissect_kafka_create_topics_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_DELETE_TOPICS:
+ /*offset =*/ dissect_kafka_delete_topics_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
}
}
@@ -1541,6 +3325,11 @@ proto_register_kafka(void)
FT_STRING, BASE_NONE, 0, 0,
"The ID of the sending client.", HFILL }
},
+ { &hf_kafka_client_host,
+ { "Client Host", "kafka.client_host",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_string_len,
{ "String Length", "kafka.string_len",
FT_INT16, BASE_DEC, 0, 0,
@@ -1558,7 +3347,7 @@ proto_register_kafka(void)
},
{ &hf_kafka_required_acks,
{ "Required Acks", "kafka.required_acks",
- FT_INT16, BASE_DEC, 0, 0,
+ FT_INT16, BASE_DEC, VALS(kafka_acks), 0,
NULL, HFILL }
},
{ &hf_kafka_timeout,
@@ -1581,6 +3370,11 @@ proto_register_kafka(void)
FT_INT32, BASE_DEC, 0, 0,
NULL, HFILL }
},
+ { &hf_kafka_replication_factor,
+ { "Replication Factor", "kafka.replication_factor",
+ FT_INT16, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_isr,
{ "Caught-Up Replica ID", "kafka.isr_id",
FT_INT32, BASE_DEC, 0, 0,
@@ -1613,7 +3407,12 @@ proto_register_kafka(void)
},
{ &hf_kafka_message_codec,
{ "Compression Codec", "kafka.message_codec",
- FT_UINT8, BASE_DEC, VALS(kafka_codecs), 0x03,
+ FT_UINT8, BASE_DEC, VALS(kafka_message_codecs), KAFKA_MESSAGE_CODEC_MASK,
+ NULL, HFILL }
+ },
+ { &hf_kafka_message_timestamp_type,
+ { "Timestamp Type", "kafka.message_timestamp_type",
+ FT_UINT8, BASE_DEC, VALS(kafka_message_timestamp_types), KAFKA_MESSAGE_TIMESTAMP_MASK,
NULL, HFILL }
},
{ &hf_kafka_message_timestamp,
@@ -1661,6 +3460,11 @@ proto_register_kafka(void)
FT_STRING, BASE_NONE, 0, 0,
NULL, HFILL }
},
+ { &hf_kafka_broker_security_protocol_type,
+ { "Security Protocol Type", "kafka.broker_security_protocol_type",
+ FT_INT16, BASE_DEC, VALS(kafka_security_protocol_types), 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_cluster_id,
{ "Cluster ID", "kafka.cluster_id",
FT_STRING, BASE_NONE, 0, 0,
@@ -1671,6 +3475,31 @@ proto_register_kafka(void)
FT_INT32, BASE_DEC, 0, 0,
NULL, HFILL }
},
+ { &hf_kafka_controller_epoch,
+ { "Controller Epoch", "kafka.controller_epoch",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_delete_partitions,
+ { "Delete Partitions", "kafka.delete_partitions",
+ FT_BOOLEAN, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_group_leader_id,
+ { "Leader ID", "kafka.group_leader_id",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_leader_id,
+ { "Leader ID", "kafka.leader_id",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_leader_epoch,
+ { "Leader Epoch", "kafka.leader_epoch",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_is_internal,
{ "Is Internal", "kafka.is_internal",
FT_BOOLEAN, BASE_NONE, 0, 0,
@@ -1725,29 +3554,132 @@ proto_register_kafka(void)
{ "Max Version", "kafka.api_versions.max_version",
FT_INT16, BASE_DEC, 0, 0,
"Maximal version which supports api key.", HFILL }
- }
+ },
+ { &hf_kafka_session_timeout,
+ { "Session Timeout", "kafka.session_timeout",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_rebalance_timeout,
+ { "Rebalance Timeout", "kafka.rebalance_timeout",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_group_state,
+ { "State", "kafka.group_state",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_member_id,
+ { "Consumer Group Member ID", "kafka.member_id",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_protocol_type,
+ { "Protocol Type", "kafka.protocol_type",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_protocol_name,
+ { "Protocol Name", "kafka.protocol_name",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_protocol_metadata,
+ { "Protocol Metadata", "kafka.protocol_metadata",
+ FT_BYTES, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_member_metadata,
+ { "Member Metadata", "kafka.member_metadata",
+ FT_BYTES, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_generation_id,
+ { "Generation ID", "kafka.generation_id",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_member_assignment,
+ { "Member Assignment", "kafka.member_assignment",
+ FT_BYTES, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_sasl_mechanism,
+ { "SASL Mechanism", "kafka.sasl_mechanism",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_num_partitions,
+ { "Number of Partitions", "kafka.num_partitions",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_zk_version,
+ { "Zookeeper Version", "kafka.zk_version",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_config_key,
+ { "Key", "kafka.config_key",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+        { &hf_kafka_config_value,
+            { "Value", "kafka.config_value",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_commit_timestamp,
+ { "Timestamp", "kafka.commit_timestamp",
+ FT_ABSOLUTE_TIME, ABSOLUTE_TIME_UTC, NULL, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_retention_time,
+ { "Retention Time", "kafka.retention_time",
+ FT_INT64, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
};
- static gint *ett[] = {
+ static int *ett[] = {
&ett_kafka,
&ett_kafka_message,
&ett_kafka_message_set,
- &ett_kafka_metadata_isr,
- &ett_kafka_metadata_replicas,
- &ett_kafka_metadata_broker,
- &ett_kafka_metadata_brokers,
- &ett_kafka_metadata_topics,
+ &ett_kafka_isrs,
+ &ett_kafka_replicas,
+ &ett_kafka_broker,
+ &ett_kafka_brokers,
+ &ett_kafka_broker_end_point,
+ &ett_kafka_topics,
+ &ett_kafka_topic,
&ett_kafka_request_topic,
&ett_kafka_request_partition,
&ett_kafka_response_topic,
&ett_kafka_response_partition,
- &ett_kafka_api_versions
+ &ett_kafka_api_version,
+ &ett_kafka_group_protocols,
+ &ett_kafka_group_protocol,
+ &ett_kafka_group_members,
+ &ett_kafka_group_member,
+ &ett_kafka_group_assignments,
+ &ett_kafka_group_assignment,
+ &ett_kafka_group,
+ &ett_kafka_sasl_enabled_mechanisms,
+ &ett_kafka_replica_assignment,
+ &ett_kafka_configs,
+ &ett_kafka_config,
};
static ei_register_info ei[] = {
- { &ei_kafka_message_decompress, { "kafka.decompress_failed", PI_UNDECODED, PI_WARN, "Failed to decompress message", EXPFILL }},
- { &ei_kafka_bad_string_length, { "kafka.bad_string_length", PI_MALFORMED, PI_WARN, "Invalid string length field", EXPFILL }},
- { &ei_kafka_bad_bytes_length, { "kafka.bad_bytes_length", PI_MALFORMED, PI_WARN, "Invalid byte length field", EXPFILL }},
+ { &ei_kafka_request_missing,
+ { "kafka.request_missing", PI_UNDECODED, PI_WARN, "Request missing", EXPFILL }},
+ { &ei_kafka_message_decompress,
+ { "kafka.decompress_failed", PI_UNDECODED, PI_WARN, "Failed to decompress message", EXPFILL }},
+ { &ei_kafka_bad_string_length,
+ { "kafka.bad_string_length", PI_MALFORMED, PI_WARN, "Invalid string length field", EXPFILL }},
+ { &ei_kafka_bad_bytes_length,
+ { "kafka.bad_bytes_length", PI_MALFORMED, PI_WARN, "Invalid byte length field", EXPFILL }},
};
expert_module_t* expert_kafka;