我有个小问题。我想从windows java producer脚本连接到vm centos上的kafka服务器。
在config/server.properties中,我有一行代码:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<public ip>:9092
(例如:advised.listeners)=plaintext://192.239.83.27:9092)
我仍然无法在windows上生成来自producer的消息。在虚拟盒上,我在网络设置中设置了“allow all”选项
无法运行kafka服务器,持续运行时出现警告:[controller id=0,targetbrokerid=0]无法建立到节点0的连接。代理可能不可用(org.apache.kafka.clients.networkclient)。
请帮忙:p
以及我在windows上的java代码:
public class TwitterProducer {
Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());
String consumerKey = "xxx";
String consumerSecret = "xxx";
String token = "xxx";
String secret = "xxx";
public TwitterProducer() {}
public static void main(String[] args) {
new TwitterProducer().run();
}
public void run()
{
logger.info("Setup");
/**Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
// create a twitter client
Client client = createTwitterClient(msgQueue);
// Attempts to establish a connection.
client.connect();
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();
// on a different thread, or multiple different threads....
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if(msg != null){
logger.info(msg);
producer.send(new ProducerRecord<String, String>("twitter_tweets", null, msg), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null){
logger.error("Something bad happened", e);
}
}
});
}
}
logger.info("End of application");
}
public Client createTwitterClient(BlockingQueue<String> msgQueue)
{
/**Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
List<String> terms = Lists.newArrayList("bitcoin");
hosebirdEndpoint.trackTerms(terms);
// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue))
; // optional: use this if you want to process client events
Client hosebirdClient = builder.build();
return hosebirdClient;
}
public KafkaProducer<String, String> createKafkaProducer()
{
String bootstrapServers = "193.239.83.27:9092";
// create the producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
}
1条答案
按热度按时间dpiehjr41#
你说你有
listeners=PLAINTEXT://0.0.0.0:9092
但从图片上看,这已经被评论掉了。日志还说,Kafka正在关闭,所以要确保brokerid是正数,并确保zookeeper正在运行
顺便说一句,有一个Kafka连接推特的来源,似乎做你想要的