本文整理了Java中org.apache.kafka.clients.consumer.Consumer类的一些代码示例,展示了Consumer类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Consumer类的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
暂无
代码示例来源:origin: apache/storm
/**
 * Performs a single poll against Kafka, restricted to the pollable partitions described by
 * {@code pollablePartitionsInfo}.
 *
 * <p>Every assigned partition that is not pollable is paused before the poll and resumed in a
 * {@code finally} block, so the pause never outlives this call even if polling or committing
 * throws.
 *
 * @param pollablePartitionsInfo the pollable partitions and their earliest retriable offsets
 * @return the records returned by the poll
 */
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
    doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);

    // Start from the full assignment and drop the pollable partitions; what is left gets paused.
    final Set<TopicPartition> partitionsToPause = new HashSet<>(consumer.assignment());
    partitionsToPause.removeIf(pollablePartitionsInfo.pollablePartitions::contains);

    try {
        consumer.pause(partitionsToPause);
        final ConsumerRecords<K, V> polled = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
        ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, polled);
        LOG.debug("Polled [{}] records from Kafka", polled.count());

        final boolean atMostOnce =
            KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE == kafkaSpoutConfig.getProcessingGuarantee();
        if (atMostOnce) {
            // Committing right after the poll, before the records are returned, gives at-most-once delivery.
            final Map<TopicPartition, OffsetAndMetadata> fetchedOffsets =
                createFetchedOffsetsMetadata(consumer.assignment());
            consumer.commitSync(fetchedOffsets);
            LOG.debug("Committed offsets {} to Kafka", fetchedOffsets);
        }
        return polled;
    } finally {
        consumer.resume(partitionsToPause);
    }
}
代码示例来源:origin: apache/nifi
/**
 * Shuts the given consumer down on a best-effort basis: it is unsubscribed first and closed
 * second. A failure in either step is logged as a warning and does not stop the other step.
 * The closed-consumer counter is incremented before either step is attempted.
 *
 * @param consumer the consumer to unsubscribe and close
 */
private void closeConsumer(final Consumer<?, ?> consumer) {
    consumerClosedCountRef.incrementAndGet();

    try {
        consumer.unsubscribe();
    } catch (final Exception unsubscribeFailure) {
        // Not rethrown, so that close() below is still attempted.
        logger.warn("Failed while unsubscribing " + consumer, unsubscribeFailure);
    }

    try {
        consumer.close();
    } catch (final Exception closeFailure) {
        logger.warn("Failed while closing " + consumer, closeFailure);
    }
}
代码示例来源:origin: apache/nifi
/**
 * Rewinds the consumer for {@code topicPartition} to the offset it should resume from.
 *
 * <p>The offset tracked in {@code uncommittedOffsetsMap} is used when present; otherwise the
 * offset last committed to Kafka is used. If the partition has no committed offset either,
 * the consumer is rewound to offset 0 instead of failing with a {@link NullPointerException}.
 *
 * @param topicPartition the partition whose position is rolled back
 */
private void rollback(final TopicPartition topicPartition) {
    OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
    if (offsetAndMetadata == null) {
        offsetAndMetadata = kafkaConsumer.committed(topicPartition);
    }

    // committed() yields null when nothing has ever been committed for this partition.
    final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
    kafkaConsumer.seek(topicPartition, offset);
}
代码示例来源:origin: confluentinc/ksql
/**
 * Releases the command consumer and the command producer.
 *
 * <p>The consumer is woken up and closed first. The producer is closed in a {@code finally}
 * block so that it is released even when waking or closing the consumer throws.
 */
public void close() {
    try {
        commandConsumer.wakeup();
        commandConsumer.close();
    } finally {
        // Must run regardless of the consumer outcome, otherwise the producer would leak.
        commandProducer.close();
    }
}
}
代码示例来源:origin: linkedin/cruise-control
/**
 * Checks whether the consumption is done. It is done when no assigned, unpaused partition is
 * still positioned before its log end offset; this is trivially the case when all assigned
 * partitions are paused.
 *
 * @param endOffsets the log end offset of each partition.
 * @return {@code true} if the consumption is done, {@code false} otherwise.
 */
