org.apache.kafka.clients.consumer.Consumer.pause()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(250)

本文整理了Java中org.apache.kafka.clients.consumer.Consumer.pause()方法的一些代码示例,展示了Consumer.pause()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Consumer.pause()方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:pause

Consumer.pause介绍

暂无

代码示例

代码示例来源:origin: openzipkin/brave

// Forwards the pause request, with the given partitions unchanged, to the `delegate` consumer.
@Override public void pause(Collection<TopicPartition> partitions) {
 delegate.pause(partitions);
}

代码示例来源:origin: linkedin/cruise-control

LOG.debug("Saw metric {} whose timestamp is larger than start time {}. Pausing partition {} at offset",
      record.value(), record.value().time(), tp, record.offset());
 _metricConsumer.pause(Collections.singleton(tp));
} else {
 LOG.debug("Discarding metric {} because the timestamp {} is smaller than the start time {}",

代码示例来源:origin: apache/storm

/**
 * Pauses every topic-partition currently assigned to the consumer, except {@code excludedTp}.
 *
 * @param excludedTp the topic-partition that must not be paused
 * @return the topic-partitions that were handed to {@code consumer.pause}
 */
private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
  // Work on a private copy of the assignment so it can be modified freely.
  final Set<TopicPartition> toPause = new HashSet<>(consumer.assignment());
  LOG.debug("Currently assigned topic-partitions {}", toPause);

  toPause.remove(excludedTp);
  consumer.pause(toPause);
  LOG.debug("Paused topic-partitions {}", toPause);

  return toPause;
}

代码示例来源:origin: spring-projects/spring-kafka

pauseLatch.countDown();
  return null;
}).given(consumer).pause(records.keySet());
given(consumer.paused()).willReturn(records.keySet());
final CountDownLatch resumeLatch = new CountDownLatch(2);

代码示例来源:origin: apache/nifi

/**
 * Execute poll using pause API just for sending heartbeat, not polling messages.
 *
 * <p>While holding {@code pollingLock}, all currently assigned partitions are paused,
 * {@code poll(0)} is called, and the same partitions are resumed afterwards. If nothing
 * is assigned, the method returns without pausing or polling.
 */
void retainConnection() {
  pollingLock.lock();
  // Stays null until partitions have actually been captured, so the finally block
  // only resumes when there is something to resume.
  TopicPartition[] assignments = null;
  try {
    final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
    if (assignmentSet.isEmpty()) {
      return;
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Pausing " + assignmentSet);
    }
    assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
    kafkaConsumer.pause(assignments);
    kafkaConsumer.poll(0);
    if (logger.isDebugEnabled()) {
      // Concatenating the array directly would log its identity ("[L...;@hash")
      // rather than the partitions, so render the contents explicitly.
      logger.debug("Resuming " + java.util.Arrays.toString(assignments));
    }
  } finally {
    try {
      if (assignments != null) {
        kafkaConsumer.resume(assignments);
      }
    } finally {
      // Release the lock even if resume() throws.
      pollingLock.unlock();
    }
  }
}

代码示例来源:origin: reactor/reactor-kafka

/**
 * Runs a series of consumer API calls through {@code testConsumerMethod}: assignment,
 * subscription, partition/topic listing, metrics, pause/resume, and seek/position.
 */
@Test
public void consumerMethods() throws Exception {
  // Read-only queries.
  testConsumerMethod(kafkaConsumer -> assertEquals(this.assignedPartitions, kafkaConsumer.assignment()));
  testConsumerMethod(kafkaConsumer -> assertEquals(Collections.singleton(topic), kafkaConsumer.subscription()));
  testConsumerMethod(kafkaConsumer -> assertEquals(2, kafkaConsumer.partitionsFor(topics.get(2)).size()));
  testConsumerMethod(kafkaConsumer -> assertEquals(topics.size(), kafkaConsumer.listTopics().size()));
  testConsumerMethod(kafkaConsumer -> assertEquals(0, kafkaConsumer.metrics().size()));

  // Pause a single partition, check it is reported as paused, then resume it.
  testConsumerMethod(kafkaConsumer -> {
    Collection<TopicPartition> toPause = Collections.singleton(new TopicPartition(topic, 1));
    kafkaConsumer.pause(toPause);
    assertEquals(toPause, kafkaConsumer.paused());
    kafkaConsumer.resume(toPause);
  });

  // Seek to both ends of a partition, then restore the position recorded up front.
  testConsumerMethod(kafkaConsumer -> {
    TopicPartition tp = new TopicPartition(topic, 1);
    Collection<TopicPartition> tpAsCollection = Collections.singleton(tp);
    long originalPosition = kafkaConsumer.position(tp);
    kafkaConsumer.seekToBeginning(tpAsCollection);
    assertEquals(0, kafkaConsumer.position(tp));
    kafkaConsumer.seekToEnd(tpAsCollection);
    assertTrue("Did not seek to end", kafkaConsumer.position(tp) > 0);
    kafkaConsumer.seek(tp, originalPosition);
  });
}

