This article collects Java code examples for the org.apache.kafka.clients.consumer.Consumer.seek() method and shows how Consumer.seek() is used in practice. The snippets are extracted from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as solid practical references. Details of the Consumer.seek() method follow:
Package: org.apache.kafka.clients.consumer
Class: Consumer
Method: seek
Description: Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics using #subscribe(String...), an exception will be thrown if the specified topic partition is not owned by the consumer.
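Before the project examples, here is a minimal, self-contained sketch of the typical call pattern. Everything in it is illustrative: the broker address, topic name, partition number, and offset 42 are placeholder values, and error handling is omitted.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
            consumer.assign(Collections.singletonList(tp)); // manual assignment, so no subscribe()/rebalance
            consumer.seek(tp, 42L);                         // the next fetch will start at offset 42
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}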
Example source: openzipkin/brave
@Override public void seek(TopicPartition partition, long offset) {
    delegate.seek(partition, offset);
}
Example source: apache/storm
} else if (lastBatchMeta != null) {
    // seek to the next offset after the last offset from the previous batch
    consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
    LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
    // no last batch metadata for this partition; start from its beginning
    consumer.seekToBeginning(Collections.singleton(tp));
    LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
} else {
    consumer.seek(tp, initialFetchOffset);
    LOG.debug("First poll for topic partition [{}], no last batch metadata present."
        + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
}
Example source: apache/incubator-gobblin
@Override
public void start(WatermarkStorage watermarkStorage)
    throws IOException {
    Preconditions.checkArgument(watermarkStorage != null, "Watermark Storage should not be null");
    Map<String, CheckpointableWatermark> watermarkMap =
        watermarkStorage.getCommittedWatermarks(KafkaWatermark.class, Collections.singletonList(_partition.toString()));
    KafkaWatermark watermark = (KafkaWatermark) watermarkMap.get(_partition.toString());
    if (watermark == null) {
        LOG.info("Offset is null - seeking to beginning of topic and partition for {} ", _partition.toString());
        _consumer.seekToBeginning(_partition);
    } else {
        // seek needs to go one past the last committed offset
        LOG.info("Offset found in consumer for partition {}. Seeking to one past what we found : {}",
            _partition.toString(), watermark.getLwm().getValue() + 1);
        _consumer.seek(_partition, watermark.getLwm().getValue() + 1);
    }
    _isStarted.set(true);
}
Example source: apache/storm
private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
    for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
        // Seek directly to the earliest retriable message for each retriable topic partition
        consumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
    }
}
Example source: apache/hive
consumer.seek(topicPartition, requestedStartOffset);
// seek() does not validate the offset; check where the consumer actually landed
this.startOffset = consumer.position(topicPartition);
if (this.startOffset != requestedStartOffset) {
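A note on the fragment above: seek() itself does not validate the requested offset. The consumer only resolves its real position on the next position() or poll() call, and an out-of-range offset is handled according to the auto.offset.reset policy, which is why the position can differ from what was requested. A hedged sketch of the same guard, with hypothetical error handling that is not taken from Hive:

// Sketch: seek, then verify where the consumer actually landed. If
// requestedStartOffset is out of range, auto.offset.reset decides the
// position, so the two values can differ.
consumer.seek(topicPartition, requestedStartOffset);
final long actualStartOffset = consumer.position(topicPartition);
if (actualStartOffset != requestedStartOffset) {
    throw new IllegalStateException("Requested start offset " + requestedStartOffset
        + " but consumer is positioned at " + actualStartOffset);
}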
Example source: apache/nifi
private void rollback(final TopicPartition topicPartition) {
    OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
    if (offsetAndMetadata == null) {
        offsetAndMetadata = kafkaConsumer.committed(topicPartition);
    }
    final long offset = offsetAndMetadata.offset();
    kafkaConsumer.seek(topicPartition, offset);
}
Example source: apache/nifi
private void rollback(final TopicPartition topicPartition) {
    try {
        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
        if (offsetAndMetadata == null) {
            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
        }
        // unlike the variant above, guard against a partition with no committed offset
        final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
        kafkaConsumer.seek(topicPartition, offset);
    } catch (final Exception rollbackException) {
        logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
    }
}
Example source: apache/storm
consumer.seek(tp, committedOffset);
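The one-line Storm fragment above seeks back to a committed offset. For context, a common shape for this pattern (a hypothetical sketch, not Storm's code) is a rebalance listener that resumes each newly assigned partition from its last committed position:

// Hypothetical sketch: resume newly assigned partitions from the last
// committed offset, falling back to the beginning when nothing was committed.
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do in this sketch
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null) {
                consumer.seek(tp, committed.offset());
            } else {
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        }
    }
});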
Example source: apache/kylin
log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.seek(topicPartition, watermark);
messages = consumer.poll(timeOut);
iterator = messages.iterator();
Example source: apache/incubator-gobblin
@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
    if (nextOffset > maxOffset) {
        return null;
    }
    this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
    this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
    ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
    return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
        @Override
        public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
            return new Kafka09ConsumerRecord<>(input);
        }
    });
}
Example source: linkedin/cruise-control
OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
if (offsetAndTimestamp != null) {
    _metricConsumer.seek(tp, offsetAndTimestamp.offset());
} else {
    // no offset found for the requested time; fall back to the end offset
    _metricConsumer.seek(tp, endOffsets.get(tp));
}
Example source: apache/storm
consumer.seek(currBatchTp, seekOffset);
Example source: org.apache.kafka/kafka_2.12 (identical code in org.apache.kafka/kafka)
private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
    final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
    for (final TopicPartition topicPartition : inputTopicPartitions) {
        topicPartitionsAndTimes.put(topicPartition, timestamp);
    }
    final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
    for (final TopicPartition topicPartition : inputTopicPartitions) {
        client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
    }
}
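One caveat with this example: offsetsForTimes() maps a partition to null when it has no record with a timestamp at or after the requested one, so the unguarded .offset() call above can throw a NullPointerException. A defensive variant of the second loop (the seekToEnd fallback is an assumption, mirroring the cruise-control example earlier):

for (final TopicPartition topicPartition : inputTopicPartitions) {
    final OffsetAndTimestamp offsetAndTimestamp = topicPartitionsAndOffset.get(topicPartition);
    if (offsetAndTimestamp != null) {
        client.seek(topicPartition, offsetAndTimestamp.offset());
    } else {
        // no record at or after the timestamp; fall back to the end of the partition
        client.seekToEnd(Collections.singleton(topicPartition));
    }
}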
Example source: org.apache.kafka/kafka
public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long offset) {
    final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
    final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
    final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
    for (final TopicPartition topicPartition : inputTopicPartitions) {
        topicPartitionsAndOffset.put(topicPartition, offset);
    }
    final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
        checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
    for (final TopicPartition topicPartition : inputTopicPartitions) {
        client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
    }
}
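checkOffsetRange() is not part of the extract. Based on its name and the surrounding calls, a plausible reconstruction (an assumption, not the actual Kafka tool code) clamps each requested offset into the partition's valid range:

// Hypothetical reconstruction: clamp each requested offset into
// [beginningOffset, endOffset] for its partition.
private Map<TopicPartition, Long> checkOffsetRange(final Map<TopicPartition, Long> requestedOffsets,
                                                   final Map<TopicPartition, Long> beginningOffsets,
                                                   final Map<TopicPartition, Long> endOffsets) {
    final Map<TopicPartition, Long> validated = new HashMap<>(requestedOffsets.size());
    for (final Map.Entry<TopicPartition, Long> entry : requestedOffsets.entrySet()) {
        final long begin = beginningOffsets.get(entry.getKey());
        final long end = endOffsets.get(entry.getKey());
        validated.put(entry.getKey(), Math.min(end, Math.max(begin, entry.getValue())));
    }
    return validated;
}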
Example source: org.apache.kafka/kafka_2.11 (identical code in org.apache.kafka/kafka_2.12)
public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
    final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
    final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
    final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
        checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
    for (final TopicPartition topicPartition : inputTopicPartitions) {
        client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
    }
}
Example source: spring-projects/spring-kafka
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        // seek back to the failed record's offset so it is redelivered on the next poll
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
            headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}