aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvan Huus <eapache@gmail.com>2013-08-09 15:37:07 +0000
committerEvan Huus <eapache@gmail.com>2013-08-09 15:37:07 +0000
commitc4befc8a7ada449fa173dc61f57b5f539dc5e93b (patch)
treee78062175f172daf4c98e69f2a535d4f04951af6
parent41f444ef87f194fad039b259e7aba0b53895aef0 (diff)
Decode Kafka metadata requests and responses.
svn path=/trunk/; revision=51239
-rw-r--r--epan/dissectors/packet-kafka.c196
1 files changed, 196 insertions, 0 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index 888c0a0c42..b20cd3ca69 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -50,6 +50,9 @@ static int hf_kafka_required_acks = -1;
static int hf_kafka_timeout = -1;
static int hf_kafka_topic_name = -1;
static int hf_kafka_partition_id = -1;
+static int hf_kafka_replica = -1;
+static int hf_kafka_isr = -1;
+static int hf_kafka_partition_leader = -1;
static int hf_kafka_message_set_size = -1;
static int hf_kafka_request_frame = -1;
static int hf_kafka_response_frame = -1;
@@ -57,8 +60,16 @@ static int hf_kafka_consumer_group = -1;
static int hf_kafka_offset = -1;
static int hf_kafka_metadata = -1;
static int hf_kafka_error = -1;
+static int hf_kafka_broker_nodeid = -1;
+static int hf_kafka_broker_host = -1;
+static int hf_kafka_broker_port = -1;
static gint ett_kafka = -1;
+static gint ett_kafka_metadata_replicas = -1;
+static gint ett_kafka_metadata_isr = -1;
+static gint ett_kafka_metadata_broker = -1;
+static gint ett_kafka_metadata_brokers = -1;
+static gint ett_kafka_metadata_topics = -1;
static gint ett_kafka_request_topic = -1;
static gint ett_kafka_request_partition = -1;
static gint ett_kafka_response_topic = -1;
@@ -253,6 +264,150 @@ dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tre
return offset;
}
+/* METADATA REQUEST */
+
+static int
+dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+{
+ return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset);
+}
+
+static int
+dissect_kafka_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+{
+ proto_item *ti;
+ proto_tree *subtree;
+ int offset = start_offset;
+
+ ti = proto_tree_add_text(tree, tvb, offset, -1, "Metadata Request");
+ subtree = proto_item_add_subtree(ti, ett_kafka_request_topic);
+
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_request_topic);
+
+ proto_item_set_len(ti, offset - start_offset);
+
+ return offset;
+}
+
+/* METADATA RESPONSE */
+
+static int
+dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+{
+ proto_item *ti;
+ proto_tree *subtree;
+ int offset = start_offset;
+
+ ti = proto_tree_add_text(tree, tvb, offset, 14, "Broker");
+ subtree = proto_item_add_subtree(ti, ett_kafka_metadata_broker);
+
+ proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset);
+
+ proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ proto_item_set_len(ti, offset - start_offset);
+
+ return offset;
+}
+
+static int
+dissect_kafka_metadata_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+{
+ proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
+ return offset + 4;
+}
+
+static int
+dissect_kafka_metadata_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
+{
+ proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN);
+ return offset + 4;
+}
+
+static int
+dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
+{
+ proto_item *ti, *subti;
+ proto_tree *subtree, *subsubtree;
+ int offset = start_offset;
+ int sub_start_offset;
+
+ ti = proto_tree_add_text(tree, tvb, offset, -1, "Partition");
+ subtree = proto_item_add_subtree(ti, ett_kafka_response_partition);
+
+ proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
+
+ proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ proto_tree_add_item(subtree, hf_kafka_partition_leader, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+
+ sub_start_offset = offset;
+ subti = proto_tree_add_text(subtree, tvb, offset, -1, "Replicas");
+ subsubtree = proto_item_add_subtree(subti, ett_kafka_metadata_replicas);
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_replica);
+ proto_item_set_len(subti, offset - sub_start_offset);
+
+ sub_start_offset = offset;
+ subti = proto_tree_add_text(subtree, tvb, offset, -1, "Caught-Up Replicas");
+ subsubtree = proto_item_add_subtree(subti, ett_kafka_metadata_isr);
+ offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_isr);
+ proto_item_set_len(subti, offset - sub_start_offset);
+
+ proto_item_set_len(ti, offset - start_offset);
+
+ return offset;
+}
+
+static int
+dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+{
+ proto_item *ti;
+ proto_tree *subtree;
+ int offset = start_offset;
+
+ ti = proto_tree_add_text(tree, tvb, offset, -1, "Topic");
+ subtree = proto_item_add_subtree(ti, ett_kafka_response_topic);
+
+ proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
+ offset += 2;
+
+ offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
+
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_partition);
+
+ proto_item_set_len(ti, offset - start_offset);
+
+ return offset;
+}
+
+static int
+dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
+{
+ proto_item *ti;
+ proto_tree *subtree;
+ int offset = start_offset;
+
+ ti = proto_tree_add_text(tree, tvb, offset, -1, "Broker Metadata");
+ subtree = proto_item_add_subtree(ti, ett_kafka_metadata_brokers);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_broker);
+ proto_item_set_len(ti, offset - start_offset);
+
+ start_offset = offset;
+ ti = proto_tree_add_text(tree, tvb, offset, -1, "Topic Metadata");
+ subtree = proto_item_add_subtree(ti, ett_kafka_request_topic);
+ offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_topic);
+ proto_item_set_len(ti, offset - start_offset);
+
+ return offset;
+}
+
/* PRODUCE REQUEST */
static int
@@ -437,6 +592,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
case KAFKA_OFFSET_FETCH:
offset = dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset);
break;
+ case KAFKA_METADATA:
+ offset = dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset);
+ break;
}
}
else {
@@ -479,6 +637,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
case KAFKA_OFFSET_FETCH:
offset = dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset);
break;
+ case KAFKA_METADATA:
+ offset = dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset);
+ break;
}
}
@@ -576,6 +737,21 @@ proto_register_kafka(void)
FT_INT32, BASE_DEC, 0, 0,
NULL, HFILL }
},
+ { &hf_kafka_replica,
+ { "Replica ID", "kafka.replica_id",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_isr,
+ { "Caught-Up Replica ID", "kafka.isr_id",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_partition_leader,
+ { "Leader", "kafka.leader",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_message_set_size,
{ "Message Set Size", "kafka.message_set_size",
FT_INT32, BASE_DEC, 0, 0,
@@ -591,6 +767,21 @@ proto_register_kafka(void)
FT_FRAMENUM, BASE_NONE, 0, 0,
NULL, HFILL }
},
+ { &hf_kafka_broker_nodeid,
+ { "Node ID", "kafka.node_id",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_broker_host,
+ { "Host", "kafka.host",
+ FT_STRING, BASE_NONE, 0, 0,
+ NULL, HFILL }
+ },
+ { &hf_kafka_broker_port,
+ { "Port", "kafka.port",
+ FT_INT32, BASE_DEC, 0, 0,
+ NULL, HFILL }
+ },
{ &hf_kafka_response_frame,
{ "Response Frame", "kafka.reponse_frame",
FT_FRAMENUM, BASE_NONE, 0, 0,
@@ -600,6 +791,11 @@ proto_register_kafka(void)
static gint *ett[] = {
&ett_kafka,
+ &ett_kafka_metadata_isr,
+ &ett_kafka_metadata_replicas,
+ &ett_kafka_metadata_broker,
+ &ett_kafka_metadata_brokers,
+ &ett_kafka_metadata_topics,
&ett_kafka_request_topic,
&ett_kafka_request_partition,
&ett_kafka_response_topic,