Kafka在Apache风暴中滔滔不绝

cyvaqqii  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(400)

我正在用Kafka喷口构建一个风暴拓扑。我正在以json格式从kafka(没有zookeeper)消费,storm应该会输出它。
如何为json数据类型定义合适的模式?目前,我有这样的代码库和基本的喷口实现:

val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

我是新的Apache风暴,所以会很高兴为任何建议。

fslejnso

fslejnso1#

你可以做几件事:
您可以定义 RecordTranslator . 此接口允许您定义喷口如何基于 ConsumerRecord 它读过Kafka的作品。
默认实现如下所示:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

如你所见,你将得到 ConsumerRecord ,它是内置于底层kafka客户机库中的类型,然后必须将其转换为 List<Object> 将是元组值。如果您想在发送数据之前对记录执行一些复杂的操作,您可以这样做。例如,如果您想将键、值和偏移量填充到它随后发出的数据结构中,您可以在这里这样做。你像这样使用翻译 KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build() 如果您只想将键/值反序列化到自己的一个数据类中,更好的替代方法是实现 Deserializer . 这将允许您定义如何反序列化从kafka获得的键/值。如果要反序列化(例如,将值反序列化到自己的数据类中),可以使用此接口。
默认值 StringDeserializer 这是否:

@Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

一旦你创建了自己的 Deserializer ,你用它来做 KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build() . 有一个类似的consumer属性用于设置值反序列化器。

相关问题