我正在尝试使用函数“mapvalues”将原始流“textlines”中获得的字符串值转换为jsonobject消息。然后将newstream中的所有内容流到一个名为“testoutput”的主题上。但是每次消息实际通过转换块时,我都会得到一个nullpointerexception,错误只指向kafka流库。不知道发生了什么:((
p、 当我从原始流派生/创建一个新的Kafka流时,新的流是否属于原始生成器?因为我需要生成器来创建kafkastreams对象并开始流式处理,所以我不确定是否需要对新的流进行其他处理,而不仅仅是指定它的去向(“topic”)
//Testing a Kafka Stream Application
public class testStream {
public static void main(String[] args) throws Exception {
//Configurations
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//Building Stream
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("mytest2");
//Printout The Inputs just for testing purposes
textlines.foreach(new ForeachAction<String, String>(){
public void apply(String key, String value){
for(int y=0; y<value.length(); y++){
System.out.print(value.charAt(y));
}
System.out.print("\n");
}
});
//Transform String Records to JSON Objects
KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String,JSONObject>(){
@Override
public JSONObject apply(String value) {
JSONObject jsnobj = new JSONObject();
//If the first 4 letters of the message is "xxxx" then parse it to a
//JSON Object, otherwise create a dummy
if(value.substring(0, 4).equals("xxxx")){
jsnobj.put("Header_Title", value.substring(0, 4));
jsnobj.put("Data_Part", value.substring(4));
}else{
jsnobj.put("Header_Title", "Not xxxx");
jsnobj.put("Data_Part", "None");
}
return jsnobj;
}
});
//Specify target
newStream.to("testoutput");
//Off you go
KafkaStreams streams=new KafkaStreams(builder, props);
streams.start();
}
}
2条答案
按热度按时间u4dcyp6a1#
@迈克尔:我根据你的建议修改了密码。谢谢。我的目标是将读取字符串解析为json值。
ia2d9nvy2#
据我所知,你的问题是:
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Sth like this
newStream.to(Serdes.String(), myJsonSerde, "testoutput");