KSQL INSERT INTO a stream produces no data

syqv5f0l · posted 2021-06-06 in Kafka

I'm trying to use the `INSERT INTO` operation in KSQL.
The steps I followed are:
I have a stream, `event_stream`, which I created from a Kafka topic:

```
CREATE STREAM event_stream (eventType varchar, eventTime varchar,
sourceHostName varchar) WITH (kafka_topic='events', value_format='json');
```

`SELECT * FROM event_stream;` shows the messages coming in correctly.
I want to send these messages to another Kafka topic, `output_events`, which I have already created.
I then created a second stream in KSQL:

```
CREATE STREAM output_stream (eventTime varchar, extraColumn varchar,
sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
```

Finally, I linked the input to the output like this:

```
INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn,
sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
```

All of the above completes without any errors, but if I run `SELECT * FROM output_stream;` I get no data. Why is that?
Running the SELECT part of the query above on its own works fine, so I can see matching results arriving on the source topic.
Strangely, if I run `DESCRIBE EXTENDED output_stream`, the message count suggests that messages are reaching the stream:

```
Local runtime statistics

messages-per-sec: 0.33 total-messages: 86 last-message: 11/9/18 1:15:43 PM UTC
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)
```

I have also checked the KSQL server logs, but there are no errors in them.
Answer 1 (kg7wmglp)

This is a bug, triggered by inadvertently misusing `CREATE STREAM` with the wrong syntax. You're using the variant that "registers" a KSQL stream against an existing topic. For `INSERT INTO` to work, it needs a target created with `CREATE STREAM target AS SELECT` ("CSAS").
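To make the distinction concrete, here is a minimal sketch of the two variants (the stream and topic names are illustrative, not taken from the question):

```
-- Variant 1: registers a stream over an existing topic.
-- No query runs, so KSQL never writes to it; it only reads what is already there.
CREATE STREAM s1 (col1 VARCHAR) WITH (kafka_topic='existing_topic', value_format='json');

-- Variant 2 (CSAS): creates the stream AND starts a continuous query
-- that populates its backing topic. This is what INSERT INTO expects as a target.
CREATE STREAM s2 AS SELECT col1 FROM s1;
```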
Let's work it through. Here I'm using a Docker Compose test setup.
Populate some dummy data:

```
docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t events -P <<EOF
{"eventType":"1", "eventTime" :"2018-11-13-06:34:57", "sourceHostName":"asgard"}
{"eventType":"2", "eventTime" :"2018-11-13-06:35:57", "sourceHostName":"asgard"}
{"eventType":"MatchingValue", "eventTime" :"2018-11-13-06:35:58", "sourceHostName":"asgard"}
EOF
```

Register the source topic with KSQL:

```
CREATE STREAM event_stream (eventType varchar, eventTime varchar, sourceHostName varchar) WITH (kafka_topic='events', value_format='json');
```

Query the stream:

```
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT * FROM event_stream;
1542091084660 | null | 1 | 2018-11-13-06:34:57 | asgard
1542091084660 | null | 2 | 2018-11-13-06:35:57 | asgard
1542091785207 | null | MatchingValue | 2018-11-13-06:35:58 | asgard
```

So, looking at the `CREATE STREAM` that you quoted:

```
CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
```

I'm guessing that if you run `LIST TOPICS;`, you'll see that this topic already exists on your Kafka broker?

```
ksql> LIST TOPICS;

Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
_confluent-metrics     | false      | 12         | 1                  | 0         | 0
_schemas               | false      | 1          | 1                  | 0         | 0
docker-connect-configs | false      | 1          | 1                  | 0         | 0
docker-connect-offsets | false      | 25         | 1                  | 0         | 0
docker-connect-status  | false      | 5          | 1                  | 0         | 0
events                 | true       | 1          | 1                  | 0         | 0
output_events          | false      | 4          | 1                  | 0         | 0
----------------------------------------------------------------------------------------------------
ksql>
```

Because if it didn't exist, the `CREATE STREAM` would fail:

```
ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Kafka topic does not exist: output_events
ksql>
```

So, let's say I create that topic on my test cluster too:

```
$ docker-compose exec kafka bash -c "kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 4 --topic output_events"
```
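(As an aside: the `--zookeeper` flag was current for the versions shown here; on Kafka 2.2+ `kafka-topics` can talk to the broker directly, and `--zookeeper` was removed in Kafka 3.0. A sketch of the equivalent, assuming the same container name and the broker listener used above:)

```
$ docker-compose exec kafka bash -c "kafka-topics --create --bootstrap-server kafka:29092 --replication-factor 1 --partitions 4 --topic output_events"
```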

And then create the stream:

```
ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');

Message
----------------
Stream created
----------------
```

Note that it says `Stream created`, rather than `Stream created and running`. Now let's run the `INSERT INTO`:

```
ksql> INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

Message
-------------------------------
Insert Into query is running.
-------------------------------
```

The `DESCRIBE EXTENDED` output does indeed show messages being processed, just as you saw:

```
ksql> DESCRIBE EXTENDED output_stream;

Name                 : OUTPUT_STREAM
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : output_events (partitions: 4, replication: 1)

Field          | Type
--------------------------------------------
ROWTIME        | BIGINT           (system)
ROWKEY         | VARCHAR(STRING)  (system)
EVENTTIME      | VARCHAR(STRING)
EXTRACOLUMN    | VARCHAR(STRING)
SOURCEHOSTNAME | VARCHAR(STRING)
--------------------------------------------

Queries that write into this STREAM
-----------------------------------
InsertQuery_0 : INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.01   total-messages:         1     last-message: 11/13/18 6:49:46 AM UTC
failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)
```

But the topic itself contains no messages:

```
ksql> print 'output_events' from beginning;
^C

```

Nor does the KSQL stream:

```
ksql> SELECT * FROM OUTPUT_STREAM;
^CQuery terminated
```

So the `INSERT INTO` command is designed to run against an existing CSAS/CTAS target stream, not against a source stream registered over an existing topic.
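In other words, the supported pattern looks like this (a sketch with hypothetical stream names; the SELECT schemas of the CSAS and the INSERT INTO must match):

```
-- CSAS creates the target stream and starts a persistent query populating it:
CREATE STREAM combined AS SELECT eventTime, sourceHostName FROM stream_a;

-- INSERT INTO then adds a second persistent query writing into that CSAS target:
INSERT INTO combined SELECT eventTime, sourceHostName FROM stream_b;
```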
Let's try it that way. First we need to drop the existing stream definition, and in order to do that we also need to terminate the `INSERT INTO` query:

```
ksql> DROP STREAM OUTPUT_STREAM;
Cannot drop OUTPUT_STREAM.
The following queries read from this source: [].
The following queries write into this source: [InsertQuery_0].
You need to terminate them before dropping OUTPUT_STREAM.
ksql> TERMINATE InsertQuery_0;

Message
-------------------
Query terminated.
-------------------
ksql> DROP STREAM OUTPUT_STREAM;

Message
------------------------------------
Source OUTPUT_STREAM was dropped.
------------------------------------
```

Now create the target stream:

```
ksql> CREATE STREAM output_stream WITH (kafka_topic='output_events') AS SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';

Message
----------------------------
Stream created and running
----------------------------
```

Note that this time the stream is created and `running` (versus just `created` before). Now query the stream:

```
ksql> SELECT * FROM OUTPUT_STREAM;
1542091785207 | null | 2018-11-13-06:35:58 | Extra info | asgard
```

And check the underlying topic:

```
ksql> PRINT 'output_events' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542091785207,"ROWKEY":"null","EVENTTIME":"2018-11-13-06:35:58","EXTRACOLUMN":"Extra info","SOURCEHOSTNAME":"asgard"}
```

So, you did hit a bug in KSQL (raised here), but fortunately it can be avoided by using the simpler KSQL syntax, combining your `CREATE STREAM` and `INSERT INTO` queries into one.
