如何将kafka(java)应用程序从windows连接到linux中的合流

bwleehnv  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(359)

我正在linux服务器上使用winscp和putty运行Confluent5.0。我在windows中有kafka(java/eclipse)应用程序。
当我运行java应用程序时,它并没有识别在linux上运行的合流中的kafka代理。
通过在mac终端上运行confluent5.0,我已经测试了我的java应用程序,它将数据发送到macbook中的kafka主题。现在我正在尝试在windows中实现相同的kafka应用程序。由于windows不支持confluent,所以我在linux服务器上运行。
我使用confluent而不是ApacheKafka,因为我在应用程序中使用的是模式注册表。
通过使用netstat-tupln&curl-vhttp:/localhost:port no. 找出了kafka在8082上运行,而schema registry在8081上运行的端口细节。下面是我在java应用程序中的kafka属性。

public static Properties producerProperties() {

    // normal producer
    properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
    properties.setProperty("acks", "all");
    properties.setProperty("retries", "10");
    // avro part
    properties.setProperty("key.serializer", StringSerializer .class.getName());
    properties.setProperty("value.serializer", KafkaAvroSerializer .class.getName());
    properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

    return properties;

}

public static Properties consumerProperties() {

   // Properties properties = new Properties();
    // normal consumer
    properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
    //different for consumer
    properties.setProperty("group.id", "Avro-consumer");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.offset.reset", "earliest");

    // avro part
    properties.setProperty("key.deserializer", StringDeserializer.class.getName());
    properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
    properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
    properties.setProperty("specific.avro.reader", "true");

    return properties;
}

public static Properties streamsProperties() {

    // normal consumer
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "com.github.ptn006");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8082");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    return properties;
}

预期:数据写入Kafka主题。
实际:无法建立到节点-1的warn连接。代理可能不可用(org.apache.kafka.clients.netw网站orkclient:589)

anauzrmj

anauzrmj1#

你要确保 advertised.listenersserver.properties Kafka中的文件可由windows计算机解析。还要确保防火墙允许访问( netstat -tupln | grep LIST ),并寻找你的Kafka端口监听 0.0.0.0 ,例如。

相关问题