本文整理了Java中org.apache.kafka.clients.consumer.Consumer.pause()
方法的一些代码示例,展示了Consumer.pause()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.pause()
方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:pause
暂无
代码示例来源:origin: openzipkin/brave
@Override public void pause(Collection<TopicPartition> partitions) {
delegate.pause(partitions);
}
代码示例来源:origin: linkedin/cruise-control
LOG.debug("Saw metric {} whose timestamp is larger than start time {}. Pausing partition {} at offset",
record.value(), record.value().time(), tp, record.offset());
_metricConsumer.pause(Collections.singleton(tp));
} else {
LOG.debug("Discarding metric {} because the timestamp {} is smaller than the start time {}",
代码示例来源:origin: apache/storm
private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(consumer.assignment());
LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
pausedTopicPartitions.remove(excludedTp);
consumer.pause(pausedTopicPartitions);
LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
return pausedTopicPartitions;
}
代码示例来源:origin: spring-projects/spring-kafka
pauseLatch.countDown();
return null;
}).given(consumer).pause(records.keySet());
given(consumer.paused()).willReturn(records.keySet());
final CountDownLatch resumeLatch = new CountDownLatch(2);
代码示例来源:origin: apache/nifi
/**
* Execute poll using pause API just for sending heartbeat, not polling messages.
*/
void retainConnection() {
pollingLock.lock();
TopicPartition[] assignments = null;
try {
final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
if (assignmentSet.isEmpty()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Pausing " + assignmentSet);
}
assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
kafkaConsumer.pause(assignments);
kafkaConsumer.poll(0);
if (logger.isDebugEnabled()) {
logger.debug("Resuming " + assignments);
}
} finally {
try {
if (assignments != null) {
kafkaConsumer.resume(assignments);
}
} finally {
pollingLock.unlock();
}
}
}
代码示例来源:origin: reactor/reactor-kafka
@Test
public void consumerMethods() throws Exception {
testConsumerMethod(c -> assertEquals(this.assignedPartitions, c.assignment()));
testConsumerMethod(c -> assertEquals(Collections.singleton(topic), c.subscription()));
testConsumerMethod(c -> assertEquals(2, c.partitionsFor(topics.get(2)).size()));
testConsumerMethod(c -> assertEquals(topics.size(), c.listTopics().size()));
testConsumerMethod(c -> assertEquals(0, c.metrics().size()));
testConsumerMethod(c -> {
Collection<TopicPartition> partitions = Collections.singleton(new TopicPartition(topic, 1));
c.pause(partitions);
assertEquals(partitions, c.paused());
c.resume(partitions);
});
testConsumerMethod(c -> {
TopicPartition partition = new TopicPartition(topic, 1);
Collection<TopicPartition> partitions = Collections.singleton(partition);
long position = c.position(partition);
c.seekToBeginning(partitions);
assertEquals(0, c.position(partition));
c.seekToEnd(partitions);
assertTrue("Did not seek to end", c.position(partition) > 0);
c.seek(partition, position);
});
}
代码示例来源:origin: apache/storm
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
try {
consumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
return consumerRecords;
} finally {
consumer.resume(pausedPartitions);
}
}
代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client
@Override
public void pause(Collection<TopicPartition> partitions) {
consumer.pause(partitions);
}
代码示例来源:origin: linkedin/li-apache-kafka-clients
@Override
public void pause(Collection<TopicPartition> partitions) {
_kafkaConsumer.pause(partitions);
}
代码示例来源:origin: rayokota/kafka-graphs
@Override
public void pause(Collection<TopicPartition> partitions) {
kafkaConsumer.pause(partitions);
}
代码示例来源:origin: opentracing-contrib/java-kafka-client
@Override
public void pause(Collection<TopicPartition> partitions) {
consumer.pause(partitions);
}
代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients
@Override public void pause(Collection<TopicPartition> partitions) {
delegate.pause(partitions);
}
代码示例来源:origin: vert-x3/vertx-kafka-client
@Override
public KafkaReadStream<K, V> pause(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
this.submitTask((consumer, future) -> {
consumer.pause(topicPartitions);
if (future != null) {
future.complete();
}
}, completionHandler);
return this;
}
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
* Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
* and not added to the queue for processing
*
* @param partition the partition
* @param records the records
*/
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
final int newQueueSize = partitionGroup.addRawRecords(partition, records);
if (log.isTraceEnabled()) {
log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
}
// if after adding these records, its partition queue's buffered size has been
// increased beyond the threshold, we can then pause the consumption for this partition
if (newQueueSize > maxBufferedSize) {
consumer.pause(singleton(partition));
}
}
代码示例来源:origin: apache/crunch
/**
* Checks whether the value for {@code topicPartition} with an {@code offset} is within scan range. If
* the value is not then {@code false} is returned otherwise {@code true}.
*
* @param topicPartion The partition for the offset
* @param offset the offset in the partition
* @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
*/
private boolean withinRange(TopicPartition topicPartion, long offset) {
long endOffset = offsets.get(topicPartion).second();
//end offsets are one higher than the last written value.
boolean emit = offset < endOffset;
if (offset >= endOffset - 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
new Object[]{topicPartion, offset, endOffset});
}
remainingPartitions.remove(topicPartion);
consumer.pause(Collections.singleton(topicPartion));
}
LOG.debug("Value for partition {} and offset {} is within range.", topicPartion, offset);
return emit;
}
}
代码示例来源:origin: apache/crunch
/**
* Checks whether the value for {@code topicPartition} with an {@code offset} is within scan range. If
* the value is not then {@code false} is returned otherwise {@code true}.
*
* @param topicPartion The partition for the offset
* @param offset the offset in the partition
* @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
*/
private boolean withinRange(TopicPartition topicPartion, long offset) {
long endOffset = offsets.get(topicPartion).second();
//end offsets are one higher than the last written value.
boolean emit = offset < endOffset;
if (offset >= endOffset - 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
new Object[] { topicPartion, offset, endOffset });
}
remainingPartitions.remove(topicPartion);
consumer.pause(Arrays.asList(topicPartion));
}
LOG.debug("Value for partition {} and offset {} is within range.", topicPartion, offset);
return emit;
}
}
代码示例来源:origin: apache/samza
kafkaConsumer.pause(topicPartitionsToPause);
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeoutMs);
代码示例来源:origin: org.apache.samza/samza-kafka
kafkaConsumer.pause(topicPartitionsToPause);
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeoutMs);
代码示例来源:origin: org.apache.kafka/kafka-streams
void createTasks(final Collection<TopicPartition> assignment) {
if (consumer == null) {
throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
}
changelogReader.reset();
// do this first as we may have suspended standby tasks that
// will become active or vice versa
standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
addStreamTasks(assignment);
addStandbyTasks();
// Pause all the partitions until the underlying state store is ready for all the active tasks.
log.trace("Pausing partitions: {}", assignment);
consumer.pause(assignment);
}
代码示例来源:origin: org.apache.nifi/nifi-kafka-0-9-processors
/**
* Execute poll using pause API just for sending heartbeat, not polling messages.
*/
void retainConnection() {
pollingLock.lock();
TopicPartition[] assignments = null;
try {
final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
if (assignmentSet.isEmpty()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Pausing " + assignmentSet);
}
assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
kafkaConsumer.pause(assignments);
kafkaConsumer.poll(0);
if (logger.isDebugEnabled()) {
logger.debug("Resuming " + assignments);
}
} finally {
try {
if (assignments != null) {
kafkaConsumer.resume(assignments);
}
} finally {
pollingLock.unlock();
}
}
}
内容来源于网络,如有侵权,请联系作者删除!