我有一个Kafka主题,有大约300万张唱片。我想从中选出一条有特定参数的记录。我一直试图用镜头来查询这个问题,但无法形成正确的查询。以下是1条消息的记录内容。
{
"header": {
"schemaVersionNo": "1",
},
"payload": {
"modifiedDate": 1552334325212,
"createdDate": 1552334325212,
"createdBy": "A",
"successful": true,
"source_order_id": "1111111111111",
}
}
现在我想筛选出一个具有特定源订单id的记录,但无法找到正确的方法。我们尝试过通过镜头以及Kafka工具。
下面是我们在镜头中尝试的一个示例查询:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='A'
此查询有效,但是如果尝试使用如下所示的源id,则会出现错误:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='1111111111111'
Error : "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING.
通过一个定制消费者消费所有300万条记录,然后对其进行迭代,对我来说似乎不是一种优化的方法,因此,为这样的用例寻找任何可用的解决方案。
1条答案
按热度按时间de90aj5v1#
既然您说过您对其他解决方案持开放态度,那么下面是一个使用ksql构建的解决方案。
首先,让我们将一些示例记录放到源主题中:
使用ksql,我们可以用
PRINT
:然后在主题上声明一个模式,这使我们能够对其运行sql:
告诉ksql处理主题中的所有数据:
现在我们可以选择所有数据:
或者我们可以使用
->
访问架构中嵌套字段的符号:除了选择所有记录外,还可以仅返回感兴趣的字段:
使用ksql,您可以编写任何
SELECT
语句,该语句使用所有现有消息以及根据声明的SELECT
声明:Kafka集群的列表主题:
打印新主题的内容: