Usage of the org.apache.kafka.clients.consumer.Consumer class, with code examples


This article collects Java code examples for the org.apache.kafka.clients.consumer.Consumer class and shows how it is used in practice. The examples were extracted from selected open-source projects found on GitHub, Stack Overflow, Maven, and similar sources, so they should be useful references. Details of the Consumer class follow:
Package: org.apache.kafka.clients.consumer
Class name: Consumer

About Consumer

Consumer is the client-side interface for reading records from Kafka; KafkaConsumer is its standard implementation, and MockConsumer implements it for tests. It covers topic subscription and manual partition assignment (subscribe, assign), fetching (poll), offset commits (commitSync, commitAsync), repositioning (seek, seekToBeginning, seekToEnd), flow control (pause, resume), and metadata lookups (partitionsFor, assignment, position, committed). Because it is an interface, code that depends on Consumer rather than KafkaConsumer is easy to unit test, as several examples below demonstrate.
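
Before the project examples, here is a minimal, self-contained sketch of the interface in use. The broker address, group id, and topic name are placeholders, not taken from the projects below:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerQuickstart {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
    props.put("group.id", "demo-group");               // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // KafkaConsumer is the standard implementation of the Consumer interface.
    try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("demo-topic"));  // placeholder topic
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
      }
    }
  }
}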

Code examples

Code example source: apache/storm

private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
  doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
  Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
  pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
  try {
    consumer.pause(pausedPartitions);
    final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
    final int numPolledRecords = consumerRecords.count();
    LOG.debug("Polled [{}] records from Kafka",
      numPolledRecords);
    if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
      //Commit polled records immediately to ensure delivery is at-most-once.
      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
        createFetchedOffsetsMetadata(consumer.assignment());
      consumer.commitSync(offsetsToCommit);
      LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
    }
    return consumerRecords;
  } finally {
    consumer.resume(pausedPartitions);
  }
}
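
The example above pauses every assigned partition that is not currently pollable, polls, and resumes in a finally block so a failed poll cannot leave partitions paused. A standalone sketch of that pause/poll/resume pattern (the helper and its parameters are illustrative, not part of the Storm code):

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class PollSelectedPartitions {
  // Polls only the `wanted` partitions by pausing everything else, then resumes.
  static <K, V> ConsumerRecords<K, V> pollOnly(Consumer<K, V> consumer,
                                               Set<TopicPartition> wanted,
                                               Duration timeout) {
    Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
    paused.removeIf(wanted::contains);  // keep only the partitions we do NOT want to poll
    try {
      consumer.pause(paused);
      return consumer.poll(timeout);
    } finally {
      consumer.resume(paused);          // always resume, even if poll throws
    }
  }
}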

Code example source: apache/nifi

private void closeConsumer(final Consumer<?, ?> consumer) {
  consumerClosedCountRef.incrementAndGet();
  try {
    consumer.unsubscribe();
  } catch (Exception e) {
    logger.warn("Failed while unsubscribing " + consumer, e);
  }
  try {
    consumer.close();
  } catch (Exception e) {
    logger.warn("Failed while closing " + consumer, e);
  }
}

Code example source: apache/nifi

private void rollback(final TopicPartition topicPartition) {
  OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
  if (offsetAndMetadata == null) {
    offsetAndMetadata = kafkaConsumer.committed(topicPartition);
  }
  final long offset = offsetAndMetadata.offset();
  kafkaConsumer.seek(topicPartition, offset);
}
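
Note that Consumer.committed(TopicPartition) returns null when no offset has ever been committed for the partition, so the snippet above depends on uncommittedOffsetsMap supplying an entry in that case. A null-safe variant could fall back to the beginning of the partition; the helper below is a hypothetical sketch, not NiFi code:

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class Rollback {
  // Rewinds a partition to its last committed offset, or to the beginning
  // of the partition if nothing was ever committed.
  static void rollbackToCommitted(Consumer<?, ?> consumer, TopicPartition tp) {
    OffsetAndMetadata committed = consumer.committed(tp);  // may be null
    if (committed != null) {
      consumer.seek(tp, committed.offset());
    } else {
      consumer.seekToBeginning(Collections.singleton(tp));
    }
  }
}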

Code example source: confluentinc/ksql

public void close() {
  commandConsumer.wakeup();
  commandConsumer.close();
  commandProducer.close();
}

Code example source: linkedin/cruise-control

/**
 * Check whether consumption is done. Consumption is done when the consumer has caught up with
 * the log end offsets, or when all partitions are paused.
 * @param endOffsets the log end offset for each partition.
 * @return true if the consumption is done, false otherwise.
 */
private boolean consumptionDone(Map<TopicPartition, Long> endOffsets) {
  Set<TopicPartition> partitionsNotPaused = new HashSet<>(_metricConsumer.assignment());
  partitionsNotPaused.removeAll(_metricConsumer.paused());
  for (TopicPartition tp : partitionsNotPaused) {
    if (_metricConsumer.position(tp) < endOffsets.get(tp)) {
      return false;
    }
  }
  return true;
}
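
The endOffsets map this method compares against can be captured from the consumer itself (Kafka clients 0.10.1 and later expose Consumer.endOffsets). A sketch of a drain loop built on the same catch-up check, assuming the consumer already has an assignment:

import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class DrainToEnd {
  // Polls until the consumer has caught up with the end offsets captured at the start.
  static void drain(Consumer<?, ?> consumer) {
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
    boolean done = false;
    while (!done) {
      consumer.poll(Duration.ofMillis(100));
      done = true;
      for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
        if (consumer.position(entry.getKey()) < entry.getValue()) {
          done = false;  // still behind on at least one partition
          break;
        }
      }
    }
  }
}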

Code example source: org.apache.kafka/kafka_2.12

// Excerpt from the Kafka Streams application reset tool; option parsing and
// the toDatetime/byDuration branches are elided.
if (options.has(toOffsetOption)) {
  resetOffsetsTo(client, inputTopicPartitions, options.valueOf(toOffsetOption));
} else if (options.has(toEarliestOption)) {
  client.seekToBeginning(inputTopicPartitions);
} else if (options.has(toLatestOption)) {
  client.seekToEnd(inputTopicPartitions);
} else if (options.has(shiftByOption)) {
  shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
} else if (options.has(fromFileOption)) {
  // ... reset plan loading elided ...
  resetOffsetsFromResetPlan(client, inputTopicPartitions, topicPartitionsAndOffset);
} else {
  client.seekToBeginning(inputTopicPartitions);
}
for (final TopicPartition p : inputTopicPartitions) {
  System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));
}

Code example source: apache/metron

@Override
public String getSampleMessage(final String topic) {
  String message = null;
  if (listTopics().contains(topic)) {
    try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
      kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
          .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
          .collect(Collectors.toList()));
      kafkaConsumer.assignment().stream()
          .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
          .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
      final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
      message = records.isEmpty() ? null : records.iterator().next().value();
      kafkaConsumer.unsubscribe();
    }
  }
  return message;
}
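
The same "read the most recent message" idea can also be written with endOffsets instead of position arithmetic. A hypothetical sketch for a single partition (not Metron code):

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class LastMessage {
  // Returns the value of the most recent record in one partition, or null if it is empty.
  static String readLast(Consumer<String, String> consumer, TopicPartition tp) {
    consumer.assign(Collections.singleton(tp));
    long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
    if (endOffset == 0) {
      return null;                    // partition has never had a record
    }
    consumer.seek(tp, endOffset - 1); // position on the last record
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    return records.isEmpty() ? null : records.iterator().next().value();
  }
}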

Code example source: confluentinc/ksql

CommandTopic(
    final String commandTopicName,
    final Consumer<CommandId, Command> commandConsumer,
    final Producer<CommandId, Command> commandProducer
) {
  this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
  this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
  this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer");
  this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
  commandConsumer.assign(Collections.singleton(commandTopicPartition));
}

Code example source: apache/incubator-gobblin

@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
  if (nextOffset > maxOffset) {
    return null;
  }
  this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
  this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
  ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
  return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
    @Override
    public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
      return new Kafka09ConsumerRecord<>(input);
    }
  });
}
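
This assign-seek-poll sequence is the standard way to read a bounded offset range without group management. A small sketch of the pattern (helper name and processing are illustrative):

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class BoundedRead {
  // Reads records with offsets in [from, to) from a single partition.
  static <K, V> void readRange(Consumer<K, V> consumer, TopicPartition tp, long from, long to) {
    consumer.assign(Collections.singleton(tp));
    consumer.seek(tp, from);
    long next = from;
    while (next < to) {
      ConsumerRecords<K, V> batch = consumer.poll(Duration.ofMillis(500));
      if (batch.isEmpty()) {
        break;                        // nothing more arrived within the timeout
      }
      for (ConsumerRecord<K, V> rec : batch.records(tp)) {
        if (rec.offset() >= to) {
          return;                     // reached the end of the requested range
        }
        // process rec ...
        next = rec.offset() + 1;
      }
    }
  }
}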

Code example source: spring-projects/spring-kafka

ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
final CountDownLatch latch = new CountDownLatch(1);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
  latch.countDown();
  Thread.sleep(50);
  return emptyRecords;
});
// ... container setup and start elided in the excerpt ...
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(List.class);
verify(consumer).seekToBeginning(captor.capture());
assertThat(captor.getValue())
    .isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4))));
verify(consumer).seekToEnd(captor.capture());
assertThat(captor.getValue())
    .isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5))));
verify(consumer).seek(new TopicPartition("foo", 2), 0L);
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
container.stop();

Code example source: apache/incubator-gobblin

@Override
public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
  TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
  this.consumer.assign(Collections.singletonList(topicPartition));
  // Kafka 0.9 client API: seekToBeginning takes varargs; clients 0.10.1+ take a Collection.
  this.consumer.seekToBeginning(topicPartition);
  return this.consumer.position(topicPartition);
}

Code example source: apache/incubator-gobblin

@Override
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
  TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
  this.consumer.assign(Collections.singletonList(topicPartition));
  // Kafka 0.9 client API: seekToEnd takes varargs; clients 0.10.1+ take a Collection.
  this.consumer.seekToEnd(topicPartition);
  return this.consumer.position(topicPartition);
}
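
Both Gobblin snippets target the Kafka 0.9 API, where seekToBeginning and seekToEnd accept varargs. With clients 0.10.1 and later, the same offsets are available without moving the consumer at all, via beginningOffsets/endOffsets. A sketch:

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class PartitionOffsets {
  // Returns {earliest, latest} offsets of a partition without seeking
  // (requires Kafka clients 0.10.1+).
  static long[] earliestAndLatest(Consumer<?, ?> consumer, TopicPartition tp) {
    long earliest = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
    long latest = consumer.endOffsets(Collections.singleton(tp)).get(tp);
    return new long[] { earliest, latest };
  }
}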

Code example source: apache/kylin

// Excerpt: rewind to the saved watermark and check whether any records are available.
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.seek(topicPartition, watermark);
messages = consumer.poll(timeOut);
iterator = messages.iterator();
if (!iterator.hasNext()) {
  // ... no records arrived within the timeout; handling elided in the excerpt ...
}

Code example source: spring-projects/spring-kafka

@SuppressWarnings("unchecked")
@Test
public void stopContainerAfterException() throws Exception {
  assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
  assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
  assertThat(this.config.errorLatch.await(10, TimeUnit.SECONDS)).isTrue();
  assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
  MessageListenerContainer container = this.registry.getListenerContainer(CONTAINER_ID);
  assertThat(container.isRunning()).isFalse();
  InOrder inOrder = inOrder(this.consumer);
  inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
  inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
  inOrder.verify(this.consumer).wakeup();
  inOrder.verify(this.consumer).unsubscribe();
  inOrder.verify(this.consumer).close();
  inOrder.verifyNoMoreInteractions();
}
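
Tests like this work because Consumer is an interface and mocks cleanly. A minimal Mockito sketch (names are illustrative):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

class MockedConsumerSketch {
  @SuppressWarnings("unchecked")
  static void run() {
    Consumer<String, String> consumer = mock(Consumer.class);
    // Stub: every poll returns an empty batch.
    when(consumer.poll(any(Duration.class)))
        .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    System.out.println("empty batch: " + records.isEmpty());  // true

    // Interactions with the mock can then be verified.
    verify(consumer).poll(any(Duration.class));
  }
}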

Code example source: confluentinc/ksql

@Test
public void shouldGetRestoreCommandsCorrectly() {
  // Given:
  when(commandConsumer.poll(any(Duration.class)))
      .thenReturn(someConsumerRecords(
          new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
          new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
      .thenReturn(someConsumerRecords(
          new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
      .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));

  // When:
  final List<QueuedCommand> queuedCommandList = commandTopic
      .getRestoreCommands(Duration.ofMillis(1));

  // Then:
  verify(commandConsumer).seekToBeginning(topicPartitionsCaptor.capture());
  assertThat(topicPartitionsCaptor.getValue(),
      equalTo(Collections.singletonList(new TopicPartition(COMMAND_TOPIC_NAME, 0))));
  assertThat(queuedCommandList, equalTo(ImmutableList.of(
      new QueuedCommand(commandId1, command1, Optional.empty()),
      new QueuedCommand(commandId2, command2, Optional.empty()),
      new QueuedCommand(commandId3, command3, Optional.empty()))));
}

Code example source: spring-projects/spring-kafka

consumer.assign(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 0)));
// ... excerpt: inside a polling loop, wait until the position reaches 1 ...
if (consumer.position(new TopicPartition(topic, 0)) == 1) {
  break;
}
// ... end of polling loop ...
assertThat(consumer.position(new TopicPartition(topic, 0))).isEqualTo(1);
consumer.close();
logger.info("Stop ack on error with ManualImmediate ack mode");

Code example source: apache/storm

@Override
public Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer) {
  Set<TopicPartition> allPartitions = new HashSet<>();
  for (String topic : topics) {
    List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
    if (partitionInfoList != null) {
      for (PartitionInfo partitionInfo : partitionInfoList) {
        allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
      }
    } else {
      LOG.warn("Topic {} not found, skipping addition of the topic", topic);
    }
  }
  return allPartitions;
}

Code example source: apache/storm

// The excerpt omits the enclosing condition: the first branch runs on the
// first poll for this topic partition (boolean placeholder below).
if (isFirstPollForPartition) {
  if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
    LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
    consumer.seekToBeginning(Collections.singleton(tp));
  } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
    LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
    consumer.seekToEnd(Collections.singleton(tp));
  } else if (lastBatchMeta != null) {
    LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
    consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek to the offset after the last offset of the previous batch
  } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
    LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
    consumer.seekToBeginning(Collections.singleton(tp));
  } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
    LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
    consumer.seekToEnd(Collections.singleton(tp));
  }
  // Remember where the first seek landed so later batches can replay from it.
  tpToFirstSeekOffset.put(tp, consumer.position(tp));
} else if (lastBatchMeta != null) {
  consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek to the offset after the last offset of the previous batch
  LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
} else {
  final long initialFetchOffset = tpToFirstSeekOffset.get(tp);
  consumer.seek(tp, initialFetchOffset);
  LOG.debug("First poll for topic partition [{}], no last batch metadata present."
    + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
}
final long fetchOffset = consumer.position(tp);
LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
return fetchOffset;

Code example source: openzipkin/brave

// Excerpt from a tracing wrapper around Consumer.poll.
ConsumerRecords<K, V> records = delegate.poll(timeout);
if (records.isEmpty() || tracing.isNoop()) return records;
long timestamp = 0L;
Map<String, Span> consumerSpansForTopic = new LinkedHashMap<>();
for (TopicPartition partition : records.partitions()) {
  String topic = partition.topic();
  List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
  for (int i = 0, length = recordsInPartition.size(); i < length; i++) {
    // ... per-record span creation elided in the excerpt ...
  }
}
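
Iterating a poll result per partition via records.partitions() and records.records(partition), as Brave does here, is the standard way to group one batch by partition. A tiny self-contained sketch:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class PerPartitionIteration {
  static <K, V> void dump(ConsumerRecords<K, V> records) {
    for (TopicPartition partition : records.partitions()) {
      List<ConsumerRecord<K, V>> inPartition = records.records(partition);
      System.out.printf("%s: %d record(s)%n", partition, inPartition.size());
    }
  }
}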

Code example source: confluentinc/ksql

public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
  return commandConsumer.poll(timeout);
}
