about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Pascal Quantin <pascal.quantin@gmail.com> 2017-05-09 21:21:04 +0100
committer Michael Mann <mmann78@netscape.net> 2017-05-13 11:54:31 +0000
commit 8112b8d7a83536193c75a7fd1e51d3af8b1c8c01 (patch)
tree 24c100ce8533f00fe6a32c34ba2f7c3c59234888
parent 2c9b07a8b6b464853635574a02f159e96834786a (diff)
Kafka: add LZ4 decompression
Change-Id: Idf2f63782c8751778ad88f46a7f65fe7d5d49f3b
Reviewed-on: https://code.wireshark.org/review/21577
Reviewed-by: Pascal Quantin <pascal.quantin@gmail.com>
Petri-Dish: Pascal Quantin <pascal.quantin@gmail.com>
Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org>
Reviewed-by: Michael Mann <mmann78@netscape.net>
-rw-r--r-- epan/dissectors/packet-kafka.c | 212
1 file changed, 211 insertions(+), 1 deletion(-)
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index bcaeceb344..9079ee552b 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -33,6 +33,9 @@
#ifdef HAVE_SNAPPY
#include <snappy-c.h>
#endif
+#ifdef HAVE_LZ4
+#include <lz4frame.h>
+#endif
#include "packet-tcp.h"
void proto_register_kafka(void);
@@ -351,6 +354,121 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
/* HELPERS */
+#ifdef HAVE_LZ4
/* Local copy of the XXH32() algorithm as found in
   https://github.com/lz4/lz4/blob/v1.7.5/lib/xxhash.c
   as some packagers are not providing xxhash.h in liblz4.
   Used to recompute the LZ4 frame header checksum when working around
   buggy Kafka LZ4 framing. */

/* Streaming hash state, kept for layout parity with upstream xxhash.
   NOTE(review): only the one-shot XXH32() entry point below is actually
   used in this file. */
typedef struct {
    guint32 total_len_32;
    guint32 large_len;
    guint32 v1;
    guint32 v2;
    guint32 v3;
    guint32 v4;
    guint32 mem32[4]; /* buffer defined as U32 for alignment */
    guint32 memsize;
    guint32 reserved; /* never read nor write, will be removed in a future version */
} XXH32_state_t;

typedef enum {
    XXH_bigEndian=0,
    XXH_littleEndian=1
} XXH_endianess;

/* Run-time endianness probe: the first byte of an int holding 1 is
   non-zero exactly on little-endian hosts. */
static const int g_one = 1;
#define XXH_CPU_LITTLE_ENDIAN (*(const char*)(&g_one))

/* The five XXH32 magic primes. */
static const guint32 PRIME32_1 = 2654435761U;
static const guint32 PRIME32_2 = 2246822519U;
static const guint32 PRIME32_3 = 3266489917U;
static const guint32 PRIME32_4 =  668265263U;
static const guint32 PRIME32_5 =  374761393U;

#define XXH_rotl32(x,r) ((x << r) | (x >> (32 - r)))

/* Unaligned-safe 32-bit load; memcpy sidesteps strict-aliasing and
   alignment issues. */
static guint32 XXH_read32(const void* memPtr)
{
    guint32 val;
    memcpy(&val, memPtr, sizeof(val));
    return val;
}

/* Byte-swap a 32-bit value. */
static guint32 XXH_swap32(guint32 x)
{
    return ((x << 24) & 0xff000000 ) |
           ((x <<  8) & 0x00ff0000 ) |
           ((x >>  8) & 0x0000ff00 ) |
           ((x >> 24) & 0x000000ff );
}

#define XXH_readLE32(ptr, endian) (endian==XXH_littleEndian ? XXH_read32(ptr) : XXH_swap32(XXH_read32(ptr)))

/* One accumulator round of the 16-bytes-per-iteration main loop. */
static guint32 XXH32_round(guint32 seed, guint32 input)
{
    seed += input * PRIME32_2;
    seed  = XXH_rotl32(seed, 13);
    seed *= PRIME32_1;
    return seed;
}

/* Core XXH32, parameterized on input endianness. */
static guint32 XXH32_endian(const void* input, size_t len, guint32 seed, XXH_endianess endian)
{
    /* Bytes must be consumed as unsigned: upstream xxhash uses BYTE
       (unsigned char). A signed pointer (gint8, as previously used here)
       sign-extends bytes >= 0x80 in the trailing-byte loop below and
       yields digests that differ from real XXH32. */
    const guint8* p = (const guint8*)input;
    const guint8* bEnd = p + len;
    guint32 h32;
#define XXH_get32bits(p) XXH_readLE32(p, endian)

    if (len>=16) {
        const guint8* const limit = bEnd - 16;
        guint32 v1 = seed + PRIME32_1 + PRIME32_2;
        guint32 v2 = seed + PRIME32_2;
        guint32 v3 = seed + 0;
        guint32 v4 = seed - PRIME32_1;

        /* Main loop: four interleaved accumulators, 16 bytes/iteration. */
        do {
            v1 = XXH32_round(v1, XXH_get32bits(p)); p+=4;
            v2 = XXH32_round(v2, XXH_get32bits(p)); p+=4;
            v3 = XXH32_round(v3, XXH_get32bits(p)); p+=4;
            v4 = XXH32_round(v4, XXH_get32bits(p)); p+=4;
        } while (p<=limit);

        h32 = XXH_rotl32(v1, 1) + XXH_rotl32(v2, 7) + XXH_rotl32(v3, 12) + XXH_rotl32(v4, 18);
    } else {
        h32 = seed + PRIME32_5;
    }

    h32 += (guint32) len;

    /* Fold in any remaining whole 32-bit words... */
    while (p+4<=bEnd) {
        h32 += XXH_get32bits(p) * PRIME32_3;
        h32  = XXH_rotl32(h32, 17) * PRIME32_4;
        p+=4;
    }

    /* ...then the trailing bytes, one at a time (unsigned — see above). */
    while (p<bEnd) {
        h32 += (*p) * PRIME32_5;
        h32  = XXH_rotl32(h32, 11) * PRIME32_1;
        p++;
    }

    /* Final avalanche: spread entropy across all output bits. */
    h32 ^= h32 >> 15;
    h32 *= PRIME32_2;
    h32 ^= h32 >> 13;
    h32 *= PRIME32_3;
    h32 ^= h32 >> 16;

    return h32;
}

/* One-shot XXH32 digest of input[0..len) with the given seed. */
static guint XXH32(const void* input, size_t len, guint seed)
{
    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;
    if (endian_detected==XXH_littleEndian)
        return XXH32_endian(input, len, seed, XXH_littleEndian);
    else
        return XXH32_endian(input, len, seed, XXH_bigEndian);
}
+#endif
+
static const char *
kafka_error_to_str(kafka_error_t error)
{
@@ -710,7 +828,99 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
break;
#endif
case KAFKA_MESSAGE_CODEC_LZ4:
- /* TODO: */
+#ifdef HAVE_LZ4
+ raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
+ offset += 4;
+ if (raw) {
+ LZ4F_decompressionContext_t lz4_ctxt;
+ LZ4F_frameInfo_t lz4_info;
+ LZ4F_errorCode_t ret;
+ LZ4F_decompressOptions_t dec_opts = {0};
+ size_t src_offset, src_size, dst_size;
+ guchar *decompressed_buffer = NULL;
+
+ /* Prepare compressed data buffer */
+ guint compressed_size = tvb_reported_length(raw);
+ guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size);
+ /* Override header checksum to workaround buggy Kafka implementations */
+ if (compressed_size > 7) {
+ guint hdr_end = 6;
+ if (data[4] & 0x08) {
+ hdr_end += 8;
+ }
+ if (hdr_end < compressed_size) {
+ data[hdr_end] = (XXH32(&data[4], hdr_end - 4, 0) >> 8) & 0xff;
+ }
+ }
+
+ /* Show raw compressed data */
+ proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
+
+ /* Allocate output buffer */
+ ret = LZ4F_createDecompressionContext(&lz4_ctxt, LZ4F_VERSION);
+ if (LZ4F_isError(ret)) {
+ goto fail;
+ }
+ src_offset = compressed_size;
+ ret = LZ4F_getFrameInfo(lz4_ctxt, &lz4_info, data, &src_offset);
+ if (LZ4F_isError(ret)) {
+ LZ4F_freeDecompressionContext(lz4_ctxt);
+ goto fail;
+ }
+ switch (lz4_info.blockSizeID) {
+ case LZ4F_max64KB:
+ dst_size = 1 << 16;
+ break;
+ case LZ4F_max256KB:
+ dst_size = 1 << 18;
+ break;
+ case LZ4F_max1MB:
+ dst_size = 1 << 20;
+ break;
+ case LZ4F_max4MB:
+ dst_size = 1 << 22;
+ break;
+ default:
+ LZ4F_freeDecompressionContext(lz4_ctxt);
+ goto fail;
+ }
+ if (lz4_info.contentSize && lz4_info.contentSize < dst_size) {
+ dst_size = (size_t)lz4_info.contentSize;
+ }
+ decompressed_buffer = (guchar*)wmem_alloc(pinfo->pool, dst_size);
+
+ /* Attempt the decompression. */
+ src_size = compressed_size - src_offset;
+ ret = LZ4F_decompress(lz4_ctxt, decompressed_buffer, &dst_size,
+ &data[src_offset], &src_size, &dec_opts);
+ LZ4F_freeDecompressionContext(lz4_ctxt);
+ if (ret == 0) {
+ size_t uncompressed_size = dst_size;
+
+ show_compression_reduction(tvb, subtree, compressed_size, (guint)uncompressed_size);
+
+ /* Add as separate data tab */
+ payload = tvb_new_child_real_data(tvb, decompressed_buffer,
+ (guint32)uncompressed_size, (guint32)uncompressed_size);
+ add_new_data_source(pinfo, payload, "Uncompressed Message");
+
+ /* Dissect as a message set */
+ dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
+
+ /* Add to summary */
+ col_append_fstr(pinfo->cinfo, COL_INFO, " [LZ4-compressed message set]");
+ proto_item_append_text(message_ti, " (LZ4-compressed message set)");
+ } else {
+ fail:
+ /* Error */
+ decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
+ expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
+ }
+ offset += compressed_size;
+ }
+ break;
+#endif
+
case KAFKA_MESSAGE_CODEC_NONE:
default:
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);