clickhouse不通过复杂的物化视图使用kafka消息

lnlaulya  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(955)

tldr摘要:clickhouse-kafka引擎,物化视图不适用于复杂的select语句。
较长版本:
我试图通过使用jsoneachrow的kafka引擎将大量json数据点发送到clickhouse。但是物化视图不会正确地使用流。我有一个用go编写的kafka生产者,它从多个tcp流中获取数据并异步写入kafka队列。
因此,数据流:
tcp源->生产者->Kafka->clickhouse(Kafka引擎)->物化视图->目的表
所有这些工作,到目前为止还不错。
当我提高输入数据的速度(400000点/秒)时,我第一次遇到了瓶颈。我的制作人无法足够快地向Kafka写入数据,连接也堆积如山。因此,我希望尝试批处理数据,但clickhouse似乎无法将json数组作为输入(https://clickhouse.yandex/docs/en/interfaces/formats/)
因此,我想到了在数据点的源代码处批处理数据点,并在物化视图中转换消息的想法,因此在此之前,我有很多单独的消息:
{“t”:1547457441651445401,“i”:“设备2”,“c”:20001,“v”:56454654}
我现在有一个消息,它是上面的倍数,并且经过了字符串化,在点之间有新行分隔符。
{“realtimes”:“{t\”:1547458266855015791,\“i\”:\“device\u 2\”,\“c\”:20001,\“v\”:56454654}\n{“t\”:1547458266855015791,\“i\”:\“device\u 2\”,\“c\”:20001,\“v\”:56454654}
这里的目的是在物化视图的select语句中使用visitparamextract将字符串解析并转换为多个值。
物化视图:

CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT 
    visitParamExtractInt(x, 't') AS timestamp, 
    visitParamExtractString(x, 'i') AS device_id, 
    visitParamExtractInt(x, 'v') AS value FROM  (
    SELECT arrayJoin(*) AS x
    FROM 
    (
        SELECT splitByChar('\n', realtimes)
        FROM kafka_stream_realtimes 
    )  )

它似乎在做些什么,因为当它运行kafka\u流时,realtimes被清除,我无法手动查询它,得到一个错误“db::exception:failed to claim consumer:”,但数据从未到达最终表。
总结:
数据到达clickhouse,它只是消失,似乎永远不会到达最终的表。
如果我删除物化视图,我可以看到数据在kafka\u stream\u realtimes中累积
如果我以insert into语句和select语句的形式运行物化视图查询,它将从流中获取数据到最终的表。
我意识到我可能只是把瓶颈推到了clickhouse上,这可能永远不会奏效,但为了完整起见,我想通过这个过程
完整性:kafka\u stream\u realimes:

CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
  ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');

长期数据库:

CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;
cgfeq70w

cgfeq70w1#

但是clickhouse似乎不能将json数组作为输入
似乎动机是在生产者方面进行批量提交。为什么不把一堆json行分组并一次性提交呢?clickhouse将接收这些多行消息并为您解析它们。您可能还需要提供 kafka_row_delimiter 设置为kafka引擎,因为大多数kafka生产者不会在每条消息的末尾附加行分隔符。
所以一个信息变成

{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
...

相关问题