我正在尝试加入两个Kafka流dsl KTable
使用:
KTable<String, String> source = builder.table("stream-source");
KTable<String, String> target = builder.table("stream-target");
source.join(target, new ValueJoiner<String, String, String>() {
public String apply(String value1, String value2) {
return value1 + ":" + value2;
}
});
我已经确保键和值都不是 null
:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < PERSONS_SOURCE.length; i++) {
producer.send(new ProducerRecord<String, String>("stream-source", Long.toString(i + 1L), PERSONS_SOURCE[i]));
}
for(int i = 0; i < PERSONS_TARGET.length; i++) {
producer.send(new ProducerRecord<String, String>("stream-target", Long.toString(i + 1L), PERSONS_TARGET[i]));
}
producer.close();
但是应用程序报告rocksdb层中有一个关于分区的空指针。
[2016-07-17 21:58:04,682]error user provided listener org.apache.kafka.streams.processor.internals.streamthread$1 for group streams-persons2 failed on partition assignment(org.apache.kafka.clients.consumer.internals.consumercoordinator)java.lang.nullpointerexception at org.rocksdb.rocksdb.put(rocksdb)。java:432)在org.apache.kafka.streams.state.internals.rocksdbstore.putinternal(rocksdbstore。java:299)访问org.apache.kafka.streams.state.internals.rocksdbstore.access$200(rocksdbstore。java:62)在org.apache.kafka.streams.state.internals.rocksdbstore$3.restore(rocksdbstore。java:206)在org.apache.kafka.streams.processor.internals.processorstatemanager.restoreactivestate(processorstatemanager。java:245)位于org.apache.kafka.streams.processor.internals.processorstatemanager.register(processorstatemanager。java:210)位于org.apache.kafka.streams.processor.internals.processorcontextimpl.register(processorcontextimpl。java:116)在org.apache.kafka.streams.state.internals.rocksdbstore.init(rocksdbstore。java:202)
1条答案
按热度按时间acruukt91#
发现问题是由于在应用程序代码中创建流而不是使用command:-
似乎连接需要分区信息才能工作。