private boolean consumptionDone(Map<TopicPartition, Long> endOffsets) {
    // Work on a copy of the assignment so that removing the paused partitions does not touch the consumer's own set.
    Set<TopicPartition> activePartitions = new HashSet<>(_metricConsumer.assignment());
    activePartitions.removeAll(_metricConsumer.paused());
    return activePartitions.stream()
        .noneMatch(tp -> _metricConsumer.position(tp) < endOffsets.get(tp));
}
代码示例来源:origin: org.apache.kafka/kafka_2.12
// NOTE(review): truncated excerpt — the opening `if` of this chain, some intermediate branch headers and the closing braces are not shown.
// Each branch repositions `client` on `inputTopicPartitions` in a different way, chosen by which option is present in `options`.
resetOffsetsTo(client, inputTopicPartitions, options.valueOf(toOffsetOption));
} else if (options.has(toEarliestOption)) {
client.seekToBeginning(inputTopicPartitions);
} else if (options.has(toLatestOption)) {
client.seekToEnd(inputTopicPartitions);
} else if (options.has(shiftByOption)) {
shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
// NOTE(review): the next call most likely belongs to a separate branch whose `else if` header was dropped from this listing — confirm against the original source.
resetOffsetsFromResetPlan(client, inputTopicPartitions, topicPartitionsAndOffset);
} else {
// No option matched: seek to the beginning of the input partitions.
client.seekToBeginning(inputTopicPartitions);
// NOTE(review): `p` is declared outside this excerpt, presumably by a loop over the partitions — confirm.
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));
代码示例来源:origin: apache/metron
/**
 * Fetches one sample message from the given topic.
 *
 * <p>A short-lived consumer is assigned every partition of the topic. Each partition whose
 * current position is at least 1 is stepped back by one offset, and a single poll is made.
 *
 * @param topic the topic to sample
 * @return the value of the first record returned by the poll, or {@code null} when the topic
 *     is not among the listed topics or the poll returns no records
 */
@Override
public String getSampleMessage(final String topic) {
    if (!listTopics().contains(topic)) {
        return null;
    }

    try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
        kafkaConsumer.assign(
            kafkaConsumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(topic, info.partition()))
                .collect(Collectors.toList()));

        // Step back one offset wherever possible so the poll below has a record to return.
        for (final TopicPartition assigned : kafkaConsumer.assignment()) {
            if ((kafkaConsumer.position(assigned) - 1) >= 0) {
                kafkaConsumer.seek(assigned, kafkaConsumer.position(assigned) - 1);
            }
        }

        final ConsumerRecords<String, String> polled = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
        final String sample;
        if (polled.isEmpty()) {
            sample = null;
        } else {
            sample = polled.iterator().next().value();
        }
        kafkaConsumer.unsubscribe();
        return sample;
    }
}
代码示例来源:origin: confluentinc/ksql
/**
 * Creates a command topic wrapper around the given consumer and producer and assigns the
 * consumer to partition 0 of {@code commandTopicName}.
 *
 * @param commandTopicName the name of the command topic
 * @param commandConsumer the consumer used to read commands
 * @param commandProducer the producer used to write commands
 * @throws NullPointerException if any argument is {@code null}
 */
CommandTopic(
    final String commandTopicName,
    final Consumer<CommandId, Command> commandConsumer,
    final Producer<CommandId, Command> commandProducer
) {
    this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
    this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
    this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer");
    this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");

    // Manual assignment to the single partition built above.
    this.commandConsumer.assign(Collections.singleton(this.commandTopicPartition));
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Reads records from a single partition starting at {@code nextOffset}.
 *
 * <p>The consumer is assigned to the partition, positioned at {@code nextOffset} and polled
 * once; each polled record is wrapped in a {@code Kafka09ConsumerRecord}.
 *
 * @param partition the partition to read from
 * @param nextOffset the offset to start reading at
 * @param maxOffset the upper bound; nothing is read when {@code nextOffset} exceeds it
 * @return an iterator over the wrapped records, or {@code null} when
 *     {@code nextOffset > maxOffset}
 */
@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
    if (nextOffset > maxOffset) {
        return null;
    }

    final TopicPartition target = new TopicPartition(partition.getTopicName(), partition.getId());
    this.consumer.assign(Lists.newArrayList(target));
    this.consumer.seek(target, nextOffset);

    final Function<ConsumerRecord<K, V>, KafkaConsumerRecord> toWrappedRecord =
        new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
            @Override
            public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
                return new Kafka09ConsumerRecord<>(input);
            }
        };

    final ConsumerRecords<K, V> polled = consumer.poll(super.fetchTimeoutMillis);
    return Iterators.transform(polled.iterator(), toWrappedRecord);
}
代码示例来源:origin: spring-projects/spring-kafka
// NOTE(review): truncated test excerpt — the lambda passed to willAnswer(...) is never closed here and the container setup is not shown.
// A ConsumerRecords built from an empty map; presumably what the stubbed poll returns — confirm.
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
final CountDownLatch latch = new CountDownLatch(1);
// Stubs poll(Duration): the answer counts the latch down and then sleeps for 50 ms.
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
latch.countDown();
Thread.sleep(50);
// NOTE(review): the end of the answer body and the code that starts the container appear to be missing between the previous line and the next one.
// Waits up to 10 seconds for the latch that the stubbed poll counts down.
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(List.class);
// seekToBeginning must have received partitions 0 and 4 of "foo".
verify(consumer).seekToBeginning(captor.capture());
assertThat(captor.getValue())
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4))));
// seekToEnd must have received partitions 1 and 5 of "foo".
verify(consumer).seekToEnd(captor.capture());
assertThat(captor.getValue())
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5))));
// Partitions 2 and 3 must have been sought to the explicit offsets 0 and Long.MAX_VALUE.
verify(consumer).seek(new TopicPartition("foo", 2), 0L);
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
container.stop();
代码示例来源:origin: apache/incubator-gobblin
/**
 * Looks up the earliest offset of the given partition by assigning the consumer to it,
 * seeking to the beginning and reading back the resulting position.
 *
 * @param partition the partition to inspect
 * @return the consumer position after seeking to the beginning of the partition
 * @throws KafkaOffsetRetrievalFailureException declared by the signature; not thrown directly
 *     by this method body
 */
@Override
public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
    final TopicPartition target = new TopicPartition(partition.getTopicName(), partition.getId());

    this.consumer.assign(Collections.singletonList(target));
    this.consumer.seekToBeginning(target);

    final long earliest = this.consumer.position(target);
    return earliest;
}
代码示例来源:origin: apache/incubator-gobblin
/**
 * Looks up the latest offset of the given partition by assigning the consumer to it, seeking
 * to the end and reading back the resulting position.
 *
 * @param partition the partition to inspect
 * @return the consumer position after seeking to the end of the partition
 * @throws KafkaOffsetRetrievalFailureException declared by the signature; not thrown directly
 *     by this method body
 */
@Override
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
    final TopicPartition target = new TopicPartition(partition.getTopicName(), partition.getId());

    this.consumer.assign(Collections.singletonList(target));
    this.consumer.seekToEnd(target);

    final long latest = this.consumer.position(target);
    return latest;
}
代码示例来源:origin: apache/kylin
// NOTE(review): truncated excerpt — the declarations of consumer, messages, iterator, topic, partition, watermark and timeOut are not shown, and the final `if` is left open.
TopicPartition topicPartition = new TopicPartition(topic, partition);
// Position the consumer at `watermark`, then poll from there.
consumer.seek(topicPartition, watermark);
messages = consumer.poll(timeOut);
iterator = messages.iterator();
// Entered when the poll returned no records.
if (!iterator.hasNext()) {
代码示例来源:origin: spring-projects/spring-kafka
/**
 * Waits for the delivery, poll, error and close latches of the test configuration, asserts
 * that the listener container is no longer running, and verifies the exact order of calls
 * made on the consumer: subscribe, poll, wakeup, unsubscribe, close — with nothing after.
 */
@SuppressWarnings("unchecked")
@Test
public void stopContainerAfterException() throws Exception {
// Each latch is given up to 10 seconds.
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.errorLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
MessageListenerContainer container = this.registry.getListenerContainer(CONTAINER_ID);
assertThat(container.isRunning()).isFalse();
// InOrder makes the verifications below order-sensitive.
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer).wakeup();
inOrder.verify(this.consumer).unsubscribe();
inOrder.verify(this.consumer).close();
// No further consumer interaction is allowed after close().
inOrder.verifyNoMoreInteractions();
}
代码示例来源:origin: confluentinc/ksql
/**
 * Checks that getRestoreCommands seeks the command consumer to the beginning of partition 0 of
 * the command topic and returns the commands from successive polls, in order, as queued
 * commands.
 */
