diff options
author | Evan Huus <eapache@gmail.com> | 2013-08-09 15:37:07 +0000 |
---|---|---|
committer | Evan Huus <eapache@gmail.com> | 2013-08-09 15:37:07 +0000 |
commit | c4befc8a7ada449fa173dc61f57b5f539dc5e93b (patch) | |
tree | e78062175f172daf4c98e69f2a535d4f04951af6 | |
parent | 41f444ef87f194fad039b259e7aba0b53895aef0 (diff) |
Decode Kafka metadata requests and responses.
svn path=/trunk/; revision=51239
-rw-r--r-- | epan/dissectors/packet-kafka.c | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index 888c0a0c42..b20cd3ca69 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -50,6 +50,9 @@ static int hf_kafka_required_acks = -1; static int hf_kafka_timeout = -1; static int hf_kafka_topic_name = -1; static int hf_kafka_partition_id = -1; +static int hf_kafka_replica = -1; +static int hf_kafka_isr = -1; +static int hf_kafka_partition_leader = -1; static int hf_kafka_message_set_size = -1; static int hf_kafka_request_frame = -1; static int hf_kafka_response_frame = -1; @@ -57,8 +60,16 @@ static int hf_kafka_consumer_group = -1; static int hf_kafka_offset = -1; static int hf_kafka_metadata = -1; static int hf_kafka_error = -1; +static int hf_kafka_broker_nodeid = -1; +static int hf_kafka_broker_host = -1; +static int hf_kafka_broker_port = -1; static gint ett_kafka = -1; +static gint ett_kafka_metadata_replicas = -1; +static gint ett_kafka_metadata_isr = -1; +static gint ett_kafka_metadata_broker = -1; +static gint ett_kafka_metadata_brokers = -1; +static gint ett_kafka_metadata_topics = -1; static gint ett_kafka_request_topic = -1; static gint ett_kafka_request_partition = -1; static gint ett_kafka_response_topic = -1; @@ -253,6 +264,150 @@ dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tre return offset; } +/* METADATA REQUEST */ + +static int +dissect_kafka_metadata_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +{ + return dissect_kafka_string(tree, hf_kafka_topic_name, tvb, pinfo, offset); +} + +static int +dissect_kafka_metadata_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +{ + proto_item *ti; + proto_tree *subtree; + int offset = start_offset; + + ti = proto_tree_add_text(tree, tvb, offset, -1, "Metadata Request"); + subtree = proto_item_add_subtree(ti, ett_kafka_request_topic); + + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_request_topic); + + proto_item_set_len(ti, offset - start_offset); + + return offset; +} + +/* METADATA RESPONSE */ + +static int +dissect_kafka_metadata_broker(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +{ + proto_item *ti; + proto_tree *subtree; + int offset = start_offset; + + ti = proto_tree_add_text(tree, tvb, offset, 14, "Broker"); + subtree = proto_item_add_subtree(ti, ett_kafka_metadata_broker); + + proto_tree_add_item(subtree, hf_kafka_broker_nodeid, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + offset = dissect_kafka_string(subtree, hf_kafka_broker_host, tvb, pinfo, offset); + + proto_tree_add_item(subtree, hf_kafka_broker_port, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + proto_item_set_len(ti, offset - start_offset); + + return offset; +} + +static int +dissect_kafka_metadata_replica(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +{ + proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); + return offset + 4; +} + +static int +dissect_kafka_metadata_isr(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset) +{ + proto_tree_add_item(tree, hf_kafka_isr, tvb, offset, 4, ENC_BIG_ENDIAN); + return offset + 4; +} + +static int +dissect_kafka_metadata_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset) +{ + proto_item *ti, *subti; + proto_tree *subtree, *subsubtree; + int offset = start_offset; + int sub_start_offset; + + ti = proto_tree_add_text(tree, tvb, offset, -1, "Partition"); + subtree = proto_item_add_subtree(ti, ett_kafka_response_partition); + + proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; + + proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + proto_tree_add_item(subtree, hf_kafka_partition_leader, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + + sub_start_offset = offset; + subti = proto_tree_add_text(subtree, tvb, offset, -1, "Replicas"); + subsubtree = proto_item_add_subtree(subti, ett_kafka_metadata_replicas); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_replica); + proto_item_set_len(subti, offset - sub_start_offset); + + sub_start_offset = offset; + subti = proto_tree_add_text(subtree, tvb, offset, -1, "Caught-Up Replicas"); + subsubtree = proto_item_add_subtree(subti, ett_kafka_metadata_isr); + offset = dissect_kafka_array(subsubtree, tvb, pinfo, offset, &dissect_kafka_metadata_isr); + proto_item_set_len(subti, offset - sub_start_offset); + + proto_item_set_len(ti, offset - start_offset); + + return offset; +} + +static int +dissect_kafka_metadata_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +{ + proto_item *ti; + proto_tree *subtree; + int offset = start_offset; + + ti = proto_tree_add_text(tree, tvb, offset, -1, "Topic"); + subtree = proto_item_add_subtree(ti, ett_kafka_response_topic); + + proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN); + offset += 2; + + offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset); + + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_partition); + + proto_item_set_len(ti, offset - start_offset); + + return offset; +} + +static int +dissect_kafka_metadata_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset) +{ + proto_item *ti; + proto_tree *subtree; + int offset = start_offset; + + ti = proto_tree_add_text(tree, tvb, offset, -1, "Broker Metadata"); + subtree = proto_item_add_subtree(ti, ett_kafka_metadata_brokers); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_broker); + proto_item_set_len(ti, offset - start_offset); + + start_offset = offset; + ti = proto_tree_add_text(tree, tvb, offset, -1, "Topic Metadata"); + subtree = proto_item_add_subtree(ti, ett_kafka_request_topic); + offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_metadata_topic); + proto_item_set_len(ti, offset - start_offset); + + return offset; +} + /* PRODUCE REQUEST */ static int @@ -437,6 +592,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) case KAFKA_OFFSET_FETCH: offset = dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset); break; + case KAFKA_METADATA: + offset = dissect_kafka_metadata_request(tvb, pinfo, kafka_tree, offset); + break; } } else { @@ -479,6 +637,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) case KAFKA_OFFSET_FETCH: offset = dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset); break; + case KAFKA_METADATA: + offset = dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset); + break; } } @@ -576,6 +737,21 @@ proto_register_kafka(void) FT_INT32, BASE_DEC, 0, 0, NULL, HFILL } }, + { &hf_kafka_replica, + { "Replica ID", "kafka.replica_id", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_isr, + { "Caught-Up Replica ID", "kafka.isr_id", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_partition_leader, + { "Leader", "kafka.leader", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, { &hf_kafka_message_set_size, { "Message Set Size", "kafka.message_set_size", FT_INT32, BASE_DEC, 0, 0, @@ -591,6 +767,21 @@ proto_register_kafka(void) FT_FRAMENUM, BASE_NONE, 0, 0, NULL, HFILL } }, + { &hf_kafka_broker_nodeid, + { "Node ID", "kafka.node_id", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_broker_host, + { "Host", "kafka.host", + FT_STRING, BASE_NONE, 0, 0, + NULL, HFILL } + }, + { &hf_kafka_broker_port, + { "Port", "kafka.port", + FT_INT32, BASE_DEC, 0, 0, + NULL, HFILL } + }, { &hf_kafka_response_frame, { "Response Frame", "kafka.reponse_frame", FT_FRAMENUM, BASE_NONE, 0, 0, @@ -600,6 +791,11 @@ proto_register_kafka(void) static gint *ett[] = { &ett_kafka, + &ett_kafka_metadata_isr, + &ett_kafka_metadata_replicas, + &ett_kafka_metadata_broker, + &ett_kafka_metadata_brokers, + &ett_kafka_metadata_topics, &ett_kafka_request_topic, &ett_kafka_request_partition, &ett_kafka_response_topic, |