我在storm中实现了一个logger bolt,元组的输入来自kafka主题。我正在使用kafka connect来监听mysql数据库的更改。
public class LoggerBolt extends BaseBasicBolt {
private static final long serialVersionUID = 1L;
private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println(input.getValue(0));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
当在本地集群上运行时,下面的命令会被打印出来。
q�%巴克利,罗斯,巴克利buckleyr@univ.edu“963.555.6855x5018963.777.5233馆长q� “斯坦顿,凯茜·凯西斯坦顿”stantonk@univ.edu963.555.7095963.777.1015professor q�班克斯,香农香农banksbankss@univ.edu963.555.7198963.777.6979professor q�/巴恩斯,克莱奥·克莱奥巴恩斯barnesc@univ.edu“963.555.7463x7335963.777.1583美元研究教授
我想将这些细节转换为person对象,这是一个模型类?如何将元组输入解析为对象?
我试过了 input.getValues(0) , input.getFields(0)
其他方法似乎都不管用。
1条答案
按热度按时间drkbr07n1#
如果你用的是
storm-kafka-client
,它默认采用字符串。你可以通过做其他事情来选择。kafkaSpoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
. 您设置的类只需实现kafka反序列化器接口https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/deserializer.html.设置键反序列化器有一个等价的设置。