我正试图用java标准化ksqldb应用程序中的反序列化,但我很难理解如何处理这个问题 Row
ksqldb返回的类型 Client
类型。ex(尝试/删除):
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.BatchedQueryResult;
Client ksqldbClient = kafkaService.getKSQLDBClient();
String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
List<Row> rows = query.get();
我的ksqldb表配置为使用protobuf序列化,但是看起来 Row
类型是json?我只能通过以下方式获取数据:
for (Row row : rows) {
String json = row.asObject().toJsonString();
// Deserialize json string
...
}
ksqldb客户机只是自己处理protobuf反序列化吗?有没有办法只获取protobuf字节,这样我就可以将它传递到我已经定义的protobuf反序列化程序中,这样我就不需要再编写json反序列化程序了?
1条答案
按热度按时间rks48beu1#
row.asObject()
返回已反序列化的ksqlobject,其操作类似于jdbc resultset,因为您可以为行中的类型对其调用各种get方法。如果您想Map到从protobuf生成的特定域对象,似乎没有直接的方法,如果您需要该特性,最好直接使用kafka流而不是ksql