aboutsummaryrefslogtreecommitdiffstats
path: root/epan/dissectors/packet-kafka.c
diff options
context:
space:
mode:
authorMartin Mathieson <martin.r.mathieson@googlemail.com>2016-06-08 05:26:55 -0700
committerEvan Huus <eapache@gmail.com>2016-06-08 13:33:06 +0000
commitfec1061ce013cd4db451953f846e1c50231e0cbe (patch)
tree69a7fea0724b49cb395e247e01f4da47cfcb9bde /epan/dissectors/packet-kafka.c
parenteb3781942ce482c24f773b5b758e6b32afe9ba1f (diff)
Kafka: pass api_version to response and start to use it
Change-Id: Idc43e37d113e5b598cd5b1a8875ede335f9534de Reviewed-on: https://code.wireshark.org/review/15776 Petri-Dish: Martin Mathieson <martin.r.mathieson@googlemail.com> Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org> Reviewed-by: Evan Huus <eapache@gmail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r--epan/dissectors/packet-kafka.c55
1 files changed, 47 insertions, 8 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index e2aff62f7e..a8136f62e6 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -39,6 +39,7 @@ static int hf_kafka_len = -1;
static int hf_kafka_request_api_key = -1;
static int hf_kafka_response_api_key = -1;
static int hf_kafka_request_api_version = -1;
+static int hf_kafka_response_api_version = -1;
static int hf_kafka_correlation_id = -1;
static int hf_kafka_client_id = -1;
static int hf_kafka_string_len = -1;
@@ -72,6 +73,7 @@ static int hf_kafka_broker_port = -1;
static int hf_kafka_min_bytes = -1;
static int hf_kafka_max_bytes = -1;
static int hf_kafka_max_wait_time = -1;
+static int hf_kafka_throttle_time = -1;
static gint ett_kafka = -1;
static gint ett_kafka_message = -1;
@@ -191,6 +193,7 @@ static range_t *current_kafka_tcp_range = NULL;
typedef struct _kafka_query_response_t {
gint16 api_key;
+ guint16 api_version;
guint32 request_frame;
guint32 response_frame;
gboolean response_found;
@@ -812,8 +815,14 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree
}
static int
-dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version)
{
+ if (api_version > 0) {
+ /* Throttle time */
+ proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+ }
+
return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_fetch_response_topic);
}
@@ -909,9 +918,17 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr
}
static int
-dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
+dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version)
{
- return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_response_topic);
+ offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_response_topic);
+
+ if (api_version > 0) {
+ /* Throttle time */
+ proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN);
+ offset += 4;
+ }
+
+ return offset;
}
/* OFFSET REQUEST/RESPONSE */
@@ -1029,6 +1046,7 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
offset += 4;
conversation = find_or_create_conversation(pinfo);
+ /* Create match_queue for this conversation */
match_queue = (wmem_queue_t *) conversation_get_proto_data(conversation, proto_kafka);
if (match_queue == NULL) {
match_queue = wmem_queue_new(wmem_file_scope());
@@ -1040,11 +1058,12 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
}
if (value_is_in_range(current_kafka_tcp_range, pinfo->destport)) {
- /* Request */
+ /* Request (as directed towards server port) */
if (matcher == NULL) {
matcher = wmem_new(wmem_file_scope(), kafka_query_response_t);
matcher->api_key = tvb_get_ntohs(tvb, offset);
+ matcher->api_version = tvb_get_ntohs(tvb, offset+2);
matcher->request_frame = pinfo->num;
matcher->response_found = FALSE;
@@ -1139,19 +1158,26 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
val_to_str_const(matcher->api_key, kafka_apis, "Unknown"));
-
+ /* Show request frame */
ti = proto_tree_add_uint(kafka_tree, hf_kafka_request_frame, tvb,
0, 0, matcher->request_frame);
PROTO_ITEM_SET_GENERATED(ti);
+ /* Show api key (message type) */
ti = proto_tree_add_int(kafka_tree, hf_kafka_response_api_key, tvb,
0, 0, matcher->api_key);
PROTO_ITEM_SET_GENERATED(ti);
+ /* Also show api version from request */
+ ti = proto_tree_add_int(kafka_tree, hf_kafka_response_api_version, tvb,
+ 0, 0, matcher->api_version);
+ PROTO_ITEM_SET_GENERATED(ti);
+
+
switch (matcher->api_key) {
/* TODO: decode other response types */
case KAFKA_PRODUCE:
- /*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_OFFSET_FETCH:
/*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset);
@@ -1160,7 +1186,7 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
/*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset);
break;
case KAFKA_FETCH:
- /*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset);
+ /*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version);
break;
case KAFKA_OFFSET:
/*offset =*/ dissect_kafka_offset_response(tvb, pinfo, kafka_tree, offset);
@@ -1229,10 +1255,15 @@ proto_register_kafka(void)
"Response API.", HFILL }
},
{ &hf_kafka_request_api_version,
- { "API Version", "kafka.version",
+ { "API Version", "kafka.request.version",
FT_INT16, BASE_DEC, 0, 0,
"Request API Version.", HFILL }
},
+ { &hf_kafka_response_api_version,
+ { "API Version", "kafka.response.version",
+ FT_INT16, BASE_DEC, 0, 0,
+ "Response API Version.", HFILL }
+ },
{ &hf_kafka_correlation_id,
{ "Correlation ID", "kafka.correlation_id",
FT_INT32, BASE_DEC, 0, 0,
@@ -1375,6 +1406,14 @@ proto_register_kafka(void)
" issued.",
HFILL }
},
+ { &hf_kafka_throttle_time,
+ { "Throttle time", "kafka.throttle_time",
+ FT_INT32, BASE_DEC, 0, 0,
+ "Duration in milliseconds for which the request was throttled"
+ " due to quota violation."
+ " (Zero if the request did not violate any quota.)",
+ HFILL }
+ },
{ &hf_kafka_response_frame,
{ "Response Frame", "kafka.reponse_frame",
FT_FRAMENUM, BASE_NONE, FRAMENUM_TYPE(FT_FRAMENUM_RESPONSE), 0,