如何使用Kafka连接到输出到动态目录gcs?

xghobddn  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(372)

我正在从一个Kafka主题获取json数据。我需要将这些数据转储到gcs(googlecloudstorage)的一个目录中,其中目录名将从json数据中的“id”值获取。
我搜索了一下,没有发现任何类似的用例,其中kafka connect可以用来解释json数据,并根据json数据的值动态创建目录。这可以通过Kafka连接来实现吗?

0s0u357o

0s0u357o1#

这是不可能真正的开箱即用的大多数连接器。相反,您可以实现自己的kafka connect sink任务,该任务处理kafka记录,然后基于json将它们写入正确的gcs目录。
下面是您在连接器中重写的方法。
以下是aws s3接收器连接器的源代码链接。

cxfofazt

cxfofazt2#

您可以使用confluent提供的kafka connect gcs接收器连接器。
google云存储(gcs)连接器目前作为一个接收器提供,允许您以各种格式将数据从kafka主题导出到gcs对象。此外,对于某些数据布局,gcs连接器通过保证向其生成的gcs对象的使用者精确地传递一次语义来导出数据。
下面是连接器的配置示例:

name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=gcs_topic

gcs.bucket.name=#bucket-name
gcs.part.size=5242880
flush.size=3

gcs.credentials.path=#/path/to/credentials/keys.json

storage.class=io.confluent.connect.gcs.storage.GcsStorage
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=BACKWARD

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# Uncomment and insert license for production use

# confluent.license=

您可以在我上面提供的链接中找到有关安装和配置的更多详细信息。

相关问题