本文整理了Java中org.apache.kafka.clients.consumer.Consumer.assignment()
方法的一些代码示例,展示了Consumer.assignment()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.assignment()
方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:assignment
暂无
代码示例来源:origin: openzipkin/brave
@Override public Set<TopicPartition> assignment() {
return delegate.assignment();
}
代码示例来源:origin: apache/storm
private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
final Set<TopicPartition> assignments = consumer.assignment();
if (!assignments.contains(currBatchTp)) {
throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
+ " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
+ " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
}
}
代码示例来源:origin: apache/storm
private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(consumer.assignment());
LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
pausedTopicPartitions.remove(excludedTp);
consumer.pause(pausedTopicPartitions);
LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
return pausedTopicPartitions;
}
代码示例来源:origin: linkedin/cruise-control
/**
* The check if the consumption is done or not. The consumption is done if the consumer has caught up with the
* log end or all the partitions are paused.
* @param endOffsets the log end for each partition.
* @return true if the consumption is done, false otherwise.
*/
private boolean consumptionDone(Map<TopicPartition, Long> endOffsets) {
Set<TopicPartition> partitionsNotPaused = new HashSet<>(_metricConsumer.assignment());
partitionsNotPaused.removeAll(_metricConsumer.paused());
for (TopicPartition tp : partitionsNotPaused) {
if (_metricConsumer.position(tp) < endOffsets.get(tp)) {
return false;
}
}
return true;
}
代码示例来源:origin: apache/storm
/**
* Assign partitions to the KafkaConsumer.
* @param <K> The consumer key type
* @param <V> The consumer value type
* @param consumer The Kafka consumer to assign partitions to
* @param newAssignment The partitions to assign.
* @param listener The rebalance listener to call back on when the assignment changes
*/
public <K, V> void assignPartitions(Consumer<K, V> consumer, Set<TopicPartition> newAssignment,
ConsumerRebalanceListener listener) {
Set<TopicPartition> currentAssignment = consumer.assignment();
if (!newAssignment.equals(currentAssignment)) {
listener.onPartitionsRevoked(currentAssignment);
consumer.assign(newAssignment);
listener.onPartitionsAssigned(newAssignment);
}
}
代码示例来源:origin: apache/storm
Set<TopicPartition> assignment = consumer.assignment();
if (!isAtLeastOnceProcessing()) {
return new PollablePartitionsInfo(assignment, Collections.emptyMap());
代码示例来源:origin: apache/storm
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
try {
consumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
return consumerRecords;
} finally {
consumer.resume(pausedPartitions);
}
}
代码示例来源:origin: apache/nifi
/**
* Execute poll using pause API just for sending heartbeat, not polling messages.
*/
void retainConnection() {
pollingLock.lock();
TopicPartition[] assignments = null;
try {
final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
if (assignmentSet.isEmpty()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Pausing " + assignmentSet);
}
assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
kafkaConsumer.pause(assignments);
kafkaConsumer.poll(0);
if (logger.isDebugEnabled()) {
logger.debug("Resuming " + assignments);
}
} finally {
try {
if (assignments != null) {
kafkaConsumer.resume(assignments);
}
} finally {
pollingLock.unlock();
}
}
}
代码示例来源:origin: linkedin/cruise-control
while (_metricConsumer.assignment().isEmpty()) {
pollerCount++;
_metricConsumer.poll(10);
for (TopicPartition tp : _metricConsumer.assignment()) {
timestampToSeek.put(tp, startTimeMs);
Set<TopicPartition> assignment = new HashSet<>(_metricConsumer.assignment());
Map<TopicPartition, Long> endOffsets = _metricConsumer.endOffsets(assignment);
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = _metricConsumer.offsetsForTimes(timestampToSeek);
LOG.debug("Starting consuming from metrics reporter topic partitions {}.", _metricConsumer.assignment());
_metricConsumer.assignment(), startTimeMs, endTimeMs, totalMetricsAdded);
代码示例来源:origin: apache/storm
@Override
public void nextTuple() {
try {
if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
refreshAssignment();
}
if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples();
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitAsync(offsetsToCommit, null);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
}
PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
if (pollablePartitionsInfo.shouldPoll()) {
try {
setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
} catch (RetriableException e) {
LOG.error("Failed to poll from kafka.", e);
}
}
emitIfWaitingNotEmitted();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
代码示例来源:origin: apache/metron
@Override
public String getSampleMessage(final String topic) {
String message = null;
if (listTopics().contains(topic)) {
try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
.map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
kafkaConsumer.assignment().stream()
.filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
.forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
message = records.isEmpty() ? null : records.iterator().next().value();
kafkaConsumer.unsubscribe();
}
}
return message;
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
@Override
public Set<TopicPartition> assignment() {
return consumer.assignment();
}
代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients
@Override public Set<TopicPartition> assignment() {
return delegate.assignment();
}
代码示例来源:origin: rayokota/kafka-graphs
@Override
public Set<TopicPartition> assignment() {
return kafkaConsumer.assignment();
}
代码示例来源:origin: spring-projects/spring-kafka
return null;
}).given(consumer).commitSync(any(Map.class));
given(consumer.assignment()).willReturn(records.keySet());
final CountDownLatch pauseLatch = new CountDownLatch(2);
willAnswer(i -> {
代码示例来源:origin: spring-projects/spring-kafka
return null;
}).given(consumer).commitSync(any(Map.class));
given(consumer.assignment()).willReturn(records1.keySet());
TopicPartitionInitialOffset[] topicPartitionOffset = new TopicPartitionInitialOffset[] {
new TopicPartitionInitialOffset("foo", 0) };
代码示例来源:origin: com.cerner.common.kafka/common-kafka
private Set<TopicPartition> getAssignedPartitions() {
Set<TopicPartition> assignedPartitions = consumer.assignment();
if (assignedPartitions.isEmpty()) {
// Polling with an immediate timeout will initialize the assignments for a fresh consumer.
pollRecords(0L);
assignedPartitions = consumer.assignment();
}
return assignedPartitions;
}
代码示例来源:origin: rayokota/kafka-graphs
private static Set<TopicPartition> localPartitions(Consumer<byte[], byte[]> consumer, String topic) {
Set<TopicPartition> result = new HashSet<>();
Set<TopicPartition> assignment = consumer.assignment();
for (TopicPartition tp : assignment) {
if (tp.topic().equals(topic)) {
result.add(tp);
}
}
return result;
}
代码示例来源:origin: vert-x3/vertx-kafka-client
@Override
public KafkaReadStream<K, V> assignment(Handler<AsyncResult<Set<TopicPartition>>> handler) {
this.submitTask((consumer, future) -> {
Set<TopicPartition> partitions = consumer.assignment();
if (future != null) {
future.complete(partitions);
}
}, handler);
return this;
}
代码示例来源:origin: linkedin/li-apache-kafka-clients
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
Set<String> newSubscription = new HashSet<>(topics);
// TODO: This is a hot fix for KAFKA-3664 and should be removed after the issue is fixed.
commitSync();
for (TopicPartition tp : _kafkaConsumer.assignment()) {
if (!newSubscription.contains(tp.topic())) {
_consumerRecordsProcessor.clear(tp);
}
}
_consumerRebalanceListener.setUserListener(callback);
_kafkaConsumer.subscribe(new ArrayList<>(topics), _consumerRebalanceListener);
}
内容来源于网络,如有侵权,请联系作者删除!