Kafka: accessing a Kafka server on a VM from Windows via a public IP

Posted by vhmi4jdf on 2021-06-04 in Kafka

I have a small problem. I want to connect from a Java producer on Windows to a Kafka server running on a CentOS VM.
In config/server.properties I have these lines:

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://<public ip>:9092

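(e.g. advertised.listeners=PLAINTEXT://192.239.83.27:9092)
I still can't produce messages from the producer on Windows. In VirtualBox, I set the "Allow All" option in the network settings.
The Kafka server does not come up properly either; while running it keeps logging this warning: [Controller id=0, targetBrokerId=0] Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Please help :p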

And here is my Java code on Windows:

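import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TwitterProducer {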

Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

String consumerKey = "xxx";
String consumerSecret = "xxx";
String token = "xxx";
String secret = "xxx";

public TwitterProducer() {}

public static void main(String[] args) {
    new TwitterProducer().run();
}

public void run()
{
    logger.info("Setup");
    /**Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
    BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

    // create a twitter client
    Client client = createTwitterClient(msgQueue);
    // Attempts to establish a connection.
    client.connect();

    // create a kafka producer
    KafkaProducer<String, String> producer = createKafkaProducer();

    // on a different thread, or multiple different threads....
    while (!client.isDone()) {
        String msg = null;
        try {
            msg = msgQueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            client.stop();
        }
        if(msg != null){
            logger.info(msg);
            producer.send(new ProducerRecord<String, String>("twitter_tweets", null, msg), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null){
                        logger.error("Something bad happened", e);
                    }
                }
            });
        }
    }
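    // flush any buffered records and release the producer's resources before exiting
    producer.close();
    logger.info("End of application");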
}

public Client createTwitterClient(BlockingQueue<String> msgQueue)
{
    /**Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
    Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
    StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
    // Optional: set up some followings and track terms

    List<String> terms = Lists.newArrayList("bitcoin");
    hosebirdEndpoint.trackTerms(terms);

    // These secrets should be read from a config file
    Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

    ClientBuilder builder = new ClientBuilder()
            .name("Hosebird-Client-01")                              // optional: mainly for the logs
            .hosts(hosebirdHosts)
            .authentication(hosebirdAuth)
            .endpoint(hosebirdEndpoint)
            .processor(new StringDelimitedProcessor(msgQueue));

    Client hosebirdClient = builder.build();
    return hosebirdClient;
}

public KafkaProducer<String, String> createKafkaProducer()
{

    String bootstrapServers = "193.239.83.27:9092";

    // create the producer properties
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // create the producer
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    return producer;
}

}
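
When the broker advertises an address the client cannot reach, the producer's fairly long default timeouts can hide the failure for a while. One possible variation on createKafkaProducer() is to shorten those timeouts while debugging, so that the Callback reports the error sooner. The sketch below only builds the Properties object; the class name DebugProducerProps and the timeout values are assumptions chosen for illustration, while the string keys are the standard producer configuration names.

```java
import java.util.Properties;

public class DebugProducerProps {

    // Builds producer properties with short timeouts so connection problems surface quickly.
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Illustrative debugging values (assumptions), not production recommendations.
        props.setProperty("max.block.ms", "10000");        // limit how long send() may block waiting for metadata
        props.setProperty("request.timeout.ms", "5000");   // how long to wait for a broker response per request
        props.setProperty("delivery.timeout.ms", "15000"); // upper bound before a send is reported as failed
        return props;
    }

    public static void main(String[] args) {
        // Same bootstrap address as in the question's code.
        Properties props = build("193.239.83.27:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Passing the result to new KafkaProducer<String, String>(props) should behave like the original factory method, only with faster failures.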

dpiehjr4 #1

You say you have listeners=PLAINTEXT://0.0.0.0:9092, but judging from the screenshot that line is commented out.
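
To confirm whether the listener settings are really active, one option is to parse the file with Java's Properties class, which treats lines starting with # as comments and drops them. This is only a sketch: the class name ListenerConfigCheck and the default path config/server.properties are assumptions, and the broker's own startup parsing may differ in details.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class ListenerConfigCheck {

    // Parses properties text; lines beginning with '#' are comments and are ignored.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns a one-line report for a key: its value, or a note that it is not set.
    public static String describe(Properties props, String key) {
        String value = props.getProperty(key);
        return value == null
                ? key + " is NOT set (missing or commented out)"
                : key + " = " + value;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location, relative to the Kafka installation directory.
        String path = args.length > 0 ? args[0] : "config/server.properties";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        Properties props = parse(text);
        System.out.println(describe(props, "listeners"));
        System.out.println(describe(props, "advertised.listeners"));
    }
}
```

If listeners is reported as not set, removing the leading # from that line and restarting the broker would be the first thing to try.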
The logs also say that Kafka is shutting down, so make sure broker.id is set to a valid non-negative integer and that ZooKeeper is running.
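
A quick way to tell whether ZooKeeper and the broker are up at all is a plain TCP connection test against their ports. The sketch below assumes ZooKeeper on its default port 2181 and the broker on 9092; PortCheck and the localhost default are illustrative names, not part of Kafka. A successful connection only shows that something is listening on the port, not that the advertised listener is correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the VM's address as the first argument; defaults to localhost (assumption).
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("ZooKeeper 2181 reachable: " + isReachable(host, 2181, 3000));
        System.out.println("Kafka 9092 reachable: " + isReachable(host, 9092, 3000));
    }
}
```

Running it once on the VM itself and once from the Windows machine with the public IP helps separate "the broker is down" from "the port is blocked on the way in".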
By the way, there is a Kafka Connect source connector for Twitter that seems to do what you want.
