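I wrote some code using Kafka to fetch Twitter tweets. It works fine, but partitioning does not work. I want to create 3 partitions for one topic. How do I pass values to the partitioner class? Any suggestions about what I am doing wrong?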
public class kafkaSpoutFetchingRealTweets {
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private TwitterStream twitterStream;

    /**
     * @param context
     */
    void start(final Context context) {
        /** Producer properties **/
        Properties props = new Properties();
        props.put("metadata.broker.list",
                context.getString(Constant.BROKER_LIST));
        props.put("partitioner.class", "SimplePartitioner");
        props.put("serializer.class", context.getString(Constant.SERIALIZER));
        props.put("request.required.acks",
                context.getString(Constant.REQUIRED_ACKS));
        props.put("producer.type", "async");
        // props.put("partitioner.class", context.getClass());
        ProducerConfig config = new ProducerConfig(props);
        final Producer<String, String> producer = new Producer<String, String>(
                config);

        /** Twitter properties **/
        consumerKey = context.getString(Constant.CONSUMER_KEY_KEY);
        consumerSecret = context.getString(Constant.CONSUMER_SECRET_KEY);
        accessToken = context.getString(Constant.ACCESS_TOKEN_KEY);
        accessTokenSecret = context.getString(Constant.ACCESS_TOKEN_SECRET_KEY);
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setOAuthConsumerKey(consumerKey);
        cb.setOAuthConsumerSecret(consumerSecret);
        cb.setOAuthAccessToken(accessToken);
        cb.setOAuthAccessTokenSecret(accessTokenSecret);
        cb.setJSONStoreEnabled(true);
        cb.setIncludeEntitiesEnabled(true);
        twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

        /** Twitter listener **/
        StatusListener listener = new StatusListener() {
            // The onStatus method is executed every time a new tweet comes
            // in.
            public void onStatus(Status status) {
                if (("en".equals(status.getLang())) && ("en".equals(status.getUser().getLang()))) {
                    // The message is created with a topic and a payload only (no key).
                    KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                            context.getString(Constant.data),
                            DataObjectFactory.getRawJSON(status));
                    producer.send(data);
                    System.out.println(DataObjectFactory.getRawJSON(status));
                }
            }

            public void onDeletionNotice(
                    StatusDeletionNotice statusDeletionNotice) {
            }

            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            }

            public void onScrubGeo(long userId, long upToStatusId) {
            }

            public void onException(Exception ex) {
                ex.printStackTrace();
                logger.info("Shutting down Twitter sample stream...");
                twitterStream.shutdown();
            }

            public void onStallWarning(StallWarning warning) {
                System.out.println("stallWarning");
            }
        };

        String[] lang = { "en" };
        FilterQuery fq = new FilterQuery();
        fq.language(lang);
        twitterStream.addListener(listener);
        twitterStream.sample();
    }

    public static void main(String[] args) {
        try {
            Context context = new Context(args[0]);
            kafkaSpoutFetchingRealTweets tp = new kafkaSpoutFetchingRealTweets();
            tp.start(context);
        } catch (Exception e) {
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    }
}
1 Answer
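So there are a few issues.
Your question and your code do not match. The question is about creating a topic with 3 partitions, but the code and example you provided are about deciding which partition a message should be sent to, assuming you have already created a topic with 3 partitions.
If you really want to create a topic with 3 partitions, you need to use the command-line client. Samples can be found here: http://kafka.apache.org/documentation.html#quickstart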
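If what you actually want is to control which partition the data is sent to, you need to provide more information about the problem you are seeing. Are all messages going to the same partition? If so, look at how the partition is computed in the SimplePartitioner class specified in your config. What is in the SimplePartitioner class?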
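To make the question about SimplePartitioner concrete, here is a minimal, standalone sketch of the kind of logic such a class might contain. It is not the asker's class (which was not posted); the name SimplePartitionerSketch and the sample user ids are made up for illustration. In the old 0.8.x producer API a real partitioner would, as far as I know, implement kafka.producer.Partitioner, expose a constructor taking VerifiableProperties, and be referenced in partitioner.class by its fully qualified name. The sketch leaves the Kafka types out so that it runs on its own.

```java
/**
 * Standalone sketch of hash-based partition selection.
 * Hypothetical class: it mirrors the shape of a partition(key, numPartitions)
 * method but does not depend on any Kafka classes.
 */
class SimplePartitionerSketch {

    /** Maps a message key onto one of numPartitions partitions (0-based). */
    static int partition(Object key, int numPartitions) {
        if (key == null) {
            // Assumption for this sketch only: messages without a key go to partition 0.
            return 0;
        }
        // Clear the sign bit rather than calling Math.abs, which stays
        // negative for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // the topic in the question has 3 partitions
        // Hypothetical tweet author ids used as message keys.
        String[] userIds = { "1001", "1002", "1003", "1004", "1005", "1006" };
        for (String id : userIds) {
            System.out.println(id + " -> partition " + partition(id, numPartitions));
        }
    }
}
```

If every message lands in the same partition, the key is also worth checking. The posted code builds KeyedMessage from a topic and a message only, so the key is null and a hash-based partitioner has nothing to spread. Passing a key, for example new KeyedMessage<String, String>(topic, key, message) with something like the tweet author's id as the key, would give the partitioner a value to distribute across the 3 partitions.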