kafka使用javaapi创建分区

rqqzpn5f  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(345)

我用kafka编写了一个获取twitter tweets的代码,它可以正常工作,但对于分区不起作用。我想为一个主题创建3个分区。。如何将值传递给partitioner类。。有没有关于我做错了什么的建议

public class kafkaSpoutFetchingRealTweets {

private String consumerKey;
private String consumerSecret;
private String accessToken;
private String accessTokenSecret;
private TwitterStream twitterStream;

/**
 * @param contxt
 */
void start(final Context context) {

    /**Producer properties**/
    Properties props = new Properties();
    props.put("metadata.broker.list",
            context.getString(Constant.BROKER_LIST));
    props.put("partitioner.class","SimplePartitioner");
    props.put("serializer.class", context.getString(Constant.SERIALIZER));
    props.put("request.required.acks",
            context.getString(Constant.REQUIRED_ACKS));
    props.put("producer.type", "async");
    // props.put("partitioner.class", context.getClass());
    ProducerConfig config = new ProducerConfig(props);

    final Producer<String, String> producer = new Producer<String, String>(
            config);

    /**Twitter properties**/
    consumerKey = context.getString(Constant.CONSUMER_KEY_KEY);
    consumerSecret = context.getString(Constant.CONSUMER_SECRET_KEY);
    accessToken = context.getString(Constant.ACCESS_TOKEN_KEY);
    accessTokenSecret = context.getString(Constant.ACCESS_TOKEN_SECRET_KEY);

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setOAuthConsumerKey(consumerKey);
    cb.setOAuthConsumerSecret(consumerSecret);
    cb.setOAuthAccessToken(accessToken);
    cb.setOAuthAccessTokenSecret(accessTokenSecret);
    cb.setJSONStoreEnabled(true);
    cb.setIncludeEntitiesEnabled(true);

    twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

    /**Twitter listener**/
    StatusListener listener = new StatusListener() {
        // The onStatus method is executed every time a new tweet comes
        // in.
        public void onStatus(Status status) {

            if(("en".equals(status.getLang())) && ("en".equals(status.getUser().getLang()))){

                KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                        context.getString(Constant.data),
                        DataObjectFactory.getRawJSON(status));
                producer.send(data);
                System.out.println(DataObjectFactory.getRawJSON(status));

            }
        }
        }

        public void onDeletionNotice(
                StatusDeletionNotice statusDeletionNotice) {
        }

        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        }

        public void onScrubGeo(long userId, long upToStatusId) {
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
            logger.info("Shutting down Twitter sample stream...");
            twitterStream.shutdown();
        }

        public void onStallWarning(StallWarning warning) {
            System.out.println("stallWarning");
        }
    };

    String[] lang = { "en" };
    fq.language(lang);
    twitterStream.addListener(listener);
    twitterStream.sample();

}

public static void main(String[] args) {
    try {

        Context context = new Context(args[0]);
        kafkaSpoutFetchingRealTweets tp = new kafkaSpoutFetchingRealTweets();
        tp.start(context);

    } catch (Exception e) {
        e.printStackTrace();
        logger.info(e.getMessage());
    }

}

}

h22fl7wq

h22fl7wq1#

所以有几个问题。
你的问题和代码不匹配。你的问题是关于创建一个有3个分区的主题。但是您提供的代码和示例说明了如何确定消息应该发送到哪个分区,因为您已经创建了一个包含3个分区的主题。
如果你真的想创建一个有3个分区的主题,你需要使用命令行客户机。可以在这里找到样本,http://kafka.apache.org/documentation.html#quickstart
如果你真的想确定你需要哪个分区来发送数据。你需要提供更多关于你遇到的实际问题的信息吗?他们都去同一个分区吗?然后您需要看看如何计算 SimplePartitioner 在配置中指定的类。房间里有什么 SimplePartitioner 上课?

props.put("partitioner.class","SimplePartitioner");

相关问题