diff options
author | Dmitry Lazurkin <dilaz03@gmail.com> | 2016-11-18 00:19:18 +0300 |
---|---|---|
committer | Martin Mathieson <martin.r.mathieson@googlemail.com> | 2016-11-19 22:11:03 +0000 |
commit | 4eb61deedc79c8c446e6a162a1dc8ebea6e7353f (patch) | |
tree | 10cfe1a10c801b272415575a8e5e5fdd8811e985 /epan/dissectors/packet-kafka.c | |
parent | 231ad4f6ff1dc675947739a107bd639a67b7cd35 (diff) |
kafka: Update supported api keys to latest spec
Details:
- update supported api keys
- add api key ApiVersions
- change api key names according to documentation
- add pcapng files for supported api keys
- add new documentation link
- add declaration of lz4 message codec
Change-Id: I943dc31144890dcd3dd333981a86754668c2bec4
Reviewed-on: https://code.wireshark.org/review/18861
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r-- | epan/dissectors/packet-kafka.c | 492 |
1 files changed, 360 insertions, 132 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index 241f998a06..5bd85abd9d 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -3,6 +3,7 @@ * Copyright 2013, Evan Huus <eapache@gmail.com> * * https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol + * http://kafka.apache.org/protocol.html * * Wireshark - Network traffic analyzer * By Gerald Combs <gerald@wireshark.org> @@ -61,6 +62,7 @@ static int hf_kafka_message_size = -1; static int hf_kafka_message_crc = -1; static int hf_kafka_message_magic = -1; static int hf_kafka_message_codec = -1; +static int hf_kafka_message_timestamp = -1; static int hf_kafka_message_key = -1; static int hf_kafka_message_value = -1; static int hf_kafka_request_frame = -1; @@ -74,10 +76,17 @@ static int hf_kafka_error = -1; static int hf_kafka_broker_nodeid = -1; static int hf_kafka_broker_host = -1; static int hf_kafka_broker_port = -1; +static int hf_kafka_broker_rack = -1; +static int hf_kafka_cluster_id = -1; +static int hf_kafka_controller_id = -1; +static int hf_kafka_is_internal = -1; static int hf_kafka_min_bytes = -1; static int hf_kafka_max_bytes = -1; static int hf_kafka_max_wait_time = -1; static int hf_kafka_throttle_time = -1; +static int hf_kafka_api_versions_api_key = -1; +static int hf_kafka_api_versions_min_version = -1; +static int hf_kafka_api_versions_max_version = -1; static gint ett_kafka = -1; static gint ett_kafka_message = -1; @@ -91,49 +100,55 @@ static gint ett_kafka_request_topic = -1; static gint ett_kafka_request_partition = -1; static gint ett_kafka_response_topic = -1; static gint ett_kafka_response_partition = -1; +static gint ett_kafka_api_versions = -1; static expert_field ei_kafka_message_decompress = EI_INIT; static expert_field ei_kafka_bad_string_length = EI_INIT; static expert_field ei_kafka_bad_bytes_length = EI_INIT; - -#define KAFKA_PRODUCE 0 -#define KAFKA_FETCH 1 -#define KAFKA_OFFSET 2 -#define KAFKA_METADATA 3 -/* 4-7 are "non-user facing control APIs" and are not documented */ -#define KAFKA_CONTROL_API_4 4 -#define KAFKA_CONTROL_API_5 5 -#define KAFKA_CONTROL_API_6 6 -#define KAFKA_CONTROL_API_7 7 - -#define KAFKA_OFFSET_COMMIT 8 -#define KAFKA_OFFSET_FETCH 9 -#define KAFKA_CONSUMER_METADATA 10 -#define KAFKA_GROUP_JOIN 11 -#define KAFKA_HEARTBEAT 12 -#define KAFKA_GROUP_LEAVE 13 -#define KAFKA_GROUP_SYNC 14 -#define KAFKA_GROUPS_DESCRIBE 15 -#define KAFKA_GROUPS_LIST 16 +#define KAFKA_PRODUCE 0 +#define KAFKA_FETCH 1 +#define KAFKA_OFFSETS 2 +#define KAFKA_METADATA 3 +#define KAFKA_LEADER_AND_ISR 4 +#define KAFKA_STOP_REPLICA 5 +#define KAFKA_UPDATE_METADATA 6 +#define KAFKA_CONTROLLED_SHUTDOWN 7 +#define KAFKA_OFFSET_COMMIT 8 +#define KAFKA_OFFSET_FETCH 9 +#define KAFKA_GROUP_COORDINATOR 10 +#define KAFKA_JOIN_GROUP 11 +#define KAFKA_HEARTBEAT 12 +#define KAFKA_LEAVE_GROUP 13 +#define KAFKA_SYNC_GROUP 14 +#define KAFKA_DESCRIBE_GROUPS 15 +#define KAFKA_LIST_GROUPS 16 +#define KAFKA_SASL_HANDSHAKE 17 +#define KAFKA_API_VERSIONS 18 +#define KAFKA_CREATE_TOPICS 19 +#define KAFKA_DELETE_TOPICS 20 static const value_string kafka_apis[] = { - { KAFKA_PRODUCE, "Produce" }, - { KAFKA_FETCH, "Fetch" }, - { KAFKA_OFFSET, "Offset" }, - { KAFKA_METADATA, "Metadata" }, - { KAFKA_CONTROL_API_4, "Unknown Control API (4)" }, - { KAFKA_CONTROL_API_5, "Unknown Control API (5)" }, - { KAFKA_CONTROL_API_6, "Unknown Control API (6)" }, - { KAFKA_CONTROL_API_7, "Unknown Control API (7)" }, - { KAFKA_OFFSET_COMMIT, "Offset Commit" }, - { KAFKA_OFFSET_FETCH, "Offset Fetch" }, - { KAFKA_CONSUMER_METADATA, "Consumer Metadata" }, - { KAFKA_GROUP_JOIN, "Group Join" }, - { KAFKA_HEARTBEAT, "Heatbeat" }, - { KAFKA_GROUP_LEAVE, "Group Leave" }, - { KAFKA_GROUP_SYNC, "Group Sync" }, - { KAFKA_GROUPS_DESCRIBE, "Groups Describe" }, - { KAFKA_GROUPS_LIST, "Groups List" }, + { KAFKA_PRODUCE, "Produce" }, + { KAFKA_FETCH, "Fetch" }, + { KAFKA_OFFSETS, "Offsets" }, + { KAFKA_METADATA, "Metadata" }, + { KAFKA_LEADER_AND_ISR, "LeaderAndIsr" }, + { KAFKA_STOP_REPLICA, "StopReplica" }, + { KAFKA_UPDATE_METADATA, "UpdateMetadata" }, + { KAFKA_CONTROLLED_SHUTDOWN, "ControlledShutdown" }, + { KAFKA_OFFSET_COMMIT, "OffsetCommit" }, + { KAFKA_OFFSET_FETCH, "OffsetFetch" }, + { KAFKA_GROUP_COORDINATOR, "GroupCoordinator" }, + { KAFKA_JOIN_GROUP, "JoinGroup" }, + { KAFKA_HEARTBEAT, "Heatbeat" }, + { KAFKA_LEAVE_GROUP, "LeaveGroup" }, + { KAFKA_SYNC_GROUP, "SyncGroup" }, + { KAFKA_DESCRIBE_GROUPS, "DescribeGroups" }, + { KAFKA_LIST_GROUPS, "ListGroups" }, + { KAFKA_SASL_HANDSHAKE, "SaslHandshake" }, + { KAFKA_API_VERSIONS, "ApiVersions" }, + { KAFKA_CREATE_TOPICS, "CreateTopics" }, + { KAFKA_DELETE_TOPICS, "DeleteTopics" }, { 0, NULL } }; @@ -172,17 +187,26 @@ static const value_string kafka_errors[] = { { 32, "Invalid timestamp" }, { 33, "Unsupported SASL mechanism" }, { 34, "Illegal SASL state" }, - { 35, "Unuspported version" }, + { 35, "Unsupported version" }, + { 36, "Topic already exists" }, + { 37, "Invalid number of partitions" }, + { 38, "Invalid replication-factor" }, + { 39, "Invalid replica assignment" }, + { 40, "Invalid configuration" }, + { 41, "Not controller" }, + { 42, "Invalid request" }, { 0, NULL } }; #define KAFKA_COMPRESSION_NONE 0 #define KAFKA_COMPRESSION_GZIP 1 #define KAFKA_COMPRESSION_SNAPPY 2 +#define KAFKA_COMPRESSION_LZ4 3 static const value_string kafka_codecs[] = { { KAFKA_COMPRESSION_NONE, "None" }, { KAFKA_COMPRESSION_GZIP, "Gzip" }, { KAFKA_COMPRESSION_SNAPPY, "Snappy" }, + { KAFKA_COMPRESSION_LZ4, "LZ4" }, { 0, NULL } }; #ifdef HAVE_SNAPPY @@ -192,9 +216,11 @@ static const guint8 kafka_xerial_header[8] = {0x82, 0x53, 0x4e, 0x41, 0x50, 0x50 /* List/range of TCP ports to register */ static range_t *current_kafka_tcp_range = NULL; +typedef guint16 kafka_api_version_t; + typedef struct _kafka_query_response_t { gint16 api_key; - guint16 api_version; + kafka_api_version_t api_version; guint32 request_frame; guint32 response_frame; gboolean response_found; @@ -222,7 +248,8 @@ get_kafka_pdu_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset, void *data static int dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset, - int(*func)(tvbuff_t*, packet_info*, proto_tree*, int)) + kafka_api_version_t api_version, + int(*func)(tvbuff_t*, packet_info*, proto_tree*, int, kafka_api_version_t)) { gint32 count, i; @@ -231,7 +258,7 @@ dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int off offset += 4; for (i=0; i<count; i++) { - offset = func(tvb, pinfo, tree, offset); + offset = func(tvb, pinfo, tree, offset, api_version); } return offset; @@ -272,7 +299,7 @@ dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info * static int dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset, - int *p_string_offset, int *p_string_len) + int *p_bytes_offset, int *p_bytes_len) { gint32 len; proto_item *pi; @@ -282,7 +309,7 @@ dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *p pi = proto_tree_add_item(tree, hf_kafka_bytes_len, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; - if (p_string_offset != NULL) *p_string_offset = offset; + if (p_bytes_offset != NULL) *p_bytes_offset = offset; if (len < -1) { expert_add_info(pinfo, pi, &ei_kafka_bad_bytes_length); @@ -295,7 +322,7 @@ dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *p offset += len; } - if (p_string_len != NULL) *p_string_len = len; + if (p_bytes_len != NULL) *p_bytes_len = len; return offset; } @@ -323,12 +350,29 @@ kafka_get_bytes(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset) } static int +dissect_kafka_timestamp(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset) +{ + nstime_t nstime; + guint64 milliseconds; + + milliseconds = tvb_get_ntoh64(tvb, offset); + nstime.secs = milliseconds / 1000; + nstime.nsecs = (milliseconds % 1000) * 1000000; + + proto_tree_add_time(tree, hf_item, tvb, offset, 8, &nstime); + offset += 8; + + return offset; +} + +static int dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) { proto_item *ti, *decrypt_item; proto_tree *subtree; tvbuff_t *raw, *payload; int offset = start_offset; + gint8 magic_byte; guint8 codec; guint bytes_length = 0; @@ -340,6 +384,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s offset += 4; /* Magic */ + magic_byte = tvb_get_guint8(tvb, offset); proto_tree_add_item(subtree, hf_kafka_message_magic, tvb, offset, 1, ENC_BIG_ENDIAN); offset += 1; @@ -348,6 +393,11 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s codec = tvb_get_guint8(tvb, offset) & 0x07; offset += 1; + if (magic_byte >= 1) { + /* Timestamp */ + offset = dissect_kafka_timestamp(tvb, pinfo, subtree, hf_kafka_message_timestamp, offset); + } + offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset, NULL, &bytes_length); switch (codec) { @@ -436,6 +486,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s } break; #endif + case KAFKA_COMPRESSION_LZ4: case KAFKA_COMPRESSION_NONE: default: offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length); @@ -495,7 +546,8 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i /* OFFSET FETCH REQUEST/RESPONSE */ static int -dissect_kafka_partition_id(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_partition_id(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; @@ -528,7 +580,8 @@ dissect_kafka_offset_get_value(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree } static int -dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); offset += 8; @@ -536,9 +589,19 @@ dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in return offset; } +static int +dissect_kafka_offset_time(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) +{ + proto_tree_add_item(tree, hf_kafka_offset_time, tvb, offset, 8, ENC_BIG_ENDIAN); + offset += 8; + + return offset; +} static int -dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -549,7 +612,7 @@ dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, prot offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); count = (gint32)tvb_get_ntohl(tvb, offset); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_partition_id); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_partition_id); proto_item_set_len(ti, offset - start_offset); proto_item_append_text(ti, " (%u partitions)", count); @@ -558,15 +621,17 @@ dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, prot } static int -dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, NULL, NULL); - offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_topic); + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_offset_fetch_request_topic); return offset; } -static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) { guint16 error = tvb_get_ntohs(tvb, offset); proto_tree_add_item(tree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); @@ -580,7 +645,8 @@ static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree } static int -dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset) +dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int start_offset, kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -591,7 +657,7 @@ dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &ti, "Offset Fetch Response Partition"); offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values); - offset = dissect_kafka_offset(tvb, pinfo, subtree, offset); + offset = dissect_kafka_offset(tvb, pinfo, subtree, offset, api_version); offset = dissect_kafka_string(subtree, hf_kafka_metadata, tvb, pinfo, offset, NULL, NULL); @@ -606,7 +672,8 @@ dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo } static int -dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -615,7 +682,8 @@ dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, pro subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "offset fetch response topic"); offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_partition); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_offset_fetch_response_partition); proto_item_set_len(ti, offset - start_offset); @@ -623,27 +691,33 @@ dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, pro } static int -dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { - return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_topic); + return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_offset_fetch_response_topic); } /* METADATA REQUEST/RESPONSE */ static int -dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); } static int -dissect_kafka_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { - return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_metadata_request_topic); + return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_metadata_request_topic); } static int -dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -664,6 +738,10 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; + if (api_version >= 1) { + offset = dissect_kafka_string(subtree, hf_kafka_broker_rack, tvb, pinfo, offset, NULL, NULL); + } + proto_item_append_text(ti, " (node %u: %s:%u)", nodeid, tvb_get_string_enc(wmem_packet_scope(), tvb, @@ -676,21 +754,24 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre } static int -dissect_kafka_metadata_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_metadata_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); return offset + 4; } static int -dissect_kafka_metadata_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_metadata_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN); return offset + 4; } static int -dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset) +dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti, *subti; proto_tree *subtree, *subsubtree; @@ -701,19 +782,19 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tr offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset); + offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset, api_version); proto_tree_add_item(subtree, hf_kafka_partition_leader, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; sub_start_offset = offset; subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_replicas, &subti, "Replicas"); - offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_replica); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_replica); proto_item_set_len(subti, offset - sub_start_offset); sub_start_offset = offset; subsubtree = proto_tree_add_subtree(subtree, tvb, offset, -1, ett_kafka_metadata_isr, &subti, "Caught-Up Replicas"); - offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_isr); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_isr); proto_item_set_len(subti, offset - sub_start_offset); proto_item_set_len(ti, offset - start_offset); @@ -722,7 +803,8 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tr } static int -dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -738,7 +820,12 @@ dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree tvb_get_string_enc(wmem_packet_scope(), tvb, name_start, name_length, ENC_UTF_8|ENC_NA)); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_partition); + if (api_version >= 1) { + proto_tree_add_item(subtree, hf_kafka_is_internal, tvb, offset, 1, ENC_NA); + offset += 1; + } + + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_partition); proto_item_set_len(ti, offset - start_offset); @@ -746,19 +833,29 @@ dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree } static int -dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; int offset = start_offset; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_brokers, &ti, "Broker Metadata"); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_broker); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_broker); proto_item_set_len(ti, offset - start_offset); + if (api_version >= 2) { + offset = dissect_kafka_string(tree, hf_kafka_cluster_id, tvb, pinfo, offset, NULL, NULL); + } + + if (api_version >= 1) { + proto_tree_add_item(tree, hf_kafka_controller_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + } + start_offset = offset; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_metadata_topics, &ti, "Topic Metadata"); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_topic); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_metadata_topic); proto_item_set_len(ti, offset - start_offset); return offset; @@ -767,7 +864,8 @@ dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *t /* FETCH REQUEST/RESPONSE */ static int -dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { proto_item *ti; proto_tree *subtree; @@ -790,7 +888,8 @@ dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pro } static int -dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -802,7 +901,8 @@ dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &name_start, &name_length); count = tvb_get_ntohl(tvb, offset); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_request_partition); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_fetch_request_partition); proto_item_set_len(ti, offset - start_offset); proto_item_append_text(ti, " (%u partitions)", count); @@ -811,7 +911,8 @@ dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree } static int -dissect_kafka_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; @@ -822,13 +923,19 @@ dissect_kafka_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, proto_tree_add_item(tree, hf_kafka_min_bytes, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; - offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_fetch_request_topic); + if (api_version >= 3) { + proto_tree_add_item(tree, hf_kafka_max_bytes, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + } + + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_fetch_request_topic); return offset; } static int -dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset) +dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version _U_) { proto_item *ti; proto_tree *subtree; @@ -855,7 +962,8 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pr } static int -dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -866,7 +974,8 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); count = tvb_get_ntohl(tvb, offset); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_response_partition); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_fetch_response_partition); proto_item_set_len(ti, offset - start_offset); proto_item_append_text(ti, " (%u partitions)", count); @@ -875,21 +984,23 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree } static int -dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version) +dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { - if (api_version > 0) { + if (api_version >= 1) { /* Throttle time */ proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; } - return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_fetch_response_topic); + return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_fetch_response_topic); } /* PRODUCE REQUEST/RESPONSE */ static int -dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { proto_item *ti; proto_tree *subtree; @@ -908,7 +1019,8 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p } static int -dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -917,7 +1029,8 @@ dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Produce Request Topic"); offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_request_partition); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_produce_request_partition); proto_item_set_len(ti, offset - start_offset); @@ -925,7 +1038,8 @@ dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre } static int -dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { proto_tree_add_item(tree, hf_kafka_required_acks, tvb, offset, 2, ENC_BIG_ENDIAN); offset += 2; @@ -933,13 +1047,15 @@ dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre proto_tree_add_item(tree, hf_kafka_timeout, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; - offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_request_topic); + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_produce_request_topic); return offset; } static int -dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version _U_) { proto_item *ti; proto_tree *subtree; @@ -954,6 +1070,10 @@ dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values); + if (api_version >= 2) { + offset = dissect_kafka_offset_time(tvb, pinfo, subtree, offset, api_version); + } + proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)", packet_values.partition_id, packet_values.offset); @@ -961,7 +1081,8 @@ dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, } static int -dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -970,7 +1091,8 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Produce Response Topic"); offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_response_partition); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_produce_response_partition); proto_item_set_len(ti, offset - start_offset); @@ -978,11 +1100,12 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr } static int -dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version) +dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { - offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_response_topic); + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_produce_response_topic); - if (api_version > 0) { + if (api_version >= 1) { /* Throttle time */ proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; @@ -991,29 +1114,35 @@ dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tr return offset; } -/* OFFSET REQUEST/RESPONSE */ +/* OFFSETS REQUEST/RESPONSE */ static int -dissect_kafka_offset_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_offsets_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int start_offset, kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; + int offset = start_offset; - subtree = proto_tree_add_subtree(tree, tvb, offset, 16, ett_kafka_request_partition, &ti, "Offset Request Partition"); + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &ti, "Offset Request Partition"); - offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset); + offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset, api_version); - proto_tree_add_item(subtree, hf_kafka_offset_time, tvb, offset, 8, ENC_BIG_ENDIAN); - offset += 8; + offset = dissect_kafka_offset_time(tvb, pinfo, subtree, offset, api_version); - proto_tree_add_item(subtree, hf_kafka_max_offsets, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + if (api_version == 0) { + proto_tree_add_item(subtree, hf_kafka_max_offsets, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + } + + proto_item_set_len(ti, offset - start_offset); return offset; } static int -dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_offsets_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -1022,7 +1151,8 @@ dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Offset Request Topic"); offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_request_partition); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_offsets_request_partition); proto_item_set_len(ti, offset - start_offset); @@ -1030,18 +1160,20 @@ dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree } static int -dissect_kafka_offset_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_offsets_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; - offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_request_topic); + offset = dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_offsets_request_topic); return offset; } static int -dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset) +dissect_kafka_offsets_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, + int start_offset, kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -1049,11 +1181,18 @@ dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Offset Response Partition"); - offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset); + offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset, api_version); offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset); + if (api_version == 0) { + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, &dissect_kafka_offset); + } + else if (api_version >= 1) { + offset = dissect_kafka_offset_time(tvb, pinfo, subtree, offset, api_version); + + offset = dissect_kafka_offset(tvb, pinfo, subtree, offset, api_version); + } proto_item_set_len(ti, offset - start_offset); @@ -1061,7 +1200,8 @@ dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p } static int -dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +dissect_kafka_offsets_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, + kafka_api_version_t api_version) { proto_item *ti; proto_tree *subtree; @@ -1070,7 +1210,42 @@ dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Offset Response Topic"); offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_response_partition); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, api_version, + &dissect_kafka_offsets_response_partition); + + proto_item_set_len(ti, offset - start_offset); + + return offset; +} + +static int +dissect_kafka_offsets_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) +{ + return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_offsets_response_topic); +} + +/* API_VERSIONS REQUEST/RESPONSE */ + +static int +dissect_kafka_api_versions_response_api_versions(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, + int start_offset, kafka_api_version_t api_version _U_) +{ + proto_item *ti; + proto_tree *subtree; + int offset = start_offset; + + subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_api_versions, &ti, + "API Versions"); + + proto_tree_add_item(subtree, hf_kafka_api_versions_api_key, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; + + proto_tree_add_item(subtree, hf_kafka_api_versions_min_version, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; + + proto_tree_add_item(subtree, hf_kafka_api_versions_max_version, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; proto_item_set_len(ti, offset - start_offset); @@ -1078,9 +1253,12 @@ dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre } static int -dissect_kafka_offset_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_api_versions_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, + kafka_api_version_t api_version) { - return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_response_topic); + offset = dissect_kafka_error(tvb, pinfo, tree, offset); + return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, + &dissect_kafka_api_versions_response_api_versions); } /* MAIN */ @@ -1140,11 +1318,13 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U } } - col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Request", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); + col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Request", + val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + matcher->api_version); /* Also add to protocol root */ - proto_item_append_text(root_ti, " (%s Request)", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); + proto_item_append_text(root_ti, " (%s v%d Request)", + val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + matcher->api_version); if (matcher->response_found) { ti = proto_tree_add_uint(kafka_tree, hf_kafka_response_frame, tvb, @@ -1171,19 +1351,21 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U if (tvb_get_ntohs(tvb, offset) != 0 && !PINFO_FD_VISITED(pinfo)) { wmem_queue_push(match_queue, matcher); } - /*offset =*/ dissect_kafka_produce_request(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_produce_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_OFFSET_FETCH: - /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_METADATA: - /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_FETCH: - /*offset =*/ dissect_kafka_fetch_request(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_fetch_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_OFFSETS: + /*offset =*/ dissect_kafka_offsets_request(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; - case KAFKA_OFFSET: - /*offset =*/ dissect_kafka_offset_request(tvb, pinfo, kafka_tree, offset); + case KAFKA_API_VERSIONS: break; } } @@ -1211,11 +1393,13 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U p_add_proto_data(wmem_file_scope(), pinfo, proto_kafka, 0, matcher); } - col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Response", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); + col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s v%d Response", + val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + matcher->api_version); /* Also add to protocol root */ - proto_item_append_text(root_ti, " (%s Response)", - val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); + proto_item_append_text(root_ti, " (%s v%d Response)", + val_to_str_const(matcher->api_key, kafka_apis, "Unknown"), + matcher->api_version); /* Show request frame */ @@ -1240,16 +1424,19 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U /*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_OFFSET_FETCH: - /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_METADATA: - /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_FETCH: /*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; - case KAFKA_OFFSET: - /*offset =*/ dissect_kafka_offset_response(tvb, pinfo, kafka_tree, offset); + case KAFKA_OFFSETS: + /*offset =*/ dissect_kafka_offsets_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); + break; + case KAFKA_API_VERSIONS: + /*offset =*/ dissect_kafka_api_versions_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; } @@ -1415,6 +1602,11 @@ proto_register_kafka(void) FT_UINT8, BASE_DEC, VALS(kafka_codecs), 0x03, NULL, HFILL } }, + { &hf_kafka_message_timestamp, + { "Timestamp", "kafka.message_timestamp", + FT_ABSOLUTE_TIME, ABSOLUTE_TIME_UTC, NULL, 0, + NULL, HFILL } + }, { &hf_kafka_message_key, { "Key", "kafka.message_key", FT_BYTES, BASE_NONE, 0, 0, @@ -1450,6 +1642,26 @@ proto_register_kafka(void) FT_INT32, BASE_DEC, 0, 0, NULL, HFILL } }, + { &hf_kafka_broker_rack, + { "Rack", "kafka.rack", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_cluster_id, + { "Cluster ID", "kafka.cluster_id", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_controller_id, + { "Controller ID", "kafka.node_id", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_is_internal, + { "Is Internal", "kafka.is_internal", + FT_BOOLEAN, BASE_NONE, 0, 0, + NULL, HFILL } + }, { &hf_kafka_min_bytes, { "Min Bytes", "kafka.min_bytes", FT_INT32, BASE_DEC, 0, 0, @@ -1484,6 +1696,21 @@ proto_register_kafka(void) { "Response Frame", "kafka.reponse_frame", FT_FRAMENUM, BASE_NONE, FRAMENUM_TYPE(FT_FRAMENUM_RESPONSE), 0, NULL, HFILL } + }, + { &hf_kafka_api_versions_api_key, + { "API Key", "kafka.api_versions.api_key", + FT_INT16, BASE_DEC, VALS(kafka_apis), 0, + "API Key.", HFILL } + }, + { &hf_kafka_api_versions_min_version, + { "Min Version", "kafka.api_versions.min_version", + FT_INT16, BASE_DEC, 0, 0, + "Minimal version which supports api key.", HFILL } + }, + { &hf_kafka_api_versions_max_version, + { "Max Version", "kafka.api_versions.max_version", + FT_INT16, BASE_DEC, 0, 0, + "Maximal version which supports api key.", HFILL } } }; @@ -1499,7 +1726,8 @@ proto_register_kafka(void) &ett_kafka_request_topic, &ett_kafka_request_partition, &ett_kafka_response_topic, - &ett_kafka_response_partition + &ett_kafka_response_partition, + &ett_kafka_api_versions }; static ei_register_info ei[] = { |