diff options
author | Pascal Quantin <pascal.quantin@gmail.com> | 2016-10-18 22:42:09 +0200 |
---|---|---|
committer | Pascal Quantin <pascal.quantin@gmail.com> | 2016-10-19 12:55:17 +0000 |
commit | dbb391a64681e3c5722f8e7ff79bd4154d0b2e6b (patch) | |
tree | 2ebd7fe8a43448d84f901753e3d900c488559a2f /epan | |
parent | 25122f5ef6f722a12077ce91f1b0d9784570009c (diff) |
Kafka: add Snappy decompression support
Change-Id: Ida8d941809a4e0f2fd4d9f142363187a757d0278
Reviewed-on: https://code.wireshark.org/review/18288
Reviewed-by: Pascal Quantin <pascal.quantin@gmail.com>
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan')
-rw-r--r-- | epan/dissectors/packet-kafka.c | 69 |
1 files changed, 67 insertions, 2 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index 0f41a532ac..241f998a06 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -30,6 +30,9 @@ #include <epan/prefs.h> #include <epan/prefs-int.h> #include <epan/proto_data.h> +#ifdef HAVE_SNAPPY +#include <snappy-c.h> +#endif #include "packet-tcp.h" void proto_register_kafka(void); @@ -182,6 +185,9 @@ static const value_string kafka_codecs[] = { { KAFKA_COMPRESSION_SNAPPY, "Snappy" }, { 0, NULL } }; +#ifdef HAVE_SNAPPY +static const guint8 kafka_xerial_header[8] = {0x82, 0x53, 0x4e, 0x41, 0x50, 0x50, 0x59, 0x00}; +#endif /* List/range of TCP ports to register */ static range_t *current_kafka_tcp_range = NULL; @@ -346,7 +352,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s switch (codec) { case KAFKA_COMPRESSION_GZIP: - raw = kafka_get_bytes(tree, tvb, pinfo, offset); + raw = kafka_get_bytes(subtree, tvb, pinfo, offset); offset += 4; if (raw) { @@ -370,7 +376,66 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s break; case KAFKA_COMPRESSION_SNAPPY: - /* We can't uncompress snappy yet... */ +#ifdef HAVE_SNAPPY + raw = kafka_get_bytes(subtree, tvb, pinfo, offset); + offset += 4; + if (raw) { + guint compressed_size = tvb_reported_length(raw); + guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size); + size_t uncompressed_size; + snappy_status ret = SNAPPY_INVALID_INPUT; + + if (tvb_memeql(raw, 0, kafka_xerial_header, sizeof(kafka_xerial_header)) == 0) { + /* xerial framing format */ + guint chunk_size, pos = 16; + + payload = tvb_new_composite(); + while (pos < compressed_size) { + chunk_size = tvb_get_ntohl(raw, pos); + pos += 4; + ret = snappy_uncompressed_length(&data[pos], chunk_size, &uncompressed_size); + if (ret == SNAPPY_OK) { + guint8 *decompressed_buffer = (guint8*)wmem_alloc(pinfo->pool, uncompressed_size); + + ret = snappy_uncompress(&data[pos], chunk_size, decompressed_buffer, &uncompressed_size); + if (ret == SNAPPY_OK) { + tvb_composite_append(payload, + tvb_new_child_real_data(tvb, decompressed_buffer, + (guint32)uncompressed_size, (guint32)uncompressed_size)); + } else { + wmem_free(pinfo->pool, decompressed_buffer); + break; + } + } + pos += chunk_size; + } + tvb_composite_finalize(payload); + } else { + /* unframed format */ + ret = snappy_uncompressed_length(data, compressed_size, &uncompressed_size); + if (ret == SNAPPY_OK) { + guint8 *decompressed_buffer = (guint8*)wmem_alloc(pinfo->pool, uncompressed_size); + + ret = snappy_uncompress(data, compressed_size, decompressed_buffer, &uncompressed_size); + if (ret == SNAPPY_OK) { + payload = tvb_new_child_real_data(tvb, decompressed_buffer, + (guint32)uncompressed_size, (guint32)uncompressed_size); + } else { + wmem_free(pinfo->pool, decompressed_buffer); + } + } + } + if (ret == SNAPPY_OK) { + add_new_data_source(pinfo, payload, "Uncompressed Message"); + dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE); + } else { + decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA); + expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress); + } + offset += tvb_captured_length(raw); + } + break; +#endif case KAFKA_COMPRESSION_NONE: default: offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length); |