我有一个简单的Kafka经纪人和一个主题 raw_events
.
与 kafka-console-producer --topic raw_events --broker-list kafka:29092 < event.json
我正在向该主题添加成功显示的事件 kafka-console-consumer --bootstrap-server kafka:29092 --topic raw_events
. 因此,我知道这些事件落在代理中(在正确的主题中),也可以从代理中消费**)。
在这种情况下 event.json
文件包含一个非常简单的json: {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'} }
在ksql中,主题是:
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
--------------------------------------------------------------------------------------------------
raw_events | true | 1 | 1 | 3 | 3
包含来自早期尝试的一些事件:
ksql> print 'raw_events';
Format:STRING
11/2/18 3:36:21 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:43:05 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:45:19 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:45:43 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:47:30 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
(我跟在后面https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-stream.html 但是用我自己的数据。)
现在,我在ksql中创建了一个成功的流:
create stream new_events (event_type varchar, project varchar) with (kafka_topic='raw_events', value_format='JSON');
创建流:
ksql> show streams;
Stream Name | Kafka Topic | Format
----------------------------------------
NEW_EVENTS | raw_events | JSON
----------------------------------------
然而(这是我的问题——可能是pebkac或ksql错误) SELECT
在那条溪流上只是停顿,没有显示任何事件。。。即使我继续向主题添加事件:
ksql> select * from new_events;
[... nothing here ...]
选择特定列,如 project
也不返回条目。
**)顺便说一句,我不清楚productcli命令为什么有参数 --broker-list
consume cli命令 --bootstrap-server
为了同样的事情。
1条答案
按热度按时间kq4fsx7k1#
按照https://www.confluent.io/blog/troubleshooting-ksql-part-1...
我在源主题中有数据
我得到了新的数据
ksql正在使用右偏移量的数据
数据与指定的 predicate 匹配*)
我在读取数据时没有反序列化错误。。。报告*)
不过你会注意到*)。。。我发现问题在于我在json中使用了单引号,而valid json正式指定(你猜到了)引号只能是双引号,
"
. 我被送上了错误的轨道,因为json的一些内部表示被导出为带单引号的json。因此,我的示例中正确的json应该是
{"event_type": "issue", "project": "sample", "user": {"name": "John Doe", "username": "jdoe"}}
一切都很好。(尽管ksql服务器的日志中没有任何信息表明这是问题的原因。幸运的是,如果其他人遇到这个问题,这里没有将此作为一个潜在的解决方案进行记录。)