从正在使用 INSERT INTO
ksql操作。
我遵循的步骤是:
我有一条小溪 event_stream
我根据Kafka的主题创作的。
CREATE STREAM event_stream (eventType varchar, eventTime varchar,
sourceHostName varchar) WITH (kafka_topic='events', value_format='json');
``` `SELECT * FROM event_stream;` 显示正确传入的消息。
我想把这些信息发到Kafka的另一个主题上, `output_events` ,我已经创建了。
然后在ksql中创建第二个流:
CREATE STREAM output_stream (eventTime varchar, extraColumn varchar,
sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
最后,我将输入链接到输出,如下所示:
INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn,
sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
以上所有内容似乎都没有错误地完成,但是如果我运行 `SELECT * FROM output_stream;` 我没有数据。为什么会这样?
运行上述查询的select部分可以很好地工作,因此我可以看到主题上出现了匹配的结果。
奇怪的是,如果我跑 `DESCRIBE EXTENDED output_stream` 消息计数表示消息正在到达流:
Local runtime statistics
messages-per-sec: 0.33 total-messages: 86 last-message: 11/9/18 1:15:43 PM UTC
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)
我也检查了ksql服务器日志,但是没有发现任何错误。
1条答案
按热度按时间kg7wmglp1#
这是一个错误,通过无意中误用
CREATE STREAM
用错误的语法。您正在使用该变量针对现有主题“注册”ksql流。为了INSERT INTO
为了工作,它需要一个CREATE STREAM target AS SELECT
(“CSA”)。让我们把它做完。这里我用这个docker编写一个测试设置。
填充一些虚拟数据:
使用ksql注册源主题:
查询流:
所以看看
CREATE STREAM
你引用的:我猜如果你跑
LIST TOPICS;
你会看到这个主题已经存在于你的Kafka经纪人?因为如果不是这样的话
CREATE STREAM
将失败:因此,假设我也在我的测试集群上创建了这个主题:
然后创建流:
注意上面写着
Stream created
,而不是Stream created and running
现在让我们运行INSERT INTO
:这个
DESCRIBE EXTENDED
正如您所看到的,输出确实显示了正在处理的消息:但主题本身没有任何信息:
也不是ksql流:
所以
INSERT INTO
命令设计为针对现有csas/ctas目标流运行,而不是针对现有主题注册的源流。让我们换成那样试试。首先,我们需要删除现有的流定义,为此还需要终止
INSERT INTO
查询:现在创建目标流:
注意,在创建流时
running
(与之前相比)created
). 现在查询流:并检查基本主题:
因此,您遇到了ksql中的一个bug(在这里提出),但是幸运的是,通过使用更简单的ksql语法,结合您的
CREATE STREAM
以及INSERT INTO
查询成一个。