基本上,当我使用 group by
我的查询中的表达式。
从主题创建流
CREATE STREAM events_stream \
( \
account VARCHAR, \
event_id VARCHAR, \
user_name VARCHAR, \
event_name VARCHAR, \
source VARCHAR, \
message VARCHAR, \
timestamp STRUCT<iMillis INTEGER>) \
WITH (KAFKA_TOPIC='console_failure', VALUE_FORMAT='JSON');
从上面的流创建表。
ksql> CREATE TABLE events_table AS \
SELECT source, count(*) \
FROM events_stream \
WINDOW TUMBLING (SIZE 60 SECONDS) \
WHERE account = '1111111111' \
GROUP BY source \
HAVING count(*) > 3;
生成此消息4次。
ip="10.10.10.10"
data = {
"account": "1111111111",
"event_id": "4cdabe46-690d-494a-a37e-6e455781d8b4",
"user_name": "shakeel",
"event_name": "some_event",
"source": "127.0.0.1",
"message": "message related to event",
"timestamp": {
"iMillis": 1547543309000
}
}
producer.send('console_failure', key='event_json', value=dict(data)
这和预期的一样!但是如何获得匹配结果的其他字段(例如:用户名、消息等)?
ksql> select * from events_table;
1550495772262 | 10.10.10.10 : Window{start=1550495760000 end=-} | 10.10.10.10 | 4
ksql>
使用后我明白可能是我们无法得到其他列时使用 group by
声明。
ksql> CREATE TABLE events_table1 AS \
> SELECT source, event_id, \
> count(*) \
> FROM events_stream \
> WINDOW TUMBLING (SIZE 60 SECONDS) \
> WHERE account = '1111111111' \
> GROUP BY source \
> HAVING count(*) > 3;
Group by elements should match the SELECT expressions.
ksql>
我们可以通过对流重新设置密钥来实现这一点吗?
读完这篇文章后,我试着用 event_id
字段,但不确定如何在 group by
声明。
下面是我尝试使用rekey时得到的错误。
ksql> CREATE STREAM events_stream_rekey AS SELECT * FROM events_stream PARTITION BY event_id;
Message
----------------------------
Stream created and running
----------------------------
ksql>
ksql> SELECT ROWKEY, EVENT_ID FROM events_stream_rekey;
4cdabe46-690d-494a-a37e-6e455781d8b4 | 4cdabe46-690d-494a-a37e-6e455781d8b4
ksql>
ksql> CREATE TABLE events_table2 AS \
> SELECT source, \
> count(*), \
> WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'),
> WINDOW TUMBLING (SIZE 60 SECONDS) \
> WHERE account = '1111111111' \
> GROUP BY source \
> HAVING count(*) > 3;
line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
ksql版本details:cli v5.1.0,服务器v5.1.0
4条答案
按热度按时间gv8xihay1#
除了罗宾的回答,这个错误:
是指你的with子句放错了地方。正确的模式是:
你会说:
xa9qqrwz2#
------复制步骤
qc6wkl3g3#
制作人script:this script 将在30秒内生成4条消息。
使用来自测试主题的消息时(使用普通使用者脚本)。
预期结果:如果消息包含相同的
source
在窗口时间的30秒内为相同的account
然后我想立即得到下一条完整的消息(在我的例子中是第四条消息,如下所示)。这可以用ksql实现吗?bfrts1fy4#
消息本身实际上告诉您问题所在:)
group by元素应与select表达式匹配。
所以在这里,你有
source
两者都有SELECT
以及GROUP BY
:要添加其他列,请确保将它们添加到
SELECT
也:编辑以回答更新的问题:
我不认为用sql(或ksql)可以[轻松地]做到这一点。您可以通过在聚合操作中包含时间戳来实现类似的功能,例如:
然后获取结果表并连接到事件流:
我没试过,但原则上,它可能会让你达到你想要的目的。