在使用kafka cassandra sink connector存储到cassandra之前,有没有为我们的消息添加时间戳的功能?

2guxujil  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(334)

我正在设置日志服务器。我正在使用fluentd将日志转发给kafka,然后将它们存储在cassandra中供以后使用。为此,我使用KafkaCassandraFlume连接器。我必须按时间顺序存储数据,我需要在cassandra中为我的消息添加时间戳。如何做到这一点?
datamountaineer连接器使用kql,我认为它不支持在日志中插入时间戳。
我的连接器配置如下:

name=cassandra-sink
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=test_AF1
connect.cassandra.kcql=INSERT INTO test_event1 SELECT now() as id, message as msg FROM test_AF1 TIMESTAMP=sys_time()
connect.cassandra.port=9042
connect.cassandra.contact.points=localhost
connect.cassandra.key.space=demo
aelbi1ox

aelbi1ox1#

Kafka连接的单一信息转换可以做到这一点。举个例子:

{
  "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
  "topics": "test_AF1",
…
  "transforms": "addTS",
  "transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTS.timestamp.field": "op_ts"
}'

这将向消息负载添加一个名为 op_ts Kafka信息的时间戳。
我不知道它是如何与kql交互的;你可能想看看我知道的另外两个cassandraFlume:
https://www.confluent.io/hub/confluentinc/kafka-connect-cassandra
https://www.confluent.io/hub/datastax/kafka-connect-dse

相关问题