无法反序列化压缩的协议缓冲区

k4ymrczo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(475)

我让Kafka集群接收消息。消息是zip文件的字节数组。zip文件包含二进制protobuf数据文件作为条目。我正在读取zip文件,并试图反序列化protobuf条目,这就是我的代码所处的位置 "protocol message has invalid UTF-8,invalid tag" 例外情况。
在将二进制protobuf文件作为压缩字节数组发送到代理之前,我能够对其进行反序列化。
但是当我压缩这些二进制protobuf文件,生成消息给kafka,使用它,然后尝试反序列化zip流中的条目时,我面临着一些问题。
我不确定谁是罪魁祸首。
既然这些二进制协议缓冲区是gzip压缩的,那么再次压缩它们会把事情搞砸吗?
有人能给我点启示吗。
谢谢

编辑

Producer Side:

public byte[] getZipfileBytes() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ZipOutputStream zipOut = new ZipOutputStream(baos);
        CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());

        try {
            ZipEntry zipEntry = new ZipEntry(testFile.getName());
            byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
            System.out.println("bytes length:\t"+protoBytes.length);
            zipEntry.setSize(protoBytes.length);
            zipOut.putNextEntry(zipEntry);
            zipOut.write(protoBytes);
            zipOut.close();
            System.out.println("checksum:"+checkSum.getChecksum().getValue());
            zipBytes = baos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return zipBytes;
    }

    Consumer Side:
         processConsumerRecord(ConsumerRecord<String, byte[]> record) {
                String key = record.key();
                byte[] dataPacket = record.value();

                ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));

                CheckedInputStream checkSum = new CheckedInputStream(zipIn,
                        new Adler32());
                ZipEntry zipEntry;
                try {
                    zipEntry = zipIn.getNextEntry();
                    while (zipEntry != null) {
                        String name = zipEntry.getName();
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                            try {
                                IOUtils.copy(zipIn, baos);
                                byte[] protoBytes = baos.toByteArray();

二进制protobuf字节是gzip的,所以我需要gunzip
如果我做gunzip,它抛出的不是gzip格式。
如果我跳过gunzip并执行parsefrom,就会得到无效的标记异常。

GZIPInputStream gzip = new GZIPInputStream(
                        new ByteArrayInputStream(baos.toByteArray()));
                        MyProtoObject mpo = null;
                        try {
                            mpo = MyProtoObject.parseFrom(protoBytes);
                        } catch (InvalidProtocolBufferException e1) {
                            e1.printStackTrace();
                        }
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }

checksum.getchecksum().getvalue()在生成和使用zip字节数组时返回1
以下是调试期间zipentry变量的值:

producer 
        zipEntry    ZipEntry  (id=44)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    2056    
        method  8   
        name    "test.dat" (id=49)  
        size    92931   
        time    1214084891  

    consumer
        zipEntry    ZipEntry  (id=34)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    0   
        method  8   
        name    "test.dat" (id=39)  
        size    92931   
        time    1214084891

我甚至测试了另一种方法,不是在内存中处理protobytes,而是将zip文件写入磁盘,通过winzip手动解压缩,然后反序列化解压缩的二进制proto文件,它工作了!!!
我是不是拉错了,让我知道

nbewdwxp

nbewdwxp1#

这里有两件不同的事情:压缩/解压和处理protobuf。这听起来像是第一个问题,听起来像是破坏了protobuf数据。所以,现在:忘掉protobuf,只专注于压缩/解压。记录原始消息是什么(在压缩之前-可能是二进制文件或base-64块)。现在在接收端,跟踪解压后在二进制中得到的内容(同样,二进制文件或base-64块)。如果它们不是100%完全相同,那么所有其他的赌注都是无效的。在成功复制原始二进制文件之前,protobuf是不可能的。
如果这是问题所在:最好显示您的邮政编码,这样我们就可以看到它了。
如果您正确地压缩/解压缩了二进制文件,那么问题就出在您的protobuf代码中。
如果这是问题所在:最好显示序列化/反序列化代码,这样我们就可以看到它。

相关问题