@Test
public void shouldGetRestoreCommandsCorrectly() {
// Given:
// Three consecutive poll results: two records, then one record, then an empty batch.
when(commandConsumer.poll(any(Duration.class)))
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 0, commandId2, command2)))
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 0, commandId3, command3)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
// When:
final List<QueuedCommand> queuedCommandList = commandTopic
.getRestoreCommands(Duration.ofMillis(1));
// Then:
// The consumer must have been rewound on exactly the command topic's partition 0.
verify(commandConsumer).seekToBeginning(topicPartitionsCaptor.capture());
assertThat(topicPartitionsCaptor.getValue(),
equalTo(Collections.singletonList(new TopicPartition(COMMAND_TOPIC_NAME, 0))));
// All three commands are expected, in poll order, each with an empty Optional.
assertThat(queuedCommandList, equalTo(ImmutableList.of(
new QueuedCommand(commandId1, command1, Optional.empty()),
new QueuedCommand(commandId2, command2, Optional.empty()),
new QueuedCommand(commandId3, command3, Optional.empty()))));
}
代码示例来源:origin: spring-projects/spring-kafka
// NOTE(review): truncated excerpt — the loop that the `break` belongs to and the closing braces are not shown.
// NOTE(review): both list entries are partition 0 of `topic`; if a second partition was intended this is a duplicate — confirm against the original source.
consumer.assign(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 0)));
// Leaves the (unseen) enclosing loop once the position on partition 0 has reached 1.
if (consumer.position(new TopicPartition(topic, 0)) == 1) {
break;
// The position on partition 0 must be 1 at this point.
assertThat(consumer.position(new TopicPartition(topic, 0))).isEqualTo(1);
consumer.close();
logger.info("Stop ack on error with ManualImmediate ack mode");
代码示例来源:origin: apache/storm
/**
 * Collects every partition of every configured topic, as reported by the given consumer.
 *
 * <p>A topic for which the consumer returns no partition information ({@code null}) is
 * skipped with a warning.
 *
 * @param consumer the consumer used to look up partition metadata
 * @return the partitions of all topics that were found; empty if none were found
 */
@Override
public Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer) {
    final Set<TopicPartition> collected = new HashSet<>();
    for (final String topicName : topics) {
        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
        if (partitionInfos == null) {
            LOG.warn("Topic {} not found, skipping addition of the topic", topicName);
            continue;
        }
        for (final PartitionInfo info : partitionInfos) {
            collected.add(new TopicPartition(info.topic(), info.partition()));
        }
    }
    return collected;
}
代码示例来源:origin: apache/storm
// NOTE(review): truncated excerpt — the enclosing method and several closing braces are not shown; the listing looks like two variants of the same logic joined together — confirm against the original source.
// Chooses where to position the consumer for `tp`, then returns the resulting position.
if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
consumer.seekToBeginning(Collections.singleton(tp));
} else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
consumer.seekToEnd(Collections.singleton(tp));
} else if (lastBatchMeta != null) {
LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
consumer.seekToBeginning(Collections.singleton(tp));
} else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
consumer.seekToEnd(Collections.singleton(tp));
// Records the position reached by the seek above as the first seek offset for `tp`.
tpToFirstSeekOffset.put(tp, consumer.position(tp));
// NOTE(review): as written, this condition repeats the `lastBatchMeta != null` test of an earlier branch in the same chain, so it can never be reached — likely an artifact of the joined listings.
} else if (lastBatchMeta != null) {
consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
} else {
// No branch above applied: fall back to the stored initial fetch offset.
consumer.seek(tp, initialFetchOffset);
LOG.debug("First poll for topic partition [{}], no last batch metadata present."
+ " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
// Reads back the position that resulted from whichever seek was performed.
final long fetchOffset = consumer.position(tp);
LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
return fetchOffset;
代码示例来源:origin: openzipkin/brave
// NOTE(review): truncated excerpt — the enclosing method signature and the bodies of both loops are not shown.
// Delegates the poll to the wrapped consumer.
ConsumerRecords<K, V> records = delegate.poll(timeout);
// Nothing to process when the poll returned no records or tracing is a no-op: return the records as-is.
if (records.isEmpty() || tracing.isNoop()) return records;
long timestamp = 0L;
// Keyed by topic name; LinkedHashMap keeps insertion order.
Map<String, Span> consumerSpansForTopic = new LinkedHashMap<>();
for (TopicPartition partition : records.partitions()) {
String topic = partition.topic();
List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
// Indexed loop with the list size read once up front.
for (int i = 0, length = recordsInPartition.size(); i < length; i++) {
代码示例来源:origin: confluentinc/ksql
/**
 * Polls the command consumer once for new commands.
 *
 * @param timeout the timeout passed to the poll
 * @return the records returned by the poll
 */
public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
    final Iterable<ConsumerRecord<CommandId, Command>> polled = commandConsumer.poll(timeout);
    return polled;
}
内容来源于网络,如有侵权,请联系作者删除!