我让Kafka集群接收消息。消息是zip文件的字节数组。zip文件包含二进制protobuf数据文件作为条目。我正在读取zip文件,并试图反序列化protobuf条目,这就是我的代码所处的位置 "protocol message has invalid UTF-8,invalid tag"
例外情况。
在将二进制protobuf文件作为压缩字节数组发送到代理之前,我能够对其进行反序列化。
但是当我压缩这些二进制protobuf文件,生成消息给kafka,使用它,然后尝试反序列化zip流中的条目时,我面临着一些问题。
我不确定谁是罪魁祸首。
既然这些二进制协议缓冲区是gzip压缩的,那么再次压缩它们会把事情搞砸吗?
有人能给我点启示吗。
谢谢
编辑
Producer Side:
public byte[] getZipfileBytes() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipOutputStream zipOut = new ZipOutputStream(baos);
CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());
try {
ZipEntry zipEntry = new ZipEntry(testFile.getName());
byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
System.out.println("bytes length:\t"+protoBytes.length);
zipEntry.setSize(protoBytes.length);
zipOut.putNextEntry(zipEntry);
zipOut.write(protoBytes);
zipOut.close();
System.out.println("checksum:"+checkSum.getChecksum().getValue());
zipBytes = baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return zipBytes;
}
Consumer Side:
processConsumerRecord(ConsumerRecord<String, byte[]> record) {
String key = record.key();
byte[] dataPacket = record.value();
ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));
CheckedInputStream checkSum = new CheckedInputStream(zipIn,
new Adler32());
ZipEntry zipEntry;
try {
zipEntry = zipIn.getNextEntry();
while (zipEntry != null) {
String name = zipEntry.getName();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
IOUtils.copy(zipIn, baos);
byte[] protoBytes = baos.toByteArray();
二进制protobuf字节是gzip的,所以我需要gunzip
如果我做gunzip,它抛出的不是gzip格式。
如果我跳过gunzip并执行parsefrom,就会得到无效的标记异常。
GZIPInputStream gzip = new GZIPInputStream(
new ByteArrayInputStream(baos.toByteArray()));
MyProtoObject mpo = null;
try {
mpo = MyProtoObject.parseFrom(protoBytes);
} catch (InvalidProtocolBufferException e1) {
e1.printStackTrace();
}
} catch (IOException e1) {
e1.printStackTrace();
}
checksum.getchecksum().getvalue()在生成和使用zip字节数组时返回1
以下是调试期间zipentry变量的值:
producer
zipEntry ZipEntry (id=44)
comment null
crc 2147247736
csize 86794
extra null
flag 2056
method 8
name "test.dat" (id=49)
size 92931
time 1214084891
consumer
zipEntry ZipEntry (id=34)
comment null
crc 2147247736
csize 86794
extra null
flag 0
method 8
name "test.dat" (id=39)
size 92931
time 1214084891
我甚至测试了另一种方法,不是在内存中处理protobytes,而是将zip文件写入磁盘,通过winzip手动解压缩,然后反序列化解压缩的二进制proto文件,它工作了!!!
我是不是拉错了,让我知道
1条答案
按热度按时间nbewdwxp1#
这里有两件不同的事情:压缩/解压和处理protobuf。这听起来像是第一个问题,听起来像是破坏了protobuf数据。所以,现在:忘掉protobuf,只专注于压缩/解压。记录原始消息是什么(在压缩之前-可能是二进制文件或base-64块)。现在在接收端,跟踪解压后在二进制中得到的内容(同样,二进制文件或base-64块)。如果它们不是100%完全相同,那么所有其他的赌注都是无效的。在成功复制原始二进制文件之前,protobuf是不可能的。
如果这是问题所在:最好显示您的邮政编码,这样我们就可以看到它了。
如果您正确地压缩/解压缩了二进制文件,那么问题就出在您的protobuf代码中。
如果这是问题所在:最好显示序列化/反序列化代码,这样我们就可以看到它。