diff options
author | Martin Mathieson <martin.r.mathieson@googlemail.com> | 2016-04-19 03:49:04 -0700 |
---|---|---|
committer | Martin Mathieson <martin.r.mathieson@googlemail.com> | 2016-04-20 20:16:22 +0000 |
commit | d37734256c77ae99cb1cc93a4d2d1959584109a6 (patch) | |
tree | 2579938c79c43bec94b71946856d3eb31e40b2be /epan/dissectors/packet-kafka.c | |
parent | 9921308df216640421fcaddeb309685a9f80c200 (diff) |
Kafka: several minor improvements
Use a range preference for TCP ports rather than single value.
Show interesting values in tree roots and/or the Info column.
Use common functions to dissect some protocol fields.
Change-Id: I9f5ca2565f47fc84d9c82a31511fae813542482e
Reviewed-on: https://code.wireshark.org/review/14949
Petri-Dish: Martin Mathieson <martin.r.mathieson@googlemail.com>
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r-- | epan/dissectors/packet-kafka.c | 320 |
1 files changed, 233 insertions, 87 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index 91caa6b731..3cbb422c8a 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -90,24 +90,44 @@ static expert_field ei_kafka_message_decompress = EI_INIT; static expert_field ei_kafka_bad_string_length = EI_INIT; static expert_field ei_kafka_bad_bytes_length = EI_INIT; -static guint kafka_port = 0; #define KAFKA_PRODUCE 0 #define KAFKA_FETCH 1 #define KAFKA_OFFSET 2 #define KAFKA_METADATA 3 /* 4-7 are "non-user facing control APIs" and are not documented */ +#define KAFKA_CONTROL_API_4 4 +#define KAFKA_CONTROL_API_5 5 +#define KAFKA_CONTROL_API_6 6 +#define KAFKA_CONTROL_API_7 7 + #define KAFKA_OFFSET_COMMIT 8 #define KAFKA_OFFSET_FETCH 9 #define KAFKA_CONSUMER_METADATA 10 +#define KAFKA_GROUP_JOIN 11 +#define KAFKA_HEARTBEAT 12 +#define KAFKA_GROUP_LEAVE 13 +#define KAFKA_GROUP_SYNC 14 +#define KAFKA_GROUPS_DESCRIBE 15 +#define KAFKA_GROUPS_LIST 16 static const value_string kafka_apis[] = { { KAFKA_PRODUCE, "Produce" }, { KAFKA_FETCH, "Fetch" }, { KAFKA_OFFSET, "Offset" }, { KAFKA_METADATA, "Metadata" }, + { KAFKA_CONTROL_API_4, "Unknown Control API (4)" }, + { KAFKA_CONTROL_API_5, "Unknown Control API (5)" }, + { KAFKA_CONTROL_API_6, "Unknown Control API (6)" }, + { KAFKA_CONTROL_API_7, "Unknown Control API (7)" }, { KAFKA_OFFSET_COMMIT, "Offset Commit" }, { KAFKA_OFFSET_FETCH, "Offset Fetch" }, { KAFKA_CONSUMER_METADATA, "Consumer Metadata" }, + { KAFKA_GROUP_JOIN, "Group Join" }, + { KAFKA_HEARTBEAT, "Heatbeat" }, + { KAFKA_GROUP_LEAVE, "Group Leave" }, + { KAFKA_GROUP_SYNC, "Group Sync" }, + { KAFKA_GROUPS_DESCRIBE, "Groups Describe" }, + { KAFKA_GROUPS_LIST, "Groups List" }, { 0, NULL } }; @@ -141,6 +161,15 @@ static const value_string kafka_codecs[] = { { 0, NULL } }; +/* List/range of TCP ports to register */ +static range_t *new_kafka_tcp_range = NULL; +static range_t *current_kafka_tcp_range = NULL; + +/* Defaulting to empty list of ports */ +#define TCP_DEFAULT_RANGE "" + + + typedef struct _kafka_query_response_t { gint16 api_key; guint32 request_frame; @@ -148,11 +177,18 @@ typedef struct _kafka_query_response_t { gboolean response_found; } kafka_query_response_t; -static int -dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset); + +/* Some values to temporarily remember during dissection */ +typedef struct kafka_packet_values_t { + guint32 partition_id; + gint64 offset; +} kafka_packet_values_t; + +/* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */ static int dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field); + /* HELPERS */ static guint @@ -162,7 +198,8 @@ get_kafka_pdu_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset, void *data } static int -dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset, int(*func)(tvbuff_t*, packet_info*, proto_tree*, int)) +dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset, + int(*func)(tvbuff_t*, packet_info*, proto_tree*, int)) { gint32 count, i; @@ -178,39 +215,52 @@ dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int off } static int -dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset) +dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset, + int *p_string_offset, int *p_string_len) { gint16 len; proto_item *pi; + /* String length */ len = (gint16) tvb_get_ntohs(tvb, offset); pi = proto_tree_add_item(tree, hf_kafka_string_len, tvb, offset, 2, ENC_BIG_ENDIAN); offset += 2; + if (p_string_offset != NULL) *p_string_offset = offset; + if (len < -1) { expert_add_info(pinfo, pi, &ei_kafka_bad_string_length); } else if (len == -1) { + /* -1 indicates a NULL string */ proto_tree_add_string(tree, hf_item, tvb, offset, 0, NULL); + } else { + /* Add the string itself. */ proto_tree_add_item(tree, hf_item, tvb, offset, len, ENC_NA|ENC_ASCII); offset += len; } + if (p_string_len != NULL) *p_string_len = len; + return offset; } static int -dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset) +dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset, + int *p_string_offset, int *p_string_len) { gint32 len; proto_item *pi; + /* Length */ len = (gint32) tvb_get_ntohl(tvb, offset); pi = proto_tree_add_item(tree, hf_kafka_bytes_len, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; + if (p_string_offset != NULL) *p_string_offset = offset; + if (len < -1) { expert_add_info(pinfo, pi, &ei_kafka_bad_bytes_length); } @@ -222,6 +272,8 @@ dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *p offset += len; } + if (p_string_len != NULL) *p_string_len = len; + return offset; } @@ -255,21 +307,25 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s tvbuff_t *raw, *payload; int offset = start_offset; guint8 codec; + guint bytes_length = 0; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message, &ti, "Message"); + /* CRC */ proto_tree_add_item(subtree, hf_kafka_message_crc, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; + /* Magic */ proto_tree_add_item(subtree, hf_kafka_message_magic, tvb, offset, 1, ENC_BIG_ENDIAN); offset += 1; + /* Codec */ proto_tree_add_item(subtree, hf_kafka_message_codec, tvb, offset, 1, ENC_BIG_ENDIAN); codec = tvb_get_guint8(tvb, offset) & 0x07; offset += 1; - offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset); + offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset, NULL, &bytes_length); switch (codec) { case KAFKA_COMPRESSION_GZIP: @@ -290,12 +346,21 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s else { proto_tree_add_bytes(subtree, hf_kafka_message_value, tvb, offset, 0, NULL); } + + /* Add to summary */ + col_append_fstr(pinfo->cinfo, COL_INFO, " [%u bytes GZIPd]", bytes_length); + proto_item_append_text(ti, " (%u bytes GZIPd)", bytes_length); + break; case KAFKA_COMPRESSION_SNAPPY: /* We can't uncompress snappy yet... */ case KAFKA_COMPRESSION_NONE: default: - offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset); + offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length); + + /* Add to summary */ + col_append_fstr(pinfo->cinfo, COL_INFO, " [%u bytes]", bytes_length); + proto_item_append_text(ti, " (%u bytes)", bytes_length); } proto_item_set_len(ti, offset - start_offset); @@ -348,27 +413,64 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i /* OFFSET FETCH REQUEST/RESPONSE */ static int -dissect_kafka_offset_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +dissect_kafka_partition_id(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +{ + proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + return offset; +} + +static int +dissect_kafka_partition_id_get_value(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, kafka_packet_values_t* packet_values) { proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + if (packet_values != NULL) { + packet_values->partition_id = tvb_get_ntohl(tvb, offset); + } offset += 4; return offset; } static int +dissect_kafka_offset_get_value(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, kafka_packet_values_t* packet_values) +{ + proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); + if (packet_values != NULL) { + packet_values->offset = tvb_get_ntoh64(tvb, offset); + } + offset += 8; + + return offset; +} + +static int +dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +{ + proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); + offset += 8; + + return offset; +} + + +static int dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) { proto_item *ti; proto_tree *subtree; int offset = start_offset; + guint32 count; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Offset Fetch Request Topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_partition); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); + count = (gint32)tvb_get_ntohl(tvb, offset); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_partition_id); proto_item_set_len(ti, offset - start_offset); + proto_item_append_text(ti, " (%u partitions)", count); return offset; } @@ -376,32 +478,45 @@ dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, prot static int dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) { - offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset); - + offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, NULL, NULL); offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_topic); return offset; } +static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +{ + guint16 error = tvb_get_ntohs(tvb, offset); + proto_tree_add_item(tree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); + /* Show error in Info column */ + if (error != 0) { + col_append_fstr(pinfo->cinfo, COL_INFO, + " [%s] ", val_to_str_const(error, kafka_errors, "Unknown")); + } + offset += 2; + return offset; +} + static int dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset) { proto_item *ti; proto_tree *subtree; int offset = start_offset; + kafka_packet_values_t packet_values; + memset(&packet_values, 0, sizeof(packet_values)); subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &ti, "Offset Fetch Response Partition"); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values); + offset = dissect_kafka_offset(tvb, pinfo, subtree, offset); - proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); - offset += 8; + offset = dissect_kafka_string(subtree, hf_kafka_metadata, tvb, pinfo, offset, NULL, NULL); - offset = dissect_kafka_string(tree, hf_kafka_metadata, tvb, pinfo, offset); + offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); - offset += 2; + proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)", + packet_values.partition_id, packet_values.offset); proto_item_set_len(ti, offset - start_offset); @@ -417,7 +532,7 @@ dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, pro subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "offset fetch response topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_partition); proto_item_set_len(ti, offset - start_offset); @@ -436,7 +551,7 @@ dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tre static int dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) { - return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset); + return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); } static int @@ -451,17 +566,28 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre proto_item *ti; proto_tree *subtree; int offset = start_offset; + guint32 nodeid; + int host_start, host_len; + guint32 broker_port; subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_metadata_broker, &ti, "Broker"); + nodeid = tvb_get_ntohl(tvb, offset); proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; - offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len); + broker_port = tvb_get_ntohl(tvb, offset); proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; + proto_item_append_text(ti, " (node %u: %s:%u)", + nodeid, + tvb_get_string_enc(wmem_packet_scope(), tvb, + host_start, host_len, ENC_UTF_8|ENC_NA), + broker_port); + proto_item_set_len(ti, offset - start_offset); return offset; @@ -491,11 +617,9 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tr subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Partition"); - proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); - offset += 2; + offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset); proto_tree_add_item(subtree, hf_kafka_partition_leader, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; @@ -521,13 +645,16 @@ dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree proto_item *ti; proto_tree *subtree; int offset = start_offset; + int name_start, name_length; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Topic"); - proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); - offset += 2; + offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &name_start, &name_length); + proto_item_append_text(ti, " (%s)", + tvb_get_string_enc(wmem_packet_scope(), tvb, + name_start, name_length, ENC_UTF_8|ENC_NA)); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_partition); @@ -562,18 +689,21 @@ dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pro { proto_item *ti; proto_tree *subtree; + kafka_packet_values_t packet_values; + memset(&packet_values, 0, sizeof(packet_values)); subtree = proto_tree_add_subtree(tree, tvb, offset, 16, ett_kafka_request_partition, &ti, "Fetch Request Partition"); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values); - proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); - offset += 8; + offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values); proto_tree_add_item(subtree, hf_kafka_max_bytes, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; + proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)", + packet_values.partition_id, packet_values.offset); + return offset; } @@ -583,13 +713,17 @@ dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree proto_item *ti; proto_tree *subtree; int offset = start_offset; + guint32 count; + int name_start, name_length; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Fetch Request Topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &name_start, &name_length); + count = tvb_get_ntohl(tvb, offset); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_request_partition); proto_item_set_len(ti, offset - start_offset); + proto_item_append_text(ti, " (%u partitions)", count); return offset; } @@ -617,22 +751,24 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pr proto_item *ti; proto_tree *subtree; int offset = start_offset; + kafka_packet_values_t packet_values; + memset(&packet_values, 0, sizeof(packet_values)); subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Fetch Response Partition"); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values); - proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); - offset += 2; + offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); - offset += 8; + offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values); offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE); proto_item_set_len(ti, offset - start_offset); + proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)", + packet_values.partition_id, packet_values.offset); + return offset; } @@ -642,13 +778,16 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree proto_item *ti; proto_tree *subtree; int offset = start_offset; + guint32 count; subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Fetch Response Topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); + count = tvb_get_ntohl(tvb, offset); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_response_partition); proto_item_set_len(ti, offset - start_offset); + proto_item_append_text(ti, " (%u partitions)", count); return offset; } @@ -664,15 +803,19 @@ dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree static int dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) { + proto_item *ti; proto_tree *subtree; + kafka_packet_values_t packet_values; + memset(&packet_values, 0, sizeof(packet_values)); - subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_request_partition, NULL, "Produce Request Partition"); + subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_request_partition, &ti, "Produce Request Partition"); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values); offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE); + proto_item_append_text(ti, " (Partition-ID=%u)", packet_values.partition_id); + return offset; } @@ -685,7 +828,7 @@ dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Produce Request Topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_request_partition); proto_item_set_len(ti, offset - start_offset); @@ -710,18 +853,21 @@ dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre static int dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) { + proto_item *ti; proto_tree *subtree; + kafka_packet_values_t packet_values; + memset(&packet_values, 0, sizeof(packet_values)); - subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_response_partition, NULL, "Produce Response Partition"); + subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_response_partition, &ti, "Produce Response Partition"); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values); - proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); - offset += 2; + offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); - offset += 8; + offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values); + + proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)", + packet_values.partition_id, packet_values.offset); return offset; } @@ -735,7 +881,7 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Produce Response Topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_response_partition); proto_item_set_len(ti, offset - start_offset); @@ -759,8 +905,7 @@ dissect_kafka_offset_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pr subtree = proto_tree_add_subtree(tree, tvb, offset, 16, ett_kafka_request_partition, &ti, "Offset Request Partition"); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset); proto_tree_add_item(subtree, hf_kafka_offset_time, tvb, offset, 8, ENC_BIG_ENDIAN); offset += 8; @@ -780,7 +925,7 @@ dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Offset Request Topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_request_partition); proto_item_set_len(ti, offset - start_offset); @@ -800,13 +945,6 @@ dissect_kafka_offset_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree } static int -dissect_kafka_offset_response_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) -{ - proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN); - return offset+8; -} - -static int dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset) { proto_item *ti; @@ -815,13 +953,11 @@ dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Offset Response Partition"); - proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; + offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset); - proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); - offset += 2; + offset = dissect_kafka_error(tvb, pinfo, subtree, offset); - offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_response_offset); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset); proto_item_set_len(ti, offset - start_offset); @@ -837,7 +973,7 @@ dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Offset Response Topic"); - offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL); offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_response_partition); proto_item_set_len(ti, offset - start_offset); @@ -856,7 +992,7 @@ dissect_kafka_offset_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre static int dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U_) { - proto_item *ti; + proto_item *root_ti, *ti; proto_tree *kafka_tree; int offset = 0; kafka_query_response_t *matcher = NULL; @@ -866,9 +1002,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U col_set_str(pinfo->cinfo, COL_PROTOCOL, "Kafka"); col_clear(pinfo->cinfo, COL_INFO); - ti = proto_tree_add_item(tree, proto_kafka, tvb, 0, -1, ENC_NA); + root_ti = proto_tree_add_item(tree, proto_kafka, tvb, 0, -1, ENC_NA); - kafka_tree = proto_item_add_subtree(ti, ett_kafka); + kafka_tree = proto_item_add_subtree(root_ti, ett_kafka); proto_tree_add_item(kafka_tree, hf_kafka_len, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; @@ -884,9 +1020,8 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U matcher = (kafka_query_response_t *) p_get_proto_data(wmem_file_scope(), pinfo, proto_kafka, 0); } - if (pinfo->destport == kafka_port) { + if (value_is_in_range(current_kafka_tcp_range, pinfo->destport)) { /* Request */ - if (matcher == NULL) { matcher = wmem_new(wmem_file_scope(), kafka_query_response_t); @@ -909,6 +1044,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Request", val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); + /* Also add to protocol root */ + proto_item_append_text(root_ti, " (%s Request)", + val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); if (matcher->response_found) { ti = proto_tree_add_uint(kafka_tree, hf_kafka_response_frame, tvb, @@ -925,7 +1063,7 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U proto_tree_add_item(kafka_tree, hf_kafka_correlation_id, tvb, offset, 4, ENC_BIG_ENDIAN); offset += 4; - offset = dissect_kafka_string(kafka_tree, hf_kafka_client_id, tvb, pinfo, offset); + offset = dissect_kafka_string(kafka_tree, hf_kafka_client_id, tvb, pinfo, offset, NULL, NULL); switch (matcher->api_key) { /* TODO: decode other request types */ @@ -977,6 +1115,11 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Response", val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); + /* Also add to protocol root */ + proto_item_append_text(root_ti, " (%s Response)", + val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); + + ti = proto_tree_add_uint(kafka_tree, hf_kafka_request_frame, tvb, 0, 0, matcher->request_frame); @@ -1254,10 +1397,15 @@ proto_register_kafka(void) kafka_module = prefs_register_protocol(proto_kafka, proto_reg_handoff_kafka); - /* Register an example port preference */ - prefs_register_uint_preference(kafka_module, "tcp.port", "Broker TCP Port", - "Kafka broker's TCP port", - 10, &kafka_port); + /* Preference for list/range of TCP server ports */ + range_convert_str(&new_kafka_tcp_range, TCP_DEFAULT_RANGE, 65535); + new_kafka_tcp_range = range_empty(); + prefs_register_range_preference(kafka_module, "tcp.ports", "Broker TCP Ports", + "TCP Ports range", + &new_kafka_tcp_range, 65535); + + /* Single-port preference no longer in use */ + prefs_register_obsolete_preference(kafka_module, "tcp.port"); } void @@ -1265,20 +1413,18 @@ proto_reg_handoff_kafka(void) { static gboolean initialized = FALSE; static dissector_handle_t kafka_handle; - static int currentPort; if (!initialized) { kafka_handle = create_dissector_handle(dissect_kafka_tcp, proto_kafka); initialized = TRUE; - - } else { - dissector_delete_uint("tcp.port", currentPort, kafka_handle); } - currentPort = kafka_port; - - dissector_add_uint("tcp.port", currentPort, kafka_handle); + /* Replace range of ports with current */ + dissector_delete_uint_range("tcp.port", current_kafka_tcp_range, kafka_handle); + g_free(current_kafka_tcp_range); + current_kafka_tcp_range = range_copy(new_kafka_tcp_range); + dissector_add_uint_range("tcp.port", new_kafka_tcp_range, kafka_handle); } /* |