我们需要从kafka主题导出生产数据以用于测试目的:数据用avro编写,模式放在模式注册表中。
我们尝试了以下策略:
使用 kafka-console-consumer
与 StringDeserializer
或者 BinaryDeserializer
. 我们无法获得一个可以用java解析的文件:解析文件时总是出现异常,表明文件的格式错误。
使用 kafka-avro-console-consumer
:它生成一个json,其中还包含一些字节,例如在反序列化bigdecimal时。我们甚至不知道该选择哪个解析选项(它不是avro,也不是json)
其他不合适的策略:
部署一个特殊的kafka使用者将需要我们打包代码并将其放置在某个生产服务器中,因为我们讨论的是我们的生产集群。只是太长了。毕竟,kafka控制台的消费者不是已经是一个具有可配置选项的消费者了吗?
可能适用的策略
使用Kafka连接接收器。我们没有找到一个简单的方法来重置消费者偏移,因为显然连接器创建的消费者仍然是活跃的,即使我们删除接收器
难道没有一种简单易行的方法可以将包含avro数据的kafka主题的值(而不是模式)的内容转储到一个文件中,以便对其进行解析吗?我希望这是可以实现的使用Kafka控制台消费者与正确的选项,加上使用正确的javaapi的avro。
2条答案
按热度按时间qoefvg9y1#
例如,使用kafka控制台消费者。。。我们无法获得一个可以用java解析的文件:解析文件时总是出现异常,表明文件的格式错误。
你不会使用普通的控制台消费者。你会用
kafka-avro-console-consumer
它将二进制avro数据反序列化为json,供您在控制台上读取。你可以重定向> topic.txt
到控制台去读。如果您确实使用了控制台使用者,则无法立即解析avro,因为您仍然需要从数据中提取架构id(第一个“魔法字节”后的4个字节),然后使用schema registry客户机检索架构,只有这样您才能反序列化消息。任何用于在控制台使用者写入时读取此文件的avro库都希望在文件头处放置一个完整的架构,而不仅仅是在每一行都指向注册表中任何内容的id(基本的avro库对注册表也一无所知)
控制台使用者唯一可配置的是格式化程序和注册表。您可以通过另外将解码器导出到类路径来添加解码器
以这样一种格式,您可以从java重新读取它?
为什么不用java编写一个kafka消费者呢?请参阅架构注册表文档
将代码打包并放入某个生产服务器中
不完全清楚为什么这是个问题。如果可以将ssh代理或vpn部署到生产网络中,那么就不需要在那里部署任何东西。
如何导出这些数据
既然您使用的是schema注册表,我建议您使用kafka connect库之一
包括hadoop、s3、elasticsearch和jdbc。我想还有一个FileLink连接器
我们没有找到一个简单的方法来重置消费者补偿
连接器名称控制是否在分布式模式下形成新的使用者组。你只需要一个消费者,所以我建议独立连接器,你可以设置
offset.storage.file.filename
属性来控制偏移的存储方式。kip-199讨论了重置connect的使用者偏移量,但这个特性并没有实现。
但是,您看到Kafka0.11如何重置偏移了吗?
备选方案包括apachenifi或streamset,两者都集成到schema注册表中,并且可以解析avro数据以将其传输到许多系统
jjhzyzn02#
与cricket\u007一样,需要考虑的一个选项是简单地将数据从一个集群复制到另一个集群。您可以使用ApacheKafkaMirrorMaker或confluent的replicator来完成此操作。两者都提供了选择要从一个集群复制到另一个集群的特定主题的选项,例如测试环境。