我正在用Kafka喷口构建一个风暴拓扑。我正在以json格式从kafka(没有zookeeper)消费,storm应该会输出它。
如何为json数据类型定义合适的模式?目前,我有这样的代码库和基本的喷口实现:
val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()
val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
cluster.shutdown()
我是新的Apache风暴,所以会很高兴为任何建议。
1条答案
按热度按时间fslejnso1#
你可以做几件事:
您可以定义
RecordTranslator
. 此接口允许您定义喷口如何基于ConsumerRecord
它读过Kafka的作品。默认实现如下所示:
如你所见,你将得到
ConsumerRecord
,它是内置于底层kafka客户机库中的类型,然后必须将其转换为List<Object>
将是元组值。如果您想在发送数据之前对记录执行一些复杂的操作,您可以这样做。例如,如果您想将键、值和偏移量填充到它随后发出的数据结构中,您可以在这里这样做。你像这样使用翻译KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()
如果您只想将键/值反序列化到自己的一个数据类中,更好的替代方法是实现Deserializer
. 这将允许您定义如何反序列化从kafka获得的键/值。如果要反序列化(例如,将值反序列化到自己的数据类中),可以使用此接口。默认值
StringDeserializer
这是否:一旦你创建了自己的
Deserializer
,你用它来做KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build()
. 有一个类似的consumer属性用于设置值反序列化器。