我正在设置日志服务器。我正在使用fluentd将日志转发给kafka,然后将它们存储在cassandra中供以后使用。为此,我使用KafkaCassandraFlume连接器。我必须按时间顺序存储数据,我需要在cassandra中为我的消息添加时间戳。如何做到这一点?
datamountaineer连接器使用kql,我认为它不支持在日志中插入时间戳。
我的连接器配置如下:
name=cassandra-sink
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=test_AF1
connect.cassandra.kcql=INSERT INTO test_event1 SELECT now() as id, message as msg FROM test_AF1 TIMESTAMP=sys_time()
connect.cassandra.port=9042
connect.cassandra.contact.points=localhost
connect.cassandra.key.space=demo
1条答案
按热度按时间aelbi1ox1#
Kafka连接的单一信息转换可以做到这一点。举个例子:
这将向消息负载添加一个名为
op_ts
Kafka信息的时间戳。我不知道它是如何与kql交互的;你可能想看看我知道的另外两个cassandraFlume:
https://www.confluent.io/hub/confluentinc/kafka-connect-cassandra
https://www.confluent.io/hub/datastax/kafka-connect-dse