path: root/epan/dissectors/packet-kafka.c
author     Dmitry Lazurkin <dilaz03@gmail.com>                        2016-11-18 00:19:18 +0300
committer  Martin Mathieson <martin.r.mathieson@googlemail.com>       2016-11-19 22:11:03 +0000
commit     4eb61deedc79c8c446e6a162a1dc8ebea6e7353f (patch)
tree       10cfe1a10c801b272415575a8e5e5fdd8811e985 /epan/dissectors/packet-kafka.c
parent     231ad4f6ff1dc675947739a107bd639a67b7cd35 (diff)
kafka: Update supported api keys to latest spec
Details:
- update supported api keys
- add api key ApiVersions
- change api key names according to documentation
- add pcapng files for supported api keys
- add new documentation link
- add declaration of lz4 message codec

Change-Id: I943dc31144890dcd3dd333981a86754668c2bec4
Reviewed-on: https://code.wireshark.org/review/18861
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r--  epan/dissectors/packet-kafka.c  492
1 file changed, 360 insertions, 132 deletions
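
For context, the version-gated dissection added by this change keys off the api_key/api_version pair at the start of every Kafka request, which the dissector records in kafka_query_response_t and threads through dissect_kafka_array to the per-message handlers. Below is a minimal standalone sketch (plain C, not Wireshark dissector code; the rd_be16/rd_be32 helpers and the "demo" client id are made up for illustration) of that request header, assuming the layout documented at http://kafka.apache.org/protocol.html: Size (int32), ApiKey (int16), ApiVersion (int16), CorrelationId (int32), ClientId (nullable string encoded as an int16 length plus bytes), all big-endian on the wire.

#include <stdint.h>
#include <stdio.h>

/* Big-endian readers; Wireshark itself would use tvb_get_ntohs()/tvb_get_ntohl(). */
static uint16_t rd_be16(const uint8_t *p) { return (uint16_t)((p[0] << 8) | p[1]); }
static uint32_t rd_be32(const uint8_t *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8)  |  (uint32_t)p[3];
}

int main(void)
{
    /* Hand-built ApiVersions (api_key 18) v0 request from a client named "demo". */
    const uint8_t frame[] = {
        0x00, 0x00, 0x00, 0x0e,          /* Size = 14 (bytes after this field) */
        0x00, 0x12,                      /* ApiKey = 18 (ApiVersions)          */
        0x00, 0x00,                      /* ApiVersion = 0                     */
        0x00, 0x00, 0x00, 0x2a,          /* CorrelationId = 42                 */
        0x00, 0x04, 'd', 'e', 'm', 'o'   /* ClientId: int16 length + bytes     */
    };

    uint32_t size        = rd_be32(frame);
    int16_t  api_key     = (int16_t)rd_be16(frame + 4);
    int16_t  api_version = (int16_t)rd_be16(frame + 6);
    uint32_t corr_id     = rd_be32(frame + 8);
    int16_t  client_len  = (int16_t)rd_be16(frame + 12);

    printf("size=%u api_key=%d api_version=%d correlation_id=%u client_id=%.*s\n",
           (unsigned)size, (int)api_key, (int)api_version, (unsigned)corr_id,
           client_len < 0 ? 0 : (int)client_len, (const char *)(frame + 14));
    return 0;
}

Compiled with any C99 compiler, this prints size=14 api_key=18 api_version=0 correlation_id=42 client_id=demo. The dissector performs the equivalent reads with tvb accessors, then branches on api_key (e.g. KAFKA_API_VERSIONS = 18) and on api_version for version-specific fields such as the broker rack (Metadata v1+) or the fetch request max_bytes (Fetch v3+), as the diff below shows.
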
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index 241f998a06..5bd85abd9d 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -3,6 +3,7 @@
* Copyright 2013, Evan Huus <eapache@gmail.com>
*
* https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
+ * http://kafka.apache.org/protocol.html
*
* Wireshark - Network traffic analyzer
* By Gerald Combs <gerald@wireshark.org>
@@ -61,6 +62,7 @@ static int hf_kafka_message_size = -1;
static int hf_kafka_message_crc = -1;
static int hf_kafka_message_magic = -1;
static int hf_kafka_message_codec = -1;
+static int hf_kafka_message_timestamp = -1;
static int hf_kafka_message_key = -1;
static int hf_kafka_message_value = -1;
static int hf_kafka_request_frame = -1;
@@ -74,10 +76,17 @@ static int hf_kafka_error = -1;
static int hf_kafka_broker_nodeid = -1;
static int hf_kafka_broker_host = -1;
static int hf_kafka_broker_port = -1;
+static int hf_kafka_broker_rack = -1;
+static int hf_kafka_cluster_id = -1;
+static int hf_kafka_controller_id = -1;
+static int hf_kafka_is_internal = -1;
static int hf_kafka_min_bytes = -1;
static int hf_kafka_max_bytes = -1;
static int hf_kafka_max_wait_time = -1;
static int hf_kafka_throttle_time = -1;
+static int hf_kafka_api_versions_api_key = -1;
+static int hf_kafka_api_versions_min_version = -1;
+static int hf_kafka_api_versions_max_version = -1;
static gint ett_kafka = -1;
static gint ett_kafka_message = -1;
@@ -91,49 +100,55 @@ static gint ett_kafka_request_topic = -1;
static gint ett_kafka_request_partition = -1;
static gint ett_kafka_response_topic = -1;
static gint ett_kafka_response_partition = -1;
+static gint ett_kafka_api_versions = -1;
static expert_field ei_kafka_message_decompress = EI_INIT;
static expert_field ei_kafka_bad_string_length = EI_INIT;
static expert_field ei_kafka_bad_bytes_length = EI_INIT;
-
-#define KAFKA_PRODUCE 0
-#define KAFKA_FETCH 1
-#define KAFKA_OFFSET 2
-#define KAFKA_METADATA 3
-/* 4-7 are "non-user facing control APIs" and are not documented */
-#define KAFKA_CONTROL_API_4 4
-#define KAFKA_CONTROL_API_5 5
-#define KAFKA_CONTROL_API_6 6
-#define KAFKA_CONTROL_API_7 7
-
-#define KAFKA_OFFSET_COMMIT 8
-#define KAFKA_OFFSET_FETCH 9
-#define KAFKA_CONSUMER_METADATA 10
-#define KAFKA_GROUP_JOIN 11
-#define KAFKA_HEARTBEAT 12
-#define KAFKA_GROUP_LEAVE 13
-#define KAFKA_GROUP_SYNC 14
-#define KAFKA_GROUPS_DESCRIBE 15
-#define KAFKA_GROUPS_LIST 16
+#define KAFKA_PRODUCE 0
+#define KAFKA_FETCH 1
+#define KAFKA_OFFSETS 2
+#define KAFKA_METADATA 3
+#define KAFKA_LEADER_AND_ISR 4
+#define KAFKA_STOP_REPLICA 5
+#define KAFKA_UPDATE_METADATA 6
+#define KAFKA_CONTROLLED_SHUTDOWN 7
+#define KAFKA_OFFSET_COMMIT 8
+#define KAFKA_OFFSET_FETCH 9
+#define KAFKA_GROUP_COORDINATOR 10
+#define KAFKA_JOIN_GROUP 11
+#define KAFKA_HEARTBEAT 12
+#define KAFKA_LEAVE_GROUP 13
+#define KAFKA_SYNC_GROUP 14
+#define KAFKA_DESCRIBE_GROUPS 15
+#define KAFKA_LIST_GROUPS 16
+#define KAFKA_SASL_HANDSHAKE 17
+#define KAFKA_API_VERSIONS 18
+#define KAFKA_CREATE_TOPICS 19
+#define KAFKA_DELETE_TOPICS 20
static const value_string kafka_apis[] = {
- { KAFKA_PRODUCE, "Produce" },
- { KAFKA_FETCH, "Fetch" },
- { KAFKA_OFFSET, "Offset" },
- { KAFKA_METADATA, "Metadata" },
- { KAFKA_CONTROL_API_4, "Unknown Control API (4)" },
- { KAFKA_CONTROL_API_5, "Unknown Control API (5)" },
- { KAFKA_CONTROL_API_6, "Unknown Control API (6)" },
- { KAFKA_CONTROL_API_7, "Unknown Control API (7)" },
- { KAFKA_OFFSET_COMMIT, "Offset Commit" },
- { KAFKA_OFFSET_FETCH, "Offset Fetch" },
- { KAFKA_CONSUMER_METADATA, "Consumer Metadata" },
- { KAFKA_GROUP_JOIN, "Group Join" },
- { KAFKA_HEARTBEAT, "Heatbeat" },
- { KAFKA_GROUP_LEAVE, "Group Leave" },
- { KAFKA_GROUP_SYNC, "Group Sync" },
- { KAFKA_GROUPS_DESCRIBE, "Groups Describe" },
- { KAFKA_GROUPS_LIST, "Groups List" },
+ { KAFKA_PRODUCE, "Produce" },
+ { KAFKA_FETCH, "Fetch" },
+ { KAFKA_OFFSETS, "Offsets" },
+ { KAFKA_METADATA, "Metadata" },
+ { KAFKA_LEADER_AND_ISR, "LeaderAndIsr" },
+ { KAFKA_STOP_REPLICA, "StopReplica" },
+ { KAFKA_UPDATE_METADATA, "UpdateMetadata" },
+ { KAFKA_CONTROLLED_SHUTDOWN, "ControlledShutdown" },
+ { KAFKA_OFFSET_COMMIT, "OffsetCommit" },
+ { KAFKA_OFFSET_FETCH, "OffsetFetch" },
+ { KAFKA_GROUP_COORDINATOR, "GroupCoordinator" },
+ { KAFKA_JOIN_GROUP, "JoinGroup" },
+ { KAFKA_HEARTBEAT, "Heatbeat" },
+ { KAFKA_LEAVE_GROUP, "LeaveGroup" },
+ { KAFKA_SYNC_GROUP, "SyncGroup" },
+ { KAFKA_DESCRIBE_GROUPS, "DescribeGroups" },
+ { KAFKA_LIST_GROUPS, "ListGroups" },
+ { KAFKA_SASL_HANDSHAKE, "SaslHandshake" },
+ { KAFKA_API_VERSIONS, "ApiVersions" },
+ { KAFKA_CREATE_TOPICS, "CreateTopics" },
+ { KAFKA_DELETE_TOPICS, "DeleteTopics" },
{ 0, NULL }
};
@@ -172,17 +187,26 @@ static const value_string kafka_errors[] = {
{ 32, "Invalid timestamp" },
{ 33, "Unsupported SASL mechanism" },
{ 34, "Illegal SASL state" },
- { 35, "Unuspported version" },
+ { 35, "Unsupported version" },
+ { 36, "Topic already exists" },
+ { 37, "Invalid number of partitions" },
+ { 38, "Invalid replication-factor" },
+ { 39, "Invalid replica assignment" },
+ { 40, "Invalid configuration" },
+ { 41, "Not controller" },
+ { 42, "Invalid request" },
{ 0, NULL }
};
#define KAFKA_COMPRESSION_NONE 0
#define KAFKA_COMPRESSION_GZIP 1
#define KAFKA_COMPRESSION_SNAPPY 2
+#define KAFKA_COMPRESSION_LZ4 3
static const value_string kafka_codecs[] = {
{ KAFKA_COMPRESSION_NONE, "None" },
{ KAFKA_COMPRESSION_GZIP, "Gzip" },
{ KAFKA_COMPRESSION_SNAPPY, "Snappy" },
+ { KAFKA_COMPRESSION_LZ4, "LZ4" },
{ 0, NULL }
};
#ifdef HAVE_SNAPPY
@@ -192,9 +216,11 @@ static const guint8 kafka_xerial_header[8] = {0x82, 0x53, 0x4e, 0x41, 0x50, 0x50
/* List/range of TCP ports to register */
static range_t *current_kafka_tcp_range = NULL;
+typedef guint16 kafka_api_version_t;
+
typedef struct _kafka_query_response_t {
gint16 api_key;
- guint16 api_version;
+ kafka_api_version_t api_version;
guint32 request_frame;
guint32 response_frame;
gboolean response_found;
@@ -222,7 +248,8 @@ get_kafka_pdu_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset, void *data
static int
dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset,
- int(*func)(tvbuff_t*, packet_info*, proto_tree*, int))
+ kafka_api_version_t api_version,
+ int(*func)(tvbuff_t*, packet_info*, proto_tree*, int, kafka_api_version_t))
{
gint32 count, i;
@@ -231,7 +258,7 @@ dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int off
offset += 4;
for (i=0; i<count; i++) {
- offset = func(tvb, pinfo, tree, offset);
+ offset = func(tvb, pinfo, tree, offset, api_version);
}
return offset;
@@ -272,7 +299,7 @@ dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *
static int
dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset,
- int *p_string_offset, int *p_string_len)
+ int *p_bytes_offset, int *p_bytes_len)
{
gint32 len;
proto_item *pi;
@@ -282,7 +309,7 @@ dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *p
pi = proto_tree_add_item(tree, hf_kafka_bytes_len, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
- if (p_string_offset != NULL) *p_string_offset = offset;
+ if (p_bytes_offset != NULL) *p_bytes_offset = offset;
if (len < -1) {
expert_add_info(pinfo, pi, &ei_kafka_bad_bytes_length);
@@ -295,7 +322,7 @@ dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *p
offset += len;
}
- if (p_string_len != NULL) *p_string_len = len;
+ if (p_bytes_len != NULL) *p_bytes_len = len;
return offset;
}
@@ -323,12 +350,29 @@ kafka_get_bytes(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset)
}
static int
+dissect_kafka_timestamp(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset)
+{
+ nstime_t nstime;
+ guint64 milliseconds;
+
+ milliseconds = tvb_get_ntoh64(tvb, offset);
+ nstime.secs = milliseconds / 1000;
+ nstime.nsecs = (milliseconds % 1000) * 1000000;
+
+ proto_tree_add_time(tree, hf_item, tvb, offset, 8, &nstime);
+ offset += 8;
+
+ return offset;
+}
+
+static int
dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
{
proto_item *ti, *decrypt_item;
proto_tree *subtree;
tvbuff_t *raw, *payload;
int offset = start_offset;
+ gint8 magic_byte;
guint8 codec;
guint bytes_length = 0;
@@ -340,6 +384,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
offset += 4;
/* Magic */
+ magic_byte = tvb_get_guint8(tvb, offset);
proto_tree_add_item(subtree, hf_kafka_message_magic, tvb, offset, 1, ENC_BIG_ENDIAN);
offset += 1;
@@ -348,6 +393,11 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
codec = tvb_get_guint8(tvb, offset) & 0x07;
offset += 1;
+ if (magic_byte >= 1) {
+ /* Timestamp */
+ offset = dissect_kafka_timestamp(tvb, pinfo, subtree, hf_kafka_message_timestamp, offset);
+ }
+
offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset, NULL, &bytes_length);
switch (codec) {
@@ -436,6 +486,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
}
break;
#endif
+ case KAFKA_COMPRESSION_LZ4:
case KAFKA_COMPRESSION_NONE:
default:
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);
@@ -495,7 +546,8 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
/* OFFSET FETCH REQUEST/RESPONSE */
static int
-dissect_kafka_partition_id(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_partition_id(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
@@ -528,7 +580,8 @@ dissect_kafka_offset_get_value(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree
}
static int
-dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
offset += 8;
@@ -536,9 +589,19 @@ dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
return offset;
}
+static int
+dissect_kafka_offset_time(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
+{
+ proto_tree_add_item(tree, hf_kafka_offset_time, tvb, offset, 8, ENC_BIG_ENDIAN);
+ offset += 8;
+
+ return offset;
+}
static int
-dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -549,7 +612,7 @@ dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, prot
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
count = (gint32)tvb_get_ntohl(tvb, offset);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_partition_id);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_partition_id);
proto_item_set_len(ti, offset - start_offset);
proto_item_append_text(ti, " (%u partitions)", count);
@@ -558,15 +621,17 @@ dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, prot
}
static int
-dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, NULL, NULL);
- offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_topic);
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offset_fetch_request_topic);
return offset;
}
-static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
{
guint16 error = tvb_get_ntohs(tvb, offset);
proto_tree_add_item(tree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
@@ -580,7 +645,8 @@ static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree
}
static int
-dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
+dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int start_offset, kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -591,7 +657,7 @@ dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &ti, "Offset Fetch Response Partition");
offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
- offset = dissect_kafka_offset(tvb, pinfo, subtree, offset);
+ offset = dissect_kafka_offset(tvb, pinfo, subtree, offset, api_version);
offset = dissect_kafka_string(subtree, hf_kafka_metadata, tvb, pinfo, offset, NULL, NULL);
@@ -606,7 +672,8 @@ dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo
}
static int
-dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -615,7 +682,8 @@ dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, pro
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "offset fetch response topic");
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_partition);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offset_fetch_response_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -623,27 +691,33 @@ dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, pro
}
static int
-dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
- return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_topic);
+ return dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offset_fetch_response_topic);
}
/* METADATA REQUEST/RESPONSE */
static int
-dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
}
static int
-dissect_kafka_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
- return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_metadata_request_topic);
+ return dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_metadata_request_topic);
}
static int
-dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -664,6 +738,10 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
+ if (api_version >= 1) {
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_rack, tvb, pinfo, offset, NULL, NULL);
+ }
+
proto_item_append_text(ti, " (node %u: %s:%u)",
nodeid,
tvb_get_string_enc(wmem_packet_scope(), tvb,
@@ -676,21 +754,24 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
}
static int
-dissect_kafka_metadata_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_metadata_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
return offset + 4;
}
static int
-dissect_kafka_metadata_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_metadata_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN);
return offset + 4;
}
static int
-dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
+dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti, *subti;
proto_tree *subtree, *subsubtree;
@@ -701,19 +782,19 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tr
offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset);
+ offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset, api_version);
proto_tree_add_item(subtree, hf_kafka_partition_leader, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
sub_start_offset = offset;
subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_replicas, &subti, "Replicas");
- offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_replica);
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_replica);
proto_item_set_len(subti, offset - sub_start_offset);
sub_start_offset = offset;
subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_isr, &subti, "Caught-Up Replicas");
- offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_isr);
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_isr);
proto_item_set_len(subti, offset - sub_start_offset);
proto_item_set_len(ti, offset - start_offset);
@@ -722,7 +803,8 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tr
}
static int
-dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -738,7 +820,12 @@ dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree
tvb_get_string_enc(wmem_packet_scope(), tvb,
name_start, name_length, ENC_UTF_8|ENC_NA));
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_partition);
+ if (api_version >= 1) {
+ proto_tree_add_item(subtree, hf_kafka_is_internal, tvb, offset, 1, ENC_NA);
+ offset += 1;
+ }
+
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -746,19 +833,29 @@ dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree
}
static int
-dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_brokers, &ti, "Broker Metadata");
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_broker);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_broker);
proto_item_set_len(ti, offset - start_offset);
+ if (api_version >= 2) {
+ offset = dissect_kafka_string(tree, hf_kafka_cluster_id, tvb, pinfo, offset, NULL, NULL);
+ }
+
+ if (api_version >= 1) {
+ proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+ }
+
start_offset = offset;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_topics, &ti, "Topic Metadata");
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_topic);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_topic);
proto_item_set_len(ti, offset - start_offset);
return offset;
@@ -767,7 +864,8 @@ dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *t
/* FETCH REQUEST/RESPONSE */
static int
-dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
proto_item *ti;
proto_tree *subtree;
@@ -790,7 +888,8 @@ dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pro
}
static int
-dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -802,7 +901,8 @@ dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &name_start, &name_length);
count = tvb_get_ntohl(tvb, offset);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_request_partition);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_fetch_request_partition);
proto_item_set_len(ti, offset - start_offset);
proto_item_append_text(ti, " (%u partitions)", count);
@@ -811,7 +911,8 @@ dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
}
static int
-dissect_kafka_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
@@ -822,13 +923,19 @@ dissect_kafka_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
proto_tree_add_item(tree, hf_kafka_min_bytes, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
- offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_fetch_request_topic);
+ if (api_version >= 3) {
+ proto_tree_add_item(tree, hf_kafka_max_bytes, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+ }
+
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_fetch_request_topic);
return offset;
}
static int
-dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
+dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version _U_)
{
proto_item *ti;
proto_tree *subtree;
@@ -855,7 +962,8 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pr
}
static int
-dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -866,7 +974,8 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
count = tvb_get_ntohl(tvb, offset);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_response_partition);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_fetch_response_partition);
proto_item_set_len(ti, offset - start_offset);
proto_item_append_text(ti, " (%u partitions)", count);
@@ -875,21 +984,23 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
}
static int
-dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version)
+dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
- if (api_version > 0) {
+ if (api_version >= 1) {
/* Throttle time */
proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
}
- return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_fetch_response_topic);
+ return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_fetch_response_topic);
}
/* PRODUCE REQUEST/RESPONSE */
static int
-dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
proto_item *ti;
proto_tree *subtree;
@@ -908,7 +1019,8 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p
}
static int
-dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -917,7 +1029,8 @@ dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Produce Request Topic");
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_request_partition);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_produce_request_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -925,7 +1038,8 @@ dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre
}
static int
-dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
proto_tree_add_item(tree, hf_kafka_required_acks, tvb, offset, 2, ENC_BIG_ENDIAN);
offset += 2;
@@ -933,13 +1047,15 @@ dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
proto_tree_add_item(tree, hf_kafka_timeout, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
- offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_request_topic);
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_produce_request_topic);
return offset;
}
static int
-dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version _U_)
{
proto_item *ti;
proto_tree *subtree;
@@ -954,6 +1070,10 @@ dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_,
offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values);
+ if (api_version >= 2) {
+ offset = dissect_kafka_offset_time(tvb, pinfo, subtree, offset, api_version);
+ }
+
proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)",
packet_values.partition_id, packet_values.offset);
@@ -961,7 +1081,8 @@ dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_,
}
static int
-dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -970,7 +1091,8 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Produce Response Topic");
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_response_partition);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_produce_response_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -978,11 +1100,12 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr
}
static int
-dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version)
+dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
- offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_response_topic);
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_produce_response_topic);
- if (api_version > 0) {
+ if (api_version >= 1) {
/* Throttle time */
proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
@@ -991,29 +1114,35 @@ dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tr
return offset;
}
-/* OFFSET REQUEST/RESPONSE */
+/* OFFSETS REQUEST/RESPONSE */
static int
-dissect_kafka_offset_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_offsets_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int start_offset, kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
+ int offset = start_offset;
- subtree = proto_tree_add_subtree(tree, tvb, offset, 16, ett_kafka_request_partition, &ti, "Offset Request Partition");
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &ti, "Offset Request Partition");
- offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset);
+ offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset, api_version);
- proto_tree_add_item(subtree, hf_kafka_offset_time, tvb, offset, 8, ENC_BIG_ENDIAN);
- offset += 8;
+ offset = dissect_kafka_offset_time(tvb, pinfo, subtree, offset, api_version);
- proto_tree_add_item(subtree, hf_kafka_max_offsets, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ if (api_version == 0) {
+ proto_tree_add_item(subtree, hf_kafka_max_offsets, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+ }
+
+ proto_item_set_len(ti, offset - start_offset);
return offset;
}
static int
-dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_offsets_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -1022,7 +1151,8 @@ dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Offset Request Topic");
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_request_partition);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offsets_request_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -1030,18 +1160,20 @@ dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
}
static int
-dissect_kafka_offset_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_offsets_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
- offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_request_topic);
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_offsets_request_topic);
return offset;
}
static int
-dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
+dissect_kafka_offsets_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree,
+ int start_offset, kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -1049,11 +1181,18 @@ dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Offset Response Partition");
- offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset);
+ offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset, api_version);
offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset);
+ if (api_version == 0) {
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_offset);
+ }
+ else if (api_version >= 1) {
+ offset = dissect_kafka_offset_time(tvb, pinfo, subtree, offset, api_version);
+
+ offset = dissect_kafka_offset(tvb, pinfo, subtree, offset, api_version);
+ }
proto_item_set_len(ti, offset - start_offset);
@@ -1061,7 +1200,8 @@ dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p
}
static int
-dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+dissect_kafka_offsets_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset,
+ kafka_api_version_t api_version)
{
proto_item *ti;
proto_tree *subtree;
@@ -1070,7 +1210,42 @@ dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Offset Response Topic");
offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_response_partition);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_offsets_response_partition);
+
+ proto_item_set_len(ti, offset - start_offset);
+
+ return offset;
+}
+
+static int
+dissect_kafka_offsets_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
+{
+ return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_offsets_response_topic);
+}
+
+/* API_VERSIONS REQUEST/RESPONSE */
+
+static int
+dissect_kafka_api_versions_response_api_versions(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
+ int start_offset, kafka_api_version_t api_version _U_)
+{
+ proto_item *ti;
+ proto_tree *subtree;
+ int offset = start_offset;
+
+ subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_api_versions, &ti,
+ "API Versions");
+
+ proto_tree_add_item(subtree, hf_kafka_api_versions_api_key, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
+
+ proto_tree_add_item(subtree, hf_kafka_api_versions_min_version, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
+
+ proto_tree_add_item(subtree, hf_kafka_api_versions_max_version, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
proto_item_set_len(ti, offset - start_offset);
@@ -1078,9 +1253,12 @@ dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre
}
static int
-dissect_kafka_offset_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_api_versions_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset,
+ kafka_api_version_t api_version)
{
- return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_response_topic);
+ offset = dissect_kafka_error(tvb, pinfo, tree, offset);
+ return dissect_kafka_array(tree, tvb, pinfo, offset, api_version,
+ &dissect_kafka_api_versions_response_api_versions);
}
/* MAIN */
@@ -1140,11 +1318,13 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
}
}
- col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Request",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
+ col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Request",
+ val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ matcher->api_version);
/* Also add to protocol root */
- proto_item_append_text(root_ti, " (%s Request)",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
+ proto_item_append_text(root_ti, " (%s v%d Request)",
+ val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ matcher->api_version);
if (matcher->response_found) {
ti = proto_tree_add_uint(kafka_tree, hf_kafka_response_frame, tvb,
@@ -1171,19 +1351,21 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
if (tvb_get_ntohs(tvb, offset) != 0 && !PINFO_FD_VISITED(pinfo)) {
wmem_queue_push(match_queue, matcher);
}
- /*offset =*/ dissect_kafka_produce_request(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_produce_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_OFFSET_FETCH:
- /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_METADATA:
- /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_FETCH:
- /*offset =*/ dissect_kafka_fetch_request(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_OFFSETS:
+ /*offset =*/ dissect_kafka_offsets_request(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
- case KAFKA_OFFSET:
- /*offset =*/ dissect_kafka_offset_request(tvb, pinfo, kafka_tree, offset);
+ case KAFKA_API_VERSIONS:
break;
}
}
@@ -1211,11 +1393,13 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
p_add_proto_data(wmem_file_scope(), pinfo, proto_kafka, 0, matcher);
}
- col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Response",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
+ col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Response",
+ val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ matcher->api_version);
/* Also add to protocol root */
- proto_item_append_text(root_ti, " (%s Response)",
- val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
+ proto_item_append_text(root_ti, " (%s v%d Response)",
+ val_to_str_const(matcher->api_key, kafka_apis, "Unknown"),
+ matcher->api_version);
/* Show request frame */
@@ -1240,16 +1424,19 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
/*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_OFFSET_FETCH:
- /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_METADATA:
- /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_FETCH:
/*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
- case KAFKA_OFFSET:
- /*offset =*/ dissect_kafka_offset_response(tvb, pinfo, kafka_tree, offset);
+ case KAFKA_OFFSETS:
+ /*offset =*/ dissect_kafka_offsets_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
+ break;
+ case KAFKA_API_VERSIONS:
+ /*offset =*/ dissect_kafka_api_versions_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
}
@@ -1415,6 +1602,11 @@ proto_register_kafka(void)
FT_UINT8, BASE_DEC, VALS(kafka_codecs), 0x03,
NULL, HFILL }
},
+ { &hf_kafka_message_timestamp,
+ { "Timestamp", "kafka.message_timestamp",
+ FT_ABSOLUTE_TIME, ABSOLUTE_TIME_UTC, NULL, 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_message_key,
{ "Key", "kafka.message_key",
FT_BYTES, BASE_NONE, 0, 0,
@@ -1450,6 +1642,26 @@ proto_register_kafka(void)
FT_INT32, BASE_DEC, 0, 0,
NULL, HFILL }
},
+ { &hf_kafka_broker_rack,
+ { "Rack", "kafka.rack",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_cluster_id,
+ { "Cluster ID", "kafka.cluster_id",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_controller_id,
+ { "Controller ID", "kafka.node_id",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_is_internal,
+ { "Is Internal", "kafka.is_internal",
+ FT_BOOLEAN, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_min_bytes,
{ "Min Bytes", "kafka.min_bytes",
FT_INT32, BASE_DEC, 0, 0,
@@ -1484,6 +1696,21 @@ proto_register_kafka(void)
{ "Response Frame", "kafka.reponse_frame",
FT_FRAMENUM, BASE_NONE, FRAMENUM_TYPE(FT_FRAMENUM_RESPONSE), 0,
NULL, HFILL }
+ },
+ { &hf_kafka_api_versions_api_key,
+ { "API Key", "kafka.api_versions.api_key",
+ FT_INT16, BASE_DEC, VALS(kafka_apis), 0,
+ "API Key.", HFILL }
+ },
+ { &hf_kafka_api_versions_min_version,
+ { "Min Version", "kafka.api_versions.min_version",
+ FT_INT16, BASE_DEC, 0, 0,
+ "Minimal version which supports api key.", HFILL }
+ },
+ { &hf_kafka_api_versions_max_version,
+ { "Max Version", "kafka.api_versions.max_version",
+ FT_INT16, BASE_DEC, 0, 0,
+ "Maximal version which supports api key.", HFILL }
}
};
@@ -1499,7 +1726,8 @@ proto_register_kafka(void)
&ett_kafka_request_topic,
&ett_kafka_request_partition,
&ett_kafka_response_topic,
- &ett_kafka_response_partition
+ &ett_kafka_response_partition,
+ &ett_kafka_api_versions
};
static ei_register_info ei[] = {