aboutsummaryrefslogtreecommitdiffstats
path: root/epan
diff options
context:
space:
mode:
authorPascal Quantin <pascal.quantin@gmail.com>2016-10-18 22:42:09 +0200
committerPascal Quantin <pascal.quantin@gmail.com>2016-10-19 12:55:17 +0000
commitdbb391a64681e3c5722f8e7ff79bd4154d0b2e6b (patch)
tree2ebd7fe8a43448d84f901753e3d900c488559a2f /epan
parent25122f5ef6f722a12077ce91f1b0d9784570009c (diff)
Kafka: add Snappy decompression support
Change-Id: Ida8d941809a4e0f2fd4d9f142363187a757d0278 Reviewed-on: https://code.wireshark.org/review/18288 Reviewed-by: Pascal Quantin <pascal.quantin@gmail.com> Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan')
-rw-r--r--epan/dissectors/packet-kafka.c69
1 files changed, 67 insertions, 2 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index 0f41a532ac..241f998a06 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -30,6 +30,9 @@
#include <epan/prefs.h>
#include <epan/prefs-int.h>
#include <epan/proto_data.h>
+#ifdef HAVE_SNAPPY
+#include <snappy-c.h>
+#endif
#include "packet-tcp.h"
void proto_register_kafka(void);
@@ -182,6 +185,9 @@ static const value_string kafka_codecs[] = {
{ KAFKA_COMPRESSION_SNAPPY, "Snappy" },
{ 0, NULL }
};
+#ifdef HAVE_SNAPPY
+static const guint8 kafka_xerial_header[8] = {0x82, 0x53, 0x4e, 0x41, 0x50, 0x50, 0x59, 0x00};
+#endif
/* List/range of TCP ports to register */
static range_t *current_kafka_tcp_range = NULL;
@@ -346,7 +352,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
switch (codec) {
case KAFKA_COMPRESSION_GZIP:
- raw = kafka_get_bytes(tree, tvb, pinfo, offset);
+ raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
offset += 4;
if (raw) {
@@ -370,7 +376,66 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
break;
case KAFKA_COMPRESSION_SNAPPY:
- /* We can't uncompress snappy yet... */
+#ifdef HAVE_SNAPPY
+ raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
+ offset += 4;
+ if (raw) {
+ guint compressed_size = tvb_reported_length(raw);
+ guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size);
+ size_t uncompressed_size;
+ snappy_status ret = SNAPPY_INVALID_INPUT;
+
+ if (tvb_memeql(raw, 0, kafka_xerial_header, sizeof(kafka_xerial_header)) == 0) {
+ /* xerial framing format */
+ guint chunk_size, pos = 16;
+
+ payload = tvb_new_composite();
+ while (pos < compressed_size) {
+ chunk_size = tvb_get_ntohl(raw, pos);
+ pos += 4;
+ ret = snappy_uncompressed_length(&data[pos], chunk_size, &uncompressed_size);
+ if (ret == SNAPPY_OK) {
+ guint8 *decompressed_buffer = (guint8*)wmem_alloc(pinfo->pool, uncompressed_size);
+
+ ret = snappy_uncompress(&data[pos], chunk_size, decompressed_buffer, &uncompressed_size);
+ if (ret == SNAPPY_OK) {
+ tvb_composite_append(payload,
+ tvb_new_child_real_data(tvb, decompressed_buffer,
+ (guint32)uncompressed_size, (guint32)uncompressed_size));
+ } else {
+ wmem_free(pinfo->pool, decompressed_buffer);
+ break;
+ }
+ }
+ pos += chunk_size;
+ }
+ tvb_composite_finalize(payload);
+ } else {
+ /* unframed format */
+ ret = snappy_uncompressed_length(data, compressed_size, &uncompressed_size);
+ if (ret == SNAPPY_OK) {
+ guint8 *decompressed_buffer = (guint8*)wmem_alloc(pinfo->pool, uncompressed_size);
+
+ ret = snappy_uncompress(data, compressed_size, decompressed_buffer, &uncompressed_size);
+ if (ret == SNAPPY_OK) {
+ payload = tvb_new_child_real_data(tvb, decompressed_buffer,
+ (guint32)uncompressed_size, (guint32)uncompressed_size);
+ } else {
+ wmem_free(pinfo->pool, decompressed_buffer);
+ }
+ }
+ }
+ if (ret == SNAPPY_OK) {
+ add_new_data_source(pinfo, payload, "Uncompressed Message");
+ dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE);
+ } else {
+ decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
+ expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
+ }
+ offset += tvb_captured_length(raw);
+ }
+ break;
+#endif
case KAFKA_COMPRESSION_NONE:
default:
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);