我们正在ubuntu上使用confluent平台。我们通过curl请求将简单的json数据发送到kafka主题为“ue\u context”的kafka rest服务器。
使用以下命令为此主题创建名为“ue\u context\u stream”的Kafka流:
CREATE STREAM UE_Context_Stream (ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', VALUE_FORMAT='JSON');
使用以下命令为此主题创建一个名为“ue\u context\u table”的Kafka表:
CREATE TABLE UE_Context_Table ( registertime BIGINT, ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', KEY='ue_key', VALUE_FORMAT='JSON');
我使用下面的curl命令在主题上抽取了两行数据:
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x1234", "ecgi" : "1234"}}]}' "http://localhost:8082/topics/UE_Context"
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x4321", "ecgi" : "4321"}}]}' "http://localhost:8082/topics/UE_Context"
我有一个select查询等待表,如下所示:
当json数据被注入主题时,这个查询显示表信息。然后我们停止将json数据注入主题,结束select查询并结束select查询。如果在稍后的时间点执行选择,则不会显示先前填充的表信息。没有办法保存这些数据吗?Kafka连接器和使用数据库可能是一种选择。但是ksql没有临时内存来存储表信息吗?
1条答案
按热度按时间rmbxnbpk1#
如果稍后执行选择,则不显示先前填充的表信息。
select语句默认为主题的最新偏移量
如果要查看以前的数据,则需要将使用者偏移量设置回开头
此外,如文件中所述(重点)
select语句本身是一个非持久的连续查询。select语句的结果不会持久化到kafka主题中,而只打印在ksql控制台中。不要混淆createstreamasselect创建的持久查询和select语句的流查询结果。