我很好奇,我想检查snappy压缩在javakafka客户机上是否工作良好。
为了处理这个问题,我建立了一个小程序。这个程序生成1024条消息和可读数据。它们的大小是1024字节。我在新主题树上发送这些消息,然后直接在代理文件系统上检查这些主题的大小。
您可以通过以下代码找到此程序:
package unit_test.testCompress;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* Can be use in order to execute some unit test on compression
*/
public class TestCompress {
public static void compress(String type, String version){
Map<String,Object> configs = new HashMap<String,Object>();
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("producer.type", "async");
configs.put("compression.type", type);
configs.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
configs.put("partitioner.class", "com.kafkaproducer.RecordPartitioner");
configs.put("bootstrap.servers", "kafka:9092");
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(configs);
Random r = new Random(15415485);
int size = 1024; //1 Ko
byte[] buffer = new byte[size];
for(int i = 0; i < size; i++){
buffer[i] = (byte) ('A' + (r.nextInt() % 26));
}
buffer[size-1] = 0;
//System.out.println(new String(buffer));
for(int i = 0; i < size; i++ ){
Future<RecordMetadata> result = producer.send( new ProducerRecord<String, byte[]>("unit_test_compress_"+version+ "_" + type , buffer));
}
producer.close();
}
public static void main(String[] args) {
String version = "v10";
compress("snappy",version);
compress("gzip",version);
compress("none",version);
}
}
我使用以下maven pom文件编译此代码:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>unit_test</groupId>
<artifactId>testCompress</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>testCompress</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
</dependencies>
</project>
这个程序在我的电脑上执行得很好。
但是,当我直接在我的kafka代理上检查结果时,它会给出以下输出:
我认为这意味着在snappy主题上没有压缩(但是gzip压缩工作得非常好)。我查过文件了
我知道Kafka8.2.1上有这个问题:https://issues.apache.org/jira/browse/kafka-2189 但我在producer上使用kafka8.2.2,在broker上使用8.2.1。我也检查了snappy的依赖性。我用的是1.1.1.7
你知道如何在kafak上实现快速压缩吗?我忘了在Kafka上启用快速压缩的参数了吗?
1条答案
按热度按时间bis0qfac1#
在kafka ml上交换之后,问题是我的kafka代理必须升级到8.2.2版本。它解决了我的问题。