代码示例来源:origin: apache/storm

/**
 * Polls the consumer while every assigned partition that is not in
 * {@code pollablePartitionsInfo.pollablePartitions} is paused; those partitions are
 * resumed again before the method exits, whether it returns or throws.
 *
 * <p>When the configured processing guarantee is {@code AT_MOST_ONCE}, offsets for the
 * current assignment are committed synchronously right after the poll.
 *
 * @param pollablePartitionsInfo the pollable partitions and their earliest retriable offsets
 * @return the records returned by the poll
 */
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
  doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);

  // Assigned partitions minus the pollable ones: these are paused for this poll.
  final Set<TopicPartition> nonPollable = new HashSet<>(consumer.assignment());
  nonPollable.removeIf(pollablePartitionsInfo.pollablePartitions::contains);

  try {
    consumer.pause(nonPollable);
    final ConsumerRecords<K, V> polled = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, polled);
    LOG.debug("Polled [{}] records from Kafka", polled.count());

    final boolean atMostOnce =
      kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE;
    if (atMostOnce) {
      // Commit polled records immediately to ensure delivery is at-most-once.
      final Map<TopicPartition, OffsetAndMetadata> committed =
        createFetchedOffsetsMetadata(consumer.assignment());
      consumer.commitSync(committed);
      LOG.debug("Committed offsets {} to Kafka", committed);
    }
    return polled;
  } finally {
    consumer.resume(nonPollable);
  }
}

代码示例来源:origin: io.opentracing.contrib/opentracing-kafka-client

// Forwards the pause request, with the given partitions unchanged, to the `consumer` field.
@Override
public void pause(Collection<TopicPartition> partitions) {
 consumer.pause(partitions);
}

代码示例来源:origin: linkedin/li-apache-kafka-clients

// Forwards the pause request, with the given partitions unchanged, to `_kafkaConsumer`.
@Override
public void pause(Collection<TopicPartition> partitions) {
 _kafkaConsumer.pause(partitions);
}

代码示例来源:origin: rayokota/kafka-graphs

// Forwards the pause request, with the given partitions unchanged, to `kafkaConsumer`.
@Override
public void pause(Collection<TopicPartition> partitions) {
  kafkaConsumer.pause(partitions);
}

代码示例来源:origin: opentracing-contrib/java-kafka-client

// Forwards the pause request, with the given partitions unchanged, to the `consumer` field.
@Override
public void pause(Collection<TopicPartition> partitions) {
 consumer.pause(partitions);
}

代码示例来源:origin: io.zipkin.brave/brave-instrumentation-kafka-clients

// Forwards the pause request, with the given partitions unchanged, to the `delegate` consumer.
@Override public void pause(Collection<TopicPartition> partitions) {
 delegate.pause(partitions);
}

代码示例来源:origin: vert-x3/vertx-kafka-client

/**
 * Submits a task that pauses the given topic-partitions on the consumer and returns
 * this stream so calls can be chained.
 *
 * @param topicPartitions the partitions to pause
 * @param completionHandler passed to {@code submitTask} along with the task
 *        (presumably notified when the task finishes; verify against {@code submitTask})
 * @return this stream
 */
@Override
public KafkaReadStream<K, V> pause(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
 this.submitTask((consumer, future) -> {
  consumer.pause(topicPartitions);
  // The future handed to the task may be null; only complete it when present.
  if (future != null) {
   future.complete();
  }
 }, completionHandler);
 return this;
}

代码示例来源:origin: org.apache.kafka/kafka-streams

/**
 * Hands the raw records of one partition to the partition group for buffering and, if
 * the resulting queue size exceeds {@code maxBufferedSize}, pauses consumption of that
 * partition.
 *
 * <p>NOTE(review): the previous documentation stated that records with an invalid
 * (i.e., negative) timestamp are skipped and not queued; that happens outside this
 * method, so confirm against {@code partitionGroup.addRawRecords}.
 *
 * @param partition the partition
 * @param records   the records
 */
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
  final int queuedAfterAdd = partitionGroup.addRawRecords(partition, records);

  if (log.isTraceEnabled()) {
    log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, queuedAfterAdd);
  }

  // Once this partition's buffer has grown past the threshold, stop fetching from it.
  final boolean overThreshold = queuedAfterAdd > maxBufferedSize;
  if (overThreshold) {
    consumer.pause(singleton(partition));
  }
}

代码示例来源:origin: apache/crunch

