diff options
author | Martin Mathieson <martin.r.mathieson@googlemail.com> | 2016-06-08 05:26:55 -0700 |
---|---|---|
committer | Evan Huus <eapache@gmail.com> | 2016-06-08 13:33:06 +0000 |
commit | fec1061ce013cd4db451953f846e1c50231e0cbe (patch) | |
tree | 69a7fea0724b49cb395e247e01f4da47cfcb9bde /epan/dissectors/packet-kafka.c | |
parent | eb3781942ce482c24f773b5b758e6b32afe9ba1f (diff) |
Kafka: pass api_version to response and start to use it
Change-Id: Idc43e37d113e5b598cd5b1a8875ede335f9534de
Reviewed-on: https://code.wireshark.org/review/15776
Petri-Dish: Martin Mathieson <martin.r.mathieson@googlemail.com>
Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org>
Reviewed-by: Evan Huus <eapache@gmail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r-- | epan/dissectors/packet-kafka.c | 55 |
1 files changed, 47 insertions, 8 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index e2aff62f7e..a8136f62e6 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -39,6 +39,7 @@ static int hf_kafka_len = -1; static int hf_kafka_request_api_key = -1; static int hf_kafka_response_api_key = -1; static int hf_kafka_request_api_version = -1; +static int hf_kafka_response_api_version = -1; static int hf_kafka_correlation_id = -1; static int hf_kafka_client_id = -1; static int hf_kafka_string_len = -1; @@ -72,6 +73,7 @@ static int hf_kafka_broker_port = -1; static int hf_kafka_min_bytes = -1; static int hf_kafka_max_bytes = -1; static int hf_kafka_max_wait_time = -1; +static int hf_kafka_throttle_time = -1; static gint ett_kafka = -1; static gint ett_kafka_message = -1; @@ -191,6 +193,7 @@ static range_t *current_kafka_tcp_range = NULL; typedef struct _kafka_query_response_t { gint16 api_key; + guint16 api_version; guint32 request_frame; guint32 response_frame; gboolean response_found; @@ -812,8 +815,14 @@ dissect_kafka_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree } static int -dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version) { + if (api_version > 0) { + /* Throttle time */ + proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + } + return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_fetch_response_topic); } @@ -909,9 +918,17 @@ dissect_kafka_produce_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tr } static int -dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, guint16 api_version) { - return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_response_topic); + offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_response_topic); + + if (api_version > 0) { + /* Throttle time */ + proto_tree_add_item(tree, hf_kafka_throttle_time, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + } + + return offset; } /* OFFSET REQUEST/RESPONSE */ @@ -1029,6 +1046,7 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U offset += 4; conversation = find_or_create_conversation(pinfo); + /* Create match_queue for this conversation */ match_queue = (wmem_queue_t *) conversation_get_proto_data(conversation, proto_kafka); if (match_queue == NULL) { match_queue = wmem_queue_new(wmem_file_scope()); @@ -1040,11 +1058,12 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U } if (value_is_in_range(current_kafka_tcp_range, pinfo->destport)) { - /* Request */ + /* Request (as directed towards server port) */ if (matcher == NULL) { matcher = wmem_new(wmem_file_scope(), kafka_query_response_t); matcher->api_key = tvb_get_ntohs(tvb, offset); + matcher->api_version = tvb_get_ntohs(tvb, offset+2); matcher->request_frame = pinfo->num; matcher->response_found = FALSE; @@ -1139,19 +1158,26 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U val_to_str_const(matcher->api_key, kafka_apis, "Unknown")); - + /* Show request frame */ ti = proto_tree_add_uint(kafka_tree, hf_kafka_request_frame, tvb, 0, 0, matcher->request_frame); PROTO_ITEM_SET_GENERATED(ti); + /* Show api key (message type) */ ti = proto_tree_add_int(kafka_tree, hf_kafka_response_api_key, tvb, 0, 0, matcher->api_key); PROTO_ITEM_SET_GENERATED(ti); + /* Also show api version from request */ + ti = proto_tree_add_int(kafka_tree, hf_kafka_response_api_version, tvb, + 0, 0, matcher->api_version); + PROTO_ITEM_SET_GENERATED(ti); + + switch (matcher->api_key) { /* TODO: decode other response types */ case KAFKA_PRODUCE: - /*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_OFFSET_FETCH: /*offset =*/ dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset); @@ -1160,7 +1186,7 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U /*offset =*/ dissect_kafka_metadata_response(tvb, pinfo, kafka_tree, offset); break; case KAFKA_FETCH: - /*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset); + /*offset =*/ dissect_kafka_fetch_response(tvb, pinfo, kafka_tree, offset, matcher->api_version); break; case KAFKA_OFFSET: /*offset =*/ dissect_kafka_offset_response(tvb, pinfo, kafka_tree, offset); @@ -1229,10 +1255,15 @@ proto_register_kafka(void) "Response API.", HFILL } }, { &hf_kafka_request_api_version, - { "API Version", "kafka.version", + { "API Version", "kafka.request.version", FT_INT16, BASE_DEC, 0, 0, "Request API Version.", HFILL } }, + { &hf_kafka_response_api_version, + { "API Version", "kafka.response.version", + FT_INT16, BASE_DEC, 0, 0, + "Response API Version.", HFILL } + }, { &hf_kafka_correlation_id, { "Correlation ID", "kafka.correlation_id", FT_INT32, BASE_DEC, 0, 0, @@ -1375,6 +1406,14 @@ proto_register_kafka(void) " issued.", HFILL } }, + { &hf_kafka_throttle_time, + { "Throttle time", "kafka.throttle_time", + FT_INT32, BASE_DEC, 0, 0, + "Duration in milliseconds for which the request was throttled" + " due to quota violation." + " (Zero if the request did not violate any quota.)", + HFILL } + }, { &hf_kafka_response_frame, { "Response Frame", "kafka.reponse_frame", FT_FRAMENUM, BASE_NONE, FRAMENUM_TYPE(FT_FRAMENUM_RESPONSE), 0, |