我正在从一个Kafka主题获取json数据。我需要将这些数据转储到gcs(googlecloudstorage)的一个目录中,其中目录名将从json数据中的“id”值获取。我搜索了一下,没有发现任何类似的用例,其中kafka connect可以用来解释json数据,并根据json数据的值动态创建目录。这可以通过Kafka连接来实现吗?
0s0u357o1#
这是不可能真正的开箱即用的大多数连接器。相反,您可以实现自己的kafka connect sink任务,该任务处理kafka记录,然后基于json将它们写入正确的gcs目录。下面是您在连接器中重写的方法。以下是aws s3接收器连接器的源代码链接。
cxfofazt2#
您可以使用confluent提供的kafka connect gcs接收器连接器。google云存储(gcs)连接器目前作为一个接收器提供,允许您以各种格式将数据从kafka主题导出到gcs对象。此外,对于某些数据布局,gcs连接器通过保证向其生成的gcs对象的使用者精确地传递一次语义来导出数据。下面是连接器的配置示例:
name=gcs-sink connector.class=io.confluent.connect.gcs.GcsSinkConnector tasks.max=1 topics=gcs_topic gcs.bucket.name=#bucket-name gcs.part.size=5242880 flush.size=3 gcs.credentials.path=#/path/to/credentials/keys.json storage.class=io.confluent.connect.gcs.storage.GcsStorage format.class=io.confluent.connect.gcs.format.avro.AvroFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=BACKWARD confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1 # Uncomment and insert license for production use # confluent.license=
您可以在我上面提供的链接中找到有关安装和配置的更多详细信息。
2条答案
按热度按时间0s0u357o1#
这是不可能真正的开箱即用的大多数连接器。相反,您可以实现自己的kafka connect sink任务,该任务处理kafka记录,然后基于json将它们写入正确的gcs目录。
下面是您在连接器中重写的方法。
以下是aws s3接收器连接器的源代码链接。
cxfofazt2#
您可以使用confluent提供的kafka connect gcs接收器连接器。
google云存储(gcs)连接器目前作为一个接收器提供,允许您以各种格式将数据从kafka主题导出到gcs对象。此外,对于某些数据布局,gcs连接器通过保证向其生成的gcs对象的使用者精确地传递一次语义来导出数据。
下面是连接器的配置示例:
您可以在我上面提供的链接中找到有关安装和配置的更多详细信息。