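This article collects code examples for the Java method org.apache.kafka.clients.consumer.Consumer.position() and shows how Consumer.position() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Consumer.position() method are as follows: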
Package path: org.apache.kafka.clients.consumer
Class name: Consumer
Method name: position
Returns the fetch position for the specified topic partition, i.e. the offset of the next message that will be fetched on the next poll(long).
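To make the "offset of the next message" wording concrete, here is a minimal toy model of a single partition. It is only a sketch: `ToyPartitionCursor` is a hypothetical class written for this illustration and is not part of the Kafka client. It assumes records are stored at consecutive offsets starting from 0, and it does not model out-of-range resets or committed offsets.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory stand-in for one partition (hypothetical; NOT the Kafka client). */
final class ToyPartitionCursor {
    private final List<String> log;  // the record at index i has offset i
    private long position = 0;       // offset of the next record to fetch

    ToyPartitionCursor(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    /** Mirrors the idea behind Consumer.position(): the offset of the next record to fetch. */
    long position() {
        return position;
    }

    /** Mirrors the idea behind Consumer.seek(): moves the fetch position. */
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        position = offset;
    }

    /** Returns up to maxRecords records starting at the position, then advances the position. */
    List<String> poll(int maxRecords) {
        if (maxRecords < 0) {
            throw new IllegalArgumentException("maxRecords must be >= 0");
        }
        int from = (int) Math.min(position, log.size());
        int to = Math.min(from + maxRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(from, to));
        position += batch.size();
        return batch;
    }
}
```

In this model, the position after reading records is always the offset of the last returned record plus one, which is why several of the examples below compute `position - 1` to get the last consumed offset.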
Code example source: confluentinc/ksql
public long getCommandTopicConsumerPosition() {
  return commandConsumer.position(commandTopicPartition);
}
Code example source: openzipkin/brave
@Override public long position(TopicPartition partition) {
  return delegate.position(partition);
}
Code example source: openzipkin/brave
public long position(TopicPartition partition, Duration timeout) {
  return delegate.position(partition, timeout);
}
Code example source: apache/storm
private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
  for (TopicPartition tp : assignedPartitions) {
    // The offset to commit is the current fetch position of each assigned partition.
    offsetsToCommit.put(tp, new OffsetAndMetadata(consumer.position(tp), commitMetadataManager.getCommitMetadata()));
  }
  return offsetsToCommit;
}
Code example source: linkedin/cruise-control
/**
 * Checks whether the consumption is done. The consumption is done if the consumer has caught up with the
 * log end or all the partitions are paused.
 * @param endOffsets the log end offset for each partition.
 * @return true if the consumption is done, false otherwise.
 */
private boolean consumptionDone(Map<TopicPartition, Long> endOffsets) {
  Set<TopicPartition> partitionsNotPaused = new HashSet<>(_metricConsumer.assignment());
  partitionsNotPaused.removeAll(_metricConsumer.paused());
  for (TopicPartition tp : partitionsNotPaused) {
    if (_metricConsumer.position(tp) < endOffsets.get(tp)) {
      return false;
    }
  }
  return true;
}
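The caught-up check above compares each partition's position with its log end offset. The same idea can be sketched as a per-partition lag calculation. This is only an illustration under stated assumptions: `LagSketch` is a hypothetical helper, plain `String` keys stand in for `TopicPartition`, and the two maps are assumed to have been filled from `consumer.position(tp)` and an end-offset lookup elsewhere. Unlike the excerpt, it skips partitions that have no known end offset instead of failing on a missing map entry.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: lag per partition, computed from positions and log end offsets. */
final class LagSketch {
    private LagSketch() {}

    /**
     * Returns endOffset - position for every partition present in both maps.
     * Negative differences are reported as 0; partitions without an end offset are skipped.
     */
    static Map<String, Long> lag(Map<String, Long> positions, Map<String, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : positions.entrySet()) {
            Long end = endOffsets.get(entry.getKey());
            if (end == null) {
                continue; // no end offset known for this partition
            }
            result.put(entry.getKey(), Math.max(0L, end - entry.getValue()));
        }
        return result;
    }

    /** True if no partition with a known end offset has remaining lag. */
    static boolean caughtUp(Map<String, Long> positions, Map<String, Long> endOffsets) {
        for (long remaining : lag(positions, endOffsets).values()) {
            if (remaining > 0) {
                return false;
            }
        }
        return true;
    }
}
```

Reporting the lag as a number rather than a boolean makes it easier to log progress, while `caughtUp` keeps the yes/no form used by the excerpt.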
Code example source: apache/hive
/**
 * Poll more records from the Kafka Broker.
 *
 * @throws PollTimeoutException if poll returns 0 records and consumer's position < requested endOffset.
 */
private void pollRecords() {
  if (LOG.isTraceEnabled()) {
    stopwatch.reset().start();
  }
  records = consumer.poll(pollTimeoutDurationMs);
  if (LOG.isTraceEnabled()) {
    stopwatch.stop();
    LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
  }
  // Fail if we cannot poll within one lap of pollTimeoutMs.
  if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
    throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
        pollTimeoutMs,
        topicPartition.toString(),
        startOffset,
        consumer.position(topicPartition),
        endOffset));
  }
  consumerRecordIterator = records.iterator();
  consumerPosition = consumer.position(topicPartition);
}
Code example source: apache/hive
this.endOffset = consumer.position(topicPartition);
LOG.info("End Offset set to [{}]", this.endOffset);
} else {
LOG.info("Seeking to offset [{}] of topic partition [{}]", requestedStartOffset, topicPartition);
consumer.seek(topicPartition, requestedStartOffset);
this.startOffset = consumer.position(topicPartition);
if (this.startOffset != requestedStartOffset) {
LOG.warn("Current Start Offset [{}] is different from the requested start position [{}]",
this.startOffset = consumer.position(topicPartition);
LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
topicPartition,
consumerPosition = consumer.position(topicPartition);
Preconditions.checkState(this.endOffset >= consumerPosition,
"End offset [%s] need to be greater or equal than start offset [%s]",
Code example source: apache/incubator-gobblin
@Override
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
  TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
  this.consumer.assign(Collections.singletonList(topicPartition));
  // Move to the log end, then read the position to obtain the latest offset.
  this.consumer.seekToEnd(topicPartition);
  return this.consumer.position(topicPartition);
}
Code example source: apache/storm
long position = consumer.position(tp);
long committedOffset = tpOffset.getValue().offset();
if (position < committedOffset) {
Code example source: apache/incubator-gobblin
@Override
public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
  TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
  this.consumer.assign(Collections.singletonList(topicPartition));
  // Move to the log start, then read the position to obtain the earliest offset.
  this.consumer.seekToBeginning(topicPartition);
  return this.consumer.position(topicPartition);
}
Code example source: apache/storm
consumer.seekToEnd(Collections.singleton(tp));
tpToFirstSeekOffset.put(tp, consumer.position(tp));
} else if (lastBatchMeta != null) {
consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
final long fetchOffset = consumer.position(tp);
LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
return fetchOffset;
Code example source: apache/storm
long lastEmittedOffset = consumer.position(currBatchTp) - 1;
currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
Code example source: org.apache.kafka/kafka_2.12
public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
  final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    final long position = client.position(topicPartition);
    final long offset = position + shiftBy;
    topicPartitionsAndOffset.put(topicPartition, offset);
  }
  final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
      checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
  }
}
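The excerpt above calls `checkOffsetRange`, but its body is not shown here. One plausible way to validate shifted offsets is to clamp each requested offset into the range between the beginning and end offsets of its partition. The sketch below does that under stated assumptions: `OffsetRangeSketch` and `clampToRange` are hypothetical names, `String` keys stand in for `TopicPartition`, and this is not claimed to be the Kafka implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: clamps requested offsets into [beginningOffset, endOffset]. */
final class OffsetRangeSketch {
    private OffsetRangeSketch() {}

    static Map<String, Long> clampToRange(Map<String, Long> requested,
                                          Map<String, Long> beginningOffsets,
                                          Map<String, Long> endOffsets) {
        Map<String, Long> validated = new HashMap<>(requested.size());
        for (Map.Entry<String, Long> entry : requested.entrySet()) {
            String partition = entry.getKey();
            Long beginning = beginningOffsets.get(partition);
            Long end = endOffsets.get(partition);
            if (beginning == null || end == null) {
                throw new IllegalArgumentException("Unknown offset range for partition " + partition);
            }
            // Too small: use the beginning offset. Too large: use the end offset.
            long clamped = Math.max(beginning, Math.min(end, entry.getValue()));
            validated.put(partition, clamped);
        }
        return validated;
    }
}
```

With a range of 0 to 100, a requested offset of -5 becomes 0, 50 stays 50, and 120 becomes 100, so a later seek always targets an offset inside the partition's current range.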
Code example source: apache/metron
@Override
public String getSampleMessage(final String topic) {
  String message = null;
  if (listTopics().contains(topic)) {
    try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
      kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
          .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
          .collect(Collectors.toList()));
      // Step each partition back by one offset (where possible) before polling.
      kafkaConsumer.assignment().stream()
          .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
          .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
      final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
      message = records.isEmpty() ? null : records.iterator().next().value();
      kafkaConsumer.unsubscribe();
    }
  }
  return message;
}
Code example source: org.apache.kafka/kafka_2.11
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));
Code example source: spring-projects/spring-kafka
Consumer<Integer, String> consumer = cf.createConsumer();
consumer.assign(Arrays.asList(new TopicPartition(topic9, 0), new TopicPartition(topic9, 1)));
assertThat(consumer.position(new TopicPartition(topic9, 0))).isEqualTo(2);
assertThat(consumer.position(new TopicPartition(topic9, 1))).isEqualTo(2);
container.stop();
consumer.close();