Apache Ignite Kafka connection problem

k97glaaz  posted on 2021-06-07  in Kafka

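I am trying to do stream processing and CEP on a Kafka message stream. For this I chose Apache Ignite to implement a prototype first. However, I cannot connect to the queue.
I am using kafka_2.11-0.10.1.0 and apache-ignite-fabric-1.8.0-bin, started as follows:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Kafka works fine; I tested it with a consumer. I then start Ignite and run the following code in a Spring Boot command-line application.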

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create a ConsumerConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
        // allow overwriting cache data
        stmr.allowOverwrite(true);

        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);

        // set the topic
        kafkaStreamer.setTopic("test");

        // set the number of threads to process Kafka streams
        kafkaStreamer.setThreads(1);

        // set Kafka consumer configurations
        kafkaStreamer.setConsumerConfig(config);

        // set decoders
        StringDecoder keyDecoder = new StringDecoder(null);
        StringDecoder valueDecoder = new StringDecoder(null);

        kafkaStreamer.setKeyDecoder(keyDecoder);
        kafkaStreamer.setValueDecoder(valueDecoder);

        kafkaStreamer.start();
    } finally {
        kafkaStreamer.stop();
    }

When the application starts, I get:

    2017-02-23 10:25:23.409  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property bootstrap.servers is not valid
    2017-02-23 10:25:23.410  INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property group.id is overridden to test
    2017-02-23 10:25:23.410  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.deserializer is not valid
    2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.serializer is not valid
    2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.deserializer is not valid
    2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.serializer is not valid
    2017-02-23 10:25:23.411  INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property zookeeper.connect is overridden to localhost:2181

and then:

    2017-02-23 10:25:24.057  WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,user.local,9092)] failed
    java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.0.1.jar:na]

Reading from the queue does not work either. Does anyone know how to fix this?
Edit: if I comment out the contents of the finally block, I get the following error instead:

    2017-02-27 16:42:27.780 ERROR 29946 --- [pool-3-thread-1] : Message is ignored due to an error [msg=MessageAndMetadata(test,0,Message(magic=1,attributes=0,CreateTime=-1,crc=2558126716,key=java.nio.HeapByteBuffer[pos=0 lim=1 cap=79],payload=java.nio.HeapByteBuffer[pos=0 lim=74 cap=74]),15941704,kafka.serializer.StringDecoder@74a96647,kafka.serializer.StringDecoder@42849d34,-1,CreateTime)]
    java.lang.IllegalStateException: Data streamer has been closed.
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:667) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180) ~[ignite-kafka-1.8.0.jar:1.8.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_]

Thanks!


gv8xihay  #1

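I think this happens because the KafkaStreamer is stopped right after it is started (the kafkaStreamer.stop() call in the finally block). kafkaStreamer.start() is not synchronous: it only spins up the threads that consume from Kafka and then returns.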
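To make the timing issue concrete, here is a minimal sketch that uses only the JDK. It is not Ignite or Kafka code: `Sink` is a hypothetical stand-in for the data streamer, and `run` is a hypothetical stand-in for "start a background consumer, then leave the try block". The latch simulates messages that arrive only after the start call has already returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StreamerLifecycleDemo {
    /** Hypothetical stand-in for a data streamer: rejects writes after close(). */
    static final class Sink implements AutoCloseable {
        private final List<String> received = new ArrayList<>();
        private boolean closed;

        synchronized void addData(String msg) {
            if (closed)
                throw new IllegalStateException("Data streamer has been closed.");
            received.add(msg);
        }

        @Override public synchronized void close() { closed = true; }

        synchronized int size() { return received.size(); }
    }

    /**
     * Submits a background consumer (an asynchronous "start"), then either
     * closes the sink right away or waits for the consumer to finish first.
     * Returns the number of messages that reached the sink.
     */
    public static int run(boolean waitForConsumer) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch messagesArrive = new CountDownLatch(1); // messages come later
        CountDownLatch consumerDone = new CountDownLatch(1);
        Sink sink = new Sink();

        // The "start" step: hand the work to another thread and return immediately.
        pool.submit(() -> {
            try {
                messagesArrive.await();
                for (int i = 0; i < 3; i++)
                    sink.addData("msg-" + i);
            } catch (IllegalStateException | InterruptedException e) {
                // Corresponds to the "Message is ignored due to an error" log entry.
            } finally {
                consumerDone.countDown();
            }
        });

        if (!waitForConsumer)
            sink.close(); // what leaving a try-with-resources block does

        messagesArrive.countDown(); // only now do messages show up
        try {
            consumerDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        sink.close();
        pool.shutdown();
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println("closed right after start: " + run(false));
        System.out.println("kept open until done:     " + run(true));
    }
}
```

Running `main` prints 0 for the first call and 3 for the second. Applied to the code in the question, the analogous change would be to keep both the IgniteDataStreamer and the KafkaStreamer alive for as long as the application should consume (for example, create the data streamer outside a try-with-resources block and call kafkaStreamer.stop() only on shutdown). That part is an assumption and is untested against Ignite 1.8.0.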
