author    Martin Mathieson <martin.r.mathieson@googlemail.com>    2016-04-19 03:49:04 -0700
committer Martin Mathieson <martin.r.mathieson@googlemail.com>    2016-04-20 20:16:22 +0000
commit    d37734256c77ae99cb1cc93a4d2d1959584109a6 (patch)
tree      2579938c79c43bec94b71946856d3eb31e40b2be /epan/dissectors/packet-kafka.c
parent    9921308df216640421fcaddeb309685a9f80c200 (diff)
Kafka: several minor improvements
Use a range preference for TCP ports rather than a single value.
Show interesting values in tree roots and/or the Info column.
Use common functions to dissect some protocol fields.

Change-Id: I9f5ca2565f47fc84d9c82a31511fae813542482e
Reviewed-on: https://code.wireshark.org/review/14949
Petri-Dish: Martin Mathieson <martin.r.mathieson@googlemail.com>
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r--    epan/dissectors/packet-kafka.c    320
1 file changed, 233 insertions, 87 deletions
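
For context, the main mechanical change below is the switch from a single tcp.port integer preference to a port-range preference that is (re)applied in the handoff routine. The following is a minimal sketch of that pattern, assuming the Wireshark 2.x preference and dissector APIs used in this file; it is illustrative only and not part of the diff, and every "example" identifier (proto_example, dissect_example_tcp, and so on) is a hypothetical stand-in for a real dissector's symbols.

/* Sketch: registering a TCP port-range preference (Wireshark 2.x-era API).
 * proto_example and dissect_example_tcp are assumed to be defined elsewhere. */
#include <epan/packet.h>
#include <epan/prefs.h>

extern int proto_example;
extern int dissect_example_tcp(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void *data);
void proto_reg_handoff_example(void);

static range_t *pref_example_tcp_range    = NULL;  /* value edited via preferences */
static range_t *applied_example_tcp_range = NULL;  /* range currently registered   */

void
proto_register_example_prefs(void)
{
    module_t *example_module = prefs_register_protocol(proto_example,
                                                       proto_reg_handoff_example);

    /* Default to an empty range; users enter ports such as "9092" or "9092-9095". */
    range_convert_str(&pref_example_tcp_range, "", 65535);
    prefs_register_range_preference(example_module, "tcp.ports", "Broker TCP Ports",
                                    "TCP ports to dissect as this protocol",
                                    &pref_example_tcp_range, 65535);

    /* Retire the old single-port key so existing profiles do not warn. */
    prefs_register_obsolete_preference(example_module, "tcp.port");
}

void
proto_reg_handoff_example(void)
{
    static dissector_handle_t example_handle;
    static gboolean initialized = FALSE;

    if (!initialized) {
        example_handle = create_dissector_handle(dissect_example_tcp, proto_example);
        initialized = TRUE;
    } else {
        /* Preferences changed: unregister the previously applied range first. */
        dissector_delete_uint_range("tcp.port", applied_example_tcp_range, example_handle);
        g_free(applied_example_tcp_range);
    }

    applied_example_tcp_range = range_copy(pref_example_tcp_range);
    dissector_add_uint_range("tcp.port", applied_example_tcp_range, example_handle);
}

At dissection time the request direction can then be decided with value_is_in_range(applied_example_tcp_range, pinfo->destport), which is the approach the diff below takes for Kafka.
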
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index 91caa6b731..3cbb422c8a 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -90,24 +90,44 @@ static expert_field ei_kafka_message_decompress = EI_INIT;
static expert_field ei_kafka_bad_string_length = EI_INIT;
static expert_field ei_kafka_bad_bytes_length = EI_INIT;
-static guint kafka_port = 0;
#define KAFKA_PRODUCE 0
#define KAFKA_FETCH 1
#define KAFKA_OFFSET 2
#define KAFKA_METADATA 3
/* 4-7 are "non-user facing control APIs" and are not documented */
+#define KAFKA_CONTROL_API_4 4
+#define KAFKA_CONTROL_API_5 5
+#define KAFKA_CONTROL_API_6 6
+#define KAFKA_CONTROL_API_7 7
+
#define KAFKA_OFFSET_COMMIT 8
#define KAFKA_OFFSET_FETCH 9
#define KAFKA_CONSUMER_METADATA 10
+#define KAFKA_GROUP_JOIN 11
+#define KAFKA_HEARTBEAT 12
+#define KAFKA_GROUP_LEAVE 13
+#define KAFKA_GROUP_SYNC 14
+#define KAFKA_GROUPS_DESCRIBE 15
+#define KAFKA_GROUPS_LIST 16
static const value_string kafka_apis[] = {
{ KAFKA_PRODUCE, "Produce" },
{ KAFKA_FETCH, "Fetch" },
{ KAFKA_OFFSET, "Offset" },
{ KAFKA_METADATA, "Metadata" },
+ { KAFKA_CONTROL_API_4, "Unknown Control API (4)" },
+ { KAFKA_CONTROL_API_5, "Unknown Control API (5)" },
+ { KAFKA_CONTROL_API_6, "Unknown Control API (6)" },
+ { KAFKA_CONTROL_API_7, "Unknown Control API (7)" },
{ KAFKA_OFFSET_COMMIT, "Offset Commit" },
{ KAFKA_OFFSET_FETCH, "Offset Fetch" },
{ KAFKA_CONSUMER_METADATA, "Consumer Metadata" },
+ { KAFKA_GROUP_JOIN, "Group Join" },
+ { KAFKA_HEARTBEAT, "Heartbeat" },
+ { KAFKA_GROUP_LEAVE, "Group Leave" },
+ { KAFKA_GROUP_SYNC, "Group Sync" },
+ { KAFKA_GROUPS_DESCRIBE, "Groups Describe" },
+ { KAFKA_GROUPS_LIST, "Groups List" },
{ 0, NULL }
};
@@ -141,6 +161,15 @@ static const value_string kafka_codecs[] = {
{ 0, NULL }
};
+/* List/range of TCP ports to register */
+static range_t *new_kafka_tcp_range = NULL;
+static range_t *current_kafka_tcp_range = NULL;
+
+/* Defaulting to empty list of ports */
+#define TCP_DEFAULT_RANGE ""
+
+
+
typedef struct _kafka_query_response_t {
gint16 api_key;
guint32 request_frame;
@@ -148,11 +177,18 @@ typedef struct _kafka_query_response_t {
gboolean response_found;
} kafka_query_response_t;
-static int
-dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset);
+
+/* Some values to temporarily remember during dissection */
+typedef struct kafka_packet_values_t {
+ guint32 partition_id;
+ gint64 offset;
+} kafka_packet_values_t;
+
+/* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */
static int
dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field);
+
/* HELPERS */
static guint
@@ -162,7 +198,8 @@ get_kafka_pdu_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset, void *data
}
static int
-dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset, int(*func)(tvbuff_t*, packet_info*, proto_tree*, int))
+dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int offset,
+ int(*func)(tvbuff_t*, packet_info*, proto_tree*, int))
{
gint32 count, i;
@@ -178,39 +215,52 @@ dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int off
}
static int
-dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset)
+dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset,
+ int *p_string_offset, int *p_string_len)
{
gint16 len;
proto_item *pi;
+ /* String length */
len = (gint16) tvb_get_ntohs(tvb, offset);
pi = proto_tree_add_item(tree, hf_kafka_string_len, tvb, offset, 2, ENC_BIG_ENDIAN);
offset += 2;
+ if (p_string_offset != NULL) *p_string_offset = offset;
+
if (len < -1) {
expert_add_info(pinfo, pi, &ei_kafka_bad_string_length);
}
else if (len == -1) {
+ /* -1 indicates a NULL string */
proto_tree_add_string(tree, hf_item, tvb, offset, 0, NULL);
+
}
else {
+ /* Add the string itself. */
proto_tree_add_item(tree, hf_item, tvb, offset, len, ENC_NA|ENC_ASCII);
offset += len;
}
+ if (p_string_len != NULL) *p_string_len = len;
+
return offset;
}
static int
-dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset)
+dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset,
+ int *p_string_offset, int *p_string_len)
{
gint32 len;
proto_item *pi;
+ /* Length */
len = (gint32) tvb_get_ntohl(tvb, offset);
pi = proto_tree_add_item(tree, hf_kafka_bytes_len, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
+ if (p_string_offset != NULL) *p_string_offset = offset;
+
if (len < -1) {
expert_add_info(pinfo, pi, &ei_kafka_bad_bytes_length);
}
@@ -222,6 +272,8 @@ dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *p
offset += len;
}
+ if (p_string_len != NULL) *p_string_len = len;
+
return offset;
}
@@ -255,21 +307,25 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
tvbuff_t *raw, *payload;
int offset = start_offset;
guint8 codec;
+ guint bytes_length = 0;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message, &ti, "Message");
+ /* CRC */
proto_tree_add_item(subtree, hf_kafka_message_crc, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
+ /* Magic */
proto_tree_add_item(subtree, hf_kafka_message_magic, tvb, offset, 1, ENC_BIG_ENDIAN);
offset += 1;
+ /* Codec */
proto_tree_add_item(subtree, hf_kafka_message_codec, tvb, offset, 1, ENC_BIG_ENDIAN);
codec = tvb_get_guint8(tvb, offset) & 0x07;
offset += 1;
- offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset);
+ offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset, NULL, &bytes_length);
switch (codec) {
case KAFKA_COMPRESSION_GZIP:
@@ -290,12 +346,21 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
else {
proto_tree_add_bytes(subtree, hf_kafka_message_value, tvb, offset, 0, NULL);
}
+
+ /* Add to summary */
+ col_append_fstr(pinfo->cinfo, COL_INFO, " [%u bytes GZIPd]", bytes_length);
+ proto_item_append_text(ti, " (%u bytes GZIPd)", bytes_length);
+
break;
case KAFKA_COMPRESSION_SNAPPY:
/* We can't uncompress snappy yet... */
case KAFKA_COMPRESSION_NONE:
default:
- offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset);
+ offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);
+
+ /* Add to summary */
+ col_append_fstr(pinfo->cinfo, COL_INFO, " [%u bytes]", bytes_length);
+ proto_item_append_text(ti, " (%u bytes)", bytes_length);
}
proto_item_set_len(ti, offset - start_offset);
@@ -348,27 +413,64 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
/* OFFSET FETCH REQUEST/RESPONSE */
static int
-dissect_kafka_offset_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+dissect_kafka_partition_id(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+{
+ proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ return offset;
+}
+
+static int
+dissect_kafka_partition_id_get_value(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, kafka_packet_values_t* packet_values)
{
proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ if (packet_values != NULL) {
+ packet_values->partition_id = tvb_get_ntohl(tvb, offset);
+ }
offset += 4;
return offset;
}
static int
+dissect_kafka_offset_get_value(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset, kafka_packet_values_t* packet_values)
+{
+ proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
+ if (packet_values != NULL) {
+ packet_values->offset = tvb_get_ntoh64(tvb, offset);
+ }
+ offset += 8;
+
+ return offset;
+}
+
+static int
+dissect_kafka_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+{
+ proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
+ offset += 8;
+
+ return offset;
+}
+
+
+static int
dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
{
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
+ guint32 count;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Offset Fetch Request Topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_partition);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
+ count = (gint32)tvb_get_ntohl(tvb, offset);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_partition_id);
proto_item_set_len(ti, offset - start_offset);
+ proto_item_append_text(ti, " (%u partitions)", count);
return offset;
}
@@ -376,32 +478,45 @@ dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, prot
static int
dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
{
- offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset);
-
+ offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset, NULL, NULL);
offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_topic);
return offset;
}
+static int dissect_kafka_error(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+{
+ guint16 error = tvb_get_ntohs(tvb, offset);
+ proto_tree_add_item(tree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
+ /* Show error in Info column */
+ if (error != 0) {
+ col_append_fstr(pinfo->cinfo, COL_INFO,
+ " [%s] ", val_to_str_const(error, kafka_errors, "Unknown"));
+ }
+ offset += 2;
+ return offset;
+}
+
static int
dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
{
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
+ kafka_packet_values_t packet_values;
+ memset(&packet_values, 0, sizeof(packet_values));
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_partition, &ti, "Offset Fetch Response Partition");
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
+ offset = dissect_kafka_offset(tvb, pinfo, subtree, offset);
- proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
- offset += 8;
+ offset = dissect_kafka_string(subtree, hf_kafka_metadata, tvb, pinfo, offset, NULL, NULL);
- offset = dissect_kafka_string(tree, hf_kafka_metadata, tvb, pinfo, offset);
+ offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
- offset += 2;
+ proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)",
+ packet_values.partition_id, packet_values.offset);
proto_item_set_len(ti, offset - start_offset);
@@ -417,7 +532,7 @@ dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, pro
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "offset fetch response topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -436,7 +551,7 @@ dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tre
static int
dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
{
- return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset);
+ return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
}
static int
@@ -451,17 +566,28 @@ dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
+ guint32 nodeid;
+ int host_start, host_len;
+ guint32 broker_port;
subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_metadata_broker, &ti, "Broker");
+ nodeid = tvb_get_ntohl(tvb, offset);
proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
- offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset, &host_start, &host_len);
+ broker_port = tvb_get_ntohl(tvb, offset);
proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
+ proto_item_append_text(ti, " (node %u: %s:%u)",
+ nodeid,
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ host_start, host_len, ENC_UTF_8|ENC_NA),
+ broker_port);
+
proto_item_set_len(ti, offset - start_offset);
return offset;
@@ -491,11 +617,9 @@ dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tr
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Partition");
- proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
- offset += 2;
+ offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset);
proto_tree_add_item(subtree, hf_kafka_partition_leader, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
@@ -521,13 +645,16 @@ dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
+ int name_start, name_length;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Topic");
- proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
- offset += 2;
+ offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &name_start, &name_length);
+ proto_item_append_text(ti, " (%s)",
+ tvb_get_string_enc(wmem_packet_scope(), tvb,
+ name_start, name_length, ENC_UTF_8|ENC_NA));
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_partition);
@@ -562,18 +689,21 @@ dissect_kafka_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pro
{
proto_item *ti;
proto_tree *subtree;
+ kafka_packet_values_t packet_values;
+ memset(&packet_values, 0, sizeof(packet_values));
subtree = proto_tree_add_subtree(tree, tvb, offset, 16, ett_kafka_request_partition, &ti, "Fetch Request Partition");
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
- proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
- offset += 8;
+ offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values);
proto_tree_add_item(subtree, hf_kafka_max_bytes, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
+ proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)",
+ packet_values.partition_id, packet_values.offset);
+
return offset;
}
@@ -583,13 +713,17 @@ dissect_kafka_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
+ guint32 count;
+ int name_start, name_length;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Fetch Request Topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, &name_start, &name_length);
+ count = tvb_get_ntohl(tvb, offset);
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_request_partition);
proto_item_set_len(ti, offset - start_offset);
+ proto_item_append_text(ti, " (%u partitions)", count);
return offset;
}
@@ -617,22 +751,24 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pr
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
+ kafka_packet_values_t packet_values;
+ memset(&packet_values, 0, sizeof(packet_values));
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Fetch Response Partition");
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
- proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
- offset += 2;
+ offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
- offset += 8;
+ offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values);
offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE);
proto_item_set_len(ti, offset - start_offset);
+ proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)",
+ packet_values.partition_id, packet_values.offset);
+
return offset;
}
@@ -642,13 +778,16 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
proto_item *ti;
proto_tree *subtree;
int offset = start_offset;
+ guint32 count;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Fetch Response Topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
+ count = tvb_get_ntohl(tvb, offset);
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_fetch_response_partition);
proto_item_set_len(ti, offset - start_offset);
+ proto_item_append_text(ti, " (%u partitions)", count);
return offset;
}
@@ -664,15 +803,19 @@ dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree
static int
dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
{
+ proto_item *ti;
proto_tree *subtree;
+ kafka_packet_values_t packet_values;
+ memset(&packet_values, 0, sizeof(packet_values));
- subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_request_partition, NULL, "Produce Request Partition");
+ subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_request_partition, &ti, "Produce Request Partition");
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE);
+ proto_item_append_text(ti, " (Partition-ID=%u)", packet_values.partition_id);
+
return offset;
}
@@ -685,7 +828,7 @@ dissect_kafka_produce_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Produce Request Topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_request_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -710,18 +853,21 @@ dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
static int
dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
{
+ proto_item *ti;
proto_tree *subtree;
+ kafka_packet_values_t packet_values;
+ memset(&packet_values, 0, sizeof(packet_values));
- subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_response_partition, NULL, "Produce Response Partition");
+ subtree = proto_tree_add_subtree(tree, tvb, offset, 14, ett_kafka_response_partition, &ti, "Produce Response Partition");
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
- proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
- offset += 2;
+ offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
- offset += 8;
+ offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values);
+
+ proto_item_append_text(ti, " (Partition-ID=%u, Offset=%" G_GINT64_MODIFIER "u)",
+ packet_values.partition_id, packet_values.offset);
return offset;
}
@@ -735,7 +881,7 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Produce Response Topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_produce_response_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -759,8 +905,7 @@ dissect_kafka_offset_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, pr
subtree = proto_tree_add_subtree(tree, tvb, offset, 16, ett_kafka_request_partition, &ti, "Offset Request Partition");
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset);
proto_tree_add_item(subtree, hf_kafka_offset_time, tvb, offset, 8, ENC_BIG_ENDIAN);
offset += 8;
@@ -780,7 +925,7 @@ dissect_kafka_offset_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_request_topic, &ti, "Offset Request Topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_request_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -800,13 +945,6 @@ dissect_kafka_offset_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree
}
static int
-dissect_kafka_offset_response_offset(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
-{
- proto_tree_add_item(tree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
- return offset+8;
-}
-
-static int
dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
{
proto_item *ti;
@@ -815,13 +953,11 @@ dissect_kafka_offset_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, p
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_partition, &ti, "Offset Response Partition");
- proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
- offset += 4;
+ offset = dissect_kafka_partition_id(tvb, pinfo, subtree, offset);
- proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
- offset += 2;
+ offset = dissect_kafka_error(tvb, pinfo, subtree, offset);
- offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_response_offset);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset);
proto_item_set_len(ti, offset - start_offset);
@@ -837,7 +973,7 @@ dissect_kafka_offset_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tre
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_response_topic, &ti, "Offset Response Topic");
- offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset, NULL, NULL);
offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_response_partition);
proto_item_set_len(ti, offset - start_offset);
@@ -856,7 +992,7 @@ dissect_kafka_offset_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
static int
dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U_)
{
- proto_item *ti;
+ proto_item *root_ti, *ti;
proto_tree *kafka_tree;
int offset = 0;
kafka_query_response_t *matcher = NULL;
@@ -866,9 +1002,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
col_set_str(pinfo->cinfo, COL_PROTOCOL, "Kafka");
col_clear(pinfo->cinfo, COL_INFO);
- ti = proto_tree_add_item(tree, proto_kafka, tvb, 0, -1, ENC_NA);
+ root_ti = proto_tree_add_item(tree, proto_kafka, tvb, 0, -1, ENC_NA);
- kafka_tree = proto_item_add_subtree(ti, ett_kafka);
+ kafka_tree = proto_item_add_subtree(root_ti, ett_kafka);
proto_tree_add_item(kafka_tree, hf_kafka_len, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
@@ -884,9 +1020,8 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
matcher = (kafka_query_response_t *) p_get_proto_data(wmem_file_scope(), pinfo, proto_kafka, 0);
}
- if (pinfo->destport == kafka_port) {
+ if (value_is_in_range(current_kafka_tcp_range, pinfo->destport)) {
/* Request */
-
if (matcher == NULL) {
matcher = wmem_new(wmem_file_scope(), kafka_query_response_t);
@@ -909,6 +1044,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Request",
val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
+ /* Also add to protocol root */
+ proto_item_append_text(root_ti, " (%s Request)",
+ val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
if (matcher->response_found) {
ti = proto_tree_add_uint(kafka_tree, hf_kafka_response_frame, tvb,
@@ -925,7 +1063,7 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
proto_tree_add_item(kafka_tree, hf_kafka_correlation_id, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
- offset = dissect_kafka_string(kafka_tree, hf_kafka_client_id, tvb, pinfo, offset);
+ offset = dissect_kafka_string(kafka_tree, hf_kafka_client_id, tvb, pinfo, offset, NULL, NULL);
switch (matcher->api_key) {
/* TODO: decode other request types */
@@ -977,6 +1115,11 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
col_add_fstr(pinfo->cinfo, COL_INFO, "Kafka %s Response",
val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
+ /* Also add to protocol root */
+ proto_item_append_text(root_ti, " (%s Response)",
+ val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
+
+
ti = proto_tree_add_uint(kafka_tree, hf_kafka_request_frame, tvb,
0, 0, matcher->request_frame);
@@ -1254,10 +1397,15 @@ proto_register_kafka(void)
kafka_module = prefs_register_protocol(proto_kafka,
proto_reg_handoff_kafka);
- /* Register an example port preference */
- prefs_register_uint_preference(kafka_module, "tcp.port", "Broker TCP Port",
- "Kafka broker's TCP port",
- 10, &kafka_port);
+ /* Preference for list/range of TCP server ports */
+ range_convert_str(&new_kafka_tcp_range, TCP_DEFAULT_RANGE, 65535);
+ new_kafka_tcp_range = range_empty();
+ prefs_register_range_preference(kafka_module, "tcp.ports", "Broker TCP Ports",
+ "TCP Ports range",
+ &new_kafka_tcp_range, 65535);
+
+ /* Single-port preference no longer in use */
+ prefs_register_obsolete_preference(kafka_module, "tcp.port");
}
void
@@ -1265,20 +1413,18 @@ proto_reg_handoff_kafka(void)
{
static gboolean initialized = FALSE;
static dissector_handle_t kafka_handle;
- static int currentPort;
if (!initialized) {
kafka_handle = create_dissector_handle(dissect_kafka_tcp,
proto_kafka);
initialized = TRUE;
-
- } else {
- dissector_delete_uint("tcp.port", currentPort, kafka_handle);
}
- currentPort = kafka_port;
-
- dissector_add_uint("tcp.port", currentPort, kafka_handle);
+ /* Replace range of ports with current */
+ dissector_delete_uint_range("tcp.port", current_kafka_tcp_range, kafka_handle);
+ g_free(current_kafka_tcp_range);
+ current_kafka_tcp_range = range_copy(new_kafka_tcp_range);
+ dissector_add_uint_range("tcp.port", new_kafka_tcp_range, kafka_handle);
}
/*