我一直在研究Kafka的推特流媒体数据。
下面是我的示例链接:http://www.hahaskills.com/tutorials/kafka/twitter_doc.html
我能够使用生产者代码,它是工作良好。能够得到推特饲料和发送给Kafka制片人。
我不能使用消费代码,因为它已经抛出了许多API的弃用错误。
以下是消费者代码:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
//import org.apache.kafka.clients.producer.KafkaProducer;
public class KafkaConsumer {
private final ConsumerConnector consumer;
private final String topic;
public KafkaConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
this.topic = topic;
}
public void testConsumer() {
System.out.println("Test Con called");
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
System.out.println("For");
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("Size"+it.length());
while (it.hasNext()) {
System.out.println("Stream");
System.out.println("Message from Single Topic: " + new String(it.next().message()));
}
}
if (consumer != null) {
consumer.shutdown();
}
}
public static void main(String[] args) {
System.out.println("Started");
String topic="twittertopic";
KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
simpleTWConsumer.testConsumer();
System.out.println("End");
}
}
它抛出错误:consumerconnector、consumeriterator、kafkastream已弃用。
consumerconfig不可见。
这个示例代码有固定版本吗(kafka consumer for twitter)?
1条答案
按热度按时间koaltpgm1#
您所遵循的教程非常旧,它使用的是已弃用的旧scala-kafka客户机,请参阅http://kafka.apache.org/documentation/#legacyapis
已弃用的类包括:
kafka.consumer.*
以及kafka.javaapi.consumer
而是在下面使用更新的java消费者org.apache.kafka.clients.consumer.*
kafka.producer.*
以及kafka.javaapi.producer
而是在下面使用更新的java producerorg.apache.kafka.clients.producer.*
除了使用不推荐使用的类之外,您的代码基本上是正确的,我只需要修复一些导入。请参阅下面的固定版本。使用它,我可以将我正在生成的消息消费到一个名为twittertopic
.虽然可以使用上面的代码,但下一个主要的kafka版本可能会删除当前不推荐使用的类,因此不应该使用这些类编写新的逻辑。
相反,您应该开始使用java客户机,您可以使用github上提供的示例:https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples
使用新的java使用者,您的逻辑如下所示: