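I am trying to enrich a Kafka topic with a KStream-to-KTable join. For my proof of concept, I have a Kafka stream of about 600,000 records, all with the same key. I also have a KTable built from a second topic, and the key in that KTable topic matches the key of the 600,000 records in the topic the KStream is built from.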
When I run a left join (code below), the table-side value passed to the ValueJoiner is null for every record.
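import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

Properties props = new Properties();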
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe-json-parse-" + System.currentTimeMillis());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xx.xx.xxx:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
final StreamsBuilder builder = new StreamsBuilder();
// Build a KStream from the Netcool input topic
KStream<String, String> source = builder.stream("output-100k");
// Left-join the KStream to the KTable.
// netcool_enrichment (the KTable) and jsonEnricher(...) are defined elsewhere in the asker's code and not shown here.
KStream<String, String> enriched_output = source
    .leftJoin(netcool_enrichment, (orig_msg, description) -> {
        // description is null when the KTable has no entry with exactly the same key
        String new_msg = jsonEnricher(orig_msg, description);
        if (description != null) {
            System.out.println("\n[DEBUG] Enriched Input Orig: " + orig_msg);
            System.out.println("[DEBUG] Enriched Input Desc: " + description);
            System.out.println("[DEBUG] Enriched Output: " + new_msg);
        }
        return new_msg;
    });
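A KStream-KTable leftJoin looks up each incoming stream record's key in the table and always calls the ValueJoiner, passing null as the table-side value when no entry has an equal key. The sketch below models that lookup in plain Java, with a HashMap standing in for the KTable. It is a simplification under stated assumptions: it ignores timing (a real KStream-KTable join also yields null if the matching table entry has not been loaded yet when the stream record is processed), and LeftJoinSketch is a hypothetical name, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of KStream-KTable leftJoin semantics (no Kafka dependency).
// Assumption: the KTable is modeled as a Map holding the latest value per key,
// and all table updates are assumed to have arrived before the stream records.
final class LeftJoinSketch {

    // For each stream record, look up the table by exact key equality and always
    // call the joiner; a key with no match yields a null table-side value.
    static <V, T, R> List<R> leftJoin(List<Map.Entry<String, V>> stream,
                                      Map<String, T> table,
                                      BiFunction<V, T, R> joiner) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<String, V> rec : stream) {
            T tableValue = table.get(rec.getKey()); // null when no exact key match
            out.add(joiner.apply(rec.getValue(), tableValue));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("\"ismlogs\"", "\"ISM Logs\""); // key contains literal quotes
        List<Map.Entry<String, String>> stream =
                List.of(Map.entry("ismlogs", "{\"severity\":\"debug\"}"));
        List<String> joined =
                leftJoin(stream, table, (msg, desc) -> msg + " / " + desc);
        System.out.println(joined.get(0)); // prints {"severity":"debug"} / null
    }
}
```

The joiner still runs for the unmatched record, which is why a left join produces output with a null description instead of dropping the record the way an inner join would.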
Here is a sample record from the source KStream, printed with a foreach loop:
[KSTREAM] Key: ismlogs
[KSTREAM] Value: {"severity":"debug","ingested_timestamp":"2018-07-18T19:32:47.227Z","@timestamp":"2018-06-28T23:36:31.000Z","offset":482,"@metadata":{"beat":"filebeat","topic":"input-100k","type":"doc","version":"6.2.2"},"beat":{"hostname":"abc.dec.com","name":"abc.dec.com","version":"6.2.2"},"source":"/root/100k-raw.txt","message":"Thu Jun 28 23:36:31 2018 Debug: Checking status of file /ism/profiles/active/test.xml","key":"ismlogs","tags":["ismlogs"]}
I converted the KTable back into a KStream and ran a foreach loop over the converted stream to verify that the records really exist in the KTable:
KTable<String, String> enrichment = builder.table("enrichment");
KStream<String, String> ktable_debug = enrichment.toStream();
ktable_debug.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println("[KTABLE] Key: " + key);
System.out.println("[KTABLE] Value: " + value);
}
});
The code above outputs:
[KTABLE] Key: "ismlogs"
[KTABLE] Value: "ISM Logs"
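Quotes that are part of a key are easy to miss when the key is printed right after a label. One way to make such differences visible is to print each key between brackets together with its length. KeyDebug.describe below is a hypothetical helper for that; it could be called from the foreach loop in place of printing the raw key.

```java
// Hypothetical debugging helper (not part of Kafka Streams).
final class KeyDebug {
    // Wraps the key in brackets and reports its length, so literal quotes or
    // stray whitespace inside the key become visible in console output.
    static String describe(String key) {
        if (key == null) {
            return "<null key>";
        }
        String result = "[" + key + "] length=" + key.length();
        if (key.length() >= 2 && key.startsWith("\"") && key.endsWith("\"")) {
            result += " (wrapped in literal double quotes)";
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(describe("ismlogs"));     // [ismlogs] length=7
        System.out.println(describe("\"ismlogs\"")); // ["ismlogs"] length=9 (wrapped in literal double quotes)
    }
}
```

Printed this way, the stream key from the sample record and the KTable key above would show different lengths (7 versus 9), which points straight at the extra characters.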
1 Answer
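Judging by your console output, the keys are different, so the records do not join. In the KTable, the key is "ismlogs" with the double quotes as part of the key itself, while the KStream key is ismlogs without quotes. A KStream-KTable join only matches records whose keys are exactly equal, so no stream record finds a table entry, and the left join passes null to the ValueJoiner every time.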
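One way to make the keys match is to normalize the KTable side before joining. The helper below is a hypothetical sketch (KeyNormalizer and stripQuotes are illustrative names) that removes one pair of surrounding literal double quotes from a key. It assumes, as the console output suggests, that the quotes are the only difference between the two keys.

```java
// Hypothetical helper: strips one pair of surrounding literal double quotes
// so that "\"ismlogs\"" and "ismlogs" compare equal after normalization.
final class KeyNormalizer {
    static String stripQuotes(String key) {
        if (key != null && key.length() >= 2
                && key.charAt(0) == '"' && key.charAt(key.length() - 1) == '"') {
            return key.substring(1, key.length() - 1);
        }
        return key; // null, too short, or unquoted: leave unchanged
    }

    public static void main(String[] args) {
        System.out.println(stripQuotes("\"ismlogs\"")); // ismlogs
        System.out.println(stripQuotes("ismlogs"));     // ismlogs
    }
}
```

On Kafka Streams 2.5 or later, such a helper could be applied by reading the enrichment topic as a stream, re-keying it, and converting it to a table, for example builder.<String, String>stream("enrichment").selectKey((k, v) -> KeyNormalizer.stripQuotes(k)).toTable(), and then joining against that table. Alternatively, the producer that writes to the enrichment topic could be changed to write keys without quotes, which avoids the extra re-keying step. Where the quotes come from is not visible in the question, so either fix is a suggestion rather than a confirmed root cause.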