如何使用ksqldb查询中的protobuf行反序列化?

vsnjm48y  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(419)

我正试图用java标准化ksqldb应用程序中的反序列化,但我很难理解如何处理这个问题 Row ksqldb返回的类型 Client 类型。ex(尝试/删除):

import io.confluent.ksql.api.client.Client;
    import io.confluent.ksql.api.client.BatchedQueryResult;

    Client ksqldbClient = kafkaService.getKSQLDBClient();
    String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
    BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
    List<Row> rows = query.get();

我的ksqldb表配置为使用protobuf序列化,但是看起来 Row 类型是json?我只能通过以下方式获取数据:

for (Row row : rows) {
        String json = row.asObject().toJsonString();
        // Deserialize json string
        ...
    }

ksqldb客户机只是自己处理protobuf反序列化吗?有没有办法只获取protobuf字节,这样我就可以将它传递到我已经定义的protobuf反序列化程序中,这样我就不需要再编写json反序列化程序了?

rks48beu

rks48beu1#

row.asObject() 返回已反序列化的ksqlobject,其操作类似于jdbc resultset,因为您可以为行中的类型对其调用各种get方法。
如果您想Map到从protobuf生成的特定域对象,似乎没有直接的方法,如果您需要该特性,最好直接使用kafka流而不是ksql

相关问题