/**
  * Checks whether the value for {@code topicPartion} with an {@code offset} is within scan range.  If
  * the value is not then {@code false} is returned otherwise {@code true}.
  *
  * <p>When {@code offset} reaches the last offset to consume (ending offset minus one) or goes beyond
  * it, the partition is removed from {@code remainingPartitions} and paused on the consumer.
  *
  * @param topicPartion The partition for the offset
  * @param offset the offset in the partition
  * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
  */
 private boolean withinRange(TopicPartition topicPartion, long offset) {
  long endOffset = offsets.get(topicPartion).second();
  //end offsets are one higher than the last written value.
  boolean emit = offset < endOffset;
  if (offset >= endOffset - 1) {
   if (LOG.isDebugEnabled()) {
    LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
      new Object[]{topicPartion, offset, endOffset});
   }
   remainingPartitions.remove(topicPartion);
   consumer.pause(Collections.singleton(topicPartion));
  }
  // Only report "within range" when that is actually true; previously this was logged
  // even for offsets at or past the ending offset.
  if (emit) {
   LOG.debug("Value for partition {} and offset {} is within range.", topicPartion, offset);
  }
  return emit;
 }
}

代码示例来源:origin: apache/crunch

/**
  * Checks whether the value for {@code topicPartion} with an {@code offset} is within scan range.  If
  * the value is not then {@code false} is returned otherwise {@code true}.
  *
  * <p>When {@code offset} reaches the last offset to consume (ending offset minus one) or goes beyond
  * it, the partition is removed from {@code remainingPartitions} and paused on the consumer.
  *
  * @param topicPartion The partition for the offset
  * @param offset       the offset in the partition
  * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
  */
 private boolean withinRange(TopicPartition topicPartion, long offset) {
  long endOffset = offsets.get(topicPartion).second();
  //end offsets are one higher than the last written value.
  boolean emit = offset < endOffset;
  if (offset >= endOffset - 1) {
   if (LOG.isDebugEnabled()) {
    LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
      new Object[] { topicPartion, offset, endOffset });
   }
   remainingPartitions.remove(topicPartion);
   consumer.pause(Arrays.asList(topicPartion));
  }
  // Only report "within range" when that is actually true; previously this was logged
  // even for offsets at or past the ending offset.
  if (emit) {
   LOG.debug("Value for partition {} and offset {} is within range.", topicPartion, offset);
  }
  return emit;
 }
}

代码示例来源:origin: apache/samza

kafkaConsumer.pause(topicPartitionsToPause);
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeoutMs);

代码示例来源:origin: org.apache.samza/samza-kafka

kafkaConsumer.pause(topicPartitionsToPause);
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeoutMs);

代码示例来源:origin: org.apache.kafka/kafka-streams

/**
 * Resets the changelog reader, closes suspended tasks that are no longer assigned,
 * adds stream tasks for {@code assignment} plus the standby tasks, and finally pauses
 * every partition in {@code assignment} on the consumer.
 *
 * @param assignment the partitions to create stream tasks for and then pause
 * @throws IllegalStateException if {@code consumer} is null when this is called
 */
void createTasks(final Collection<TopicPartition> assignment) {
  // Fail fast: nothing below can work without a consumer to pause.
  if (consumer == null) {
    throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
  }
  changelogReader.reset();
  // do this first as we may have suspended standby tasks that
  // will become active or vice versa
  standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
  active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
  addStreamTasks(assignment);
  addStandbyTasks();
  // Pause all the partitions until the underlying state store is ready for all the active tasks.
  log.trace("Pausing partitions: {}", assignment);
  consumer.pause(assignment);
}

代码示例来源:origin: org.apache.nifi/nifi-kafka-0-9-processors

/**
 * Execute poll using pause API just for sending heartbeat, not polling messages.
 *
 * <p>While holding {@code pollingLock}, all currently assigned partitions are paused,
 * {@code poll(0)} is called, and the same partitions are resumed afterwards. If nothing
 * is assigned, the method returns without pausing or polling.
 */
void retainConnection() {
  pollingLock.lock();
  // Stays null until partitions have actually been captured, so the finally block
  // only resumes when there is something to resume.
  TopicPartition[] assignments = null;
  try {
    final Set<TopicPartition> assignmentSet = kafkaConsumer.assignment();
    if (assignmentSet.isEmpty()) {
      return;
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Pausing " + assignmentSet);
    }
    assignments = assignmentSet.toArray(new TopicPartition[assignmentSet.size()]);
    kafkaConsumer.pause(assignments);
    kafkaConsumer.poll(0);
    if (logger.isDebugEnabled()) {
      // Concatenating the array directly would log its identity ("[L...;@hash")
      // rather than the partitions, so render the contents explicitly.
      logger.debug("Resuming " + java.util.Arrays.toString(assignments));
    }
  } finally {
    try {
      if (assignments != null) {
        kafkaConsumer.resume(assignments);
      }
    } finally {
      // Release the lock even if resume() throws.
      pollingLock.unlock();
    }
  }
}

相关文章