org.apache.kafka.clients.consumer.Consumer.seek()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(10.0k)|赞(0)|评价(0)|浏览(338)

本文整理了Java中org.apache.kafka.clients.consumer.Consumer.seek()方法的一些代码示例,展示了Consumer.seek()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Consumer.seek()方法的具体详情如下:
包路径:org.apache.kafka.clients.consumer.Consumer
类名称:Consumer
方法名:seek

Consumer.seek介绍

[英]Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics using #subscribe(String...), an exception will be thrown if the specified topic partition is not owned by the consumer.
[中]

代码示例

代码示例来源:origin: openzipkin/brave

@Override public void seek(TopicPartition partition, long offset) {
 delegate.seek(partition, offset);
}

代码示例来源:origin: apache/storm

} else if (lastBatchMeta != null) {
    LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
    consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
  } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
    LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
  consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
  LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
} else {
  consumer.seek(tp, initialFetchOffset);
  LOG.debug("First poll for topic partition [{}], no last batch metadata present."
    + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);

代码示例来源:origin: apache/incubator-gobblin

@Override
public void start(WatermarkStorage watermarkStorage)
  throws IOException {
 Preconditions.checkArgument(watermarkStorage != null, "Watermark Storage should not be null");
 Map<String, CheckpointableWatermark> watermarkMap =
   watermarkStorage.getCommittedWatermarks(KafkaWatermark.class, Collections.singletonList(_partition.toString()));
 KafkaWatermark watermark = (KafkaWatermark) watermarkMap.get(_partition.toString());
 if (watermark == null) {
  LOG.info("Offset is null - seeking to beginning of topic and partition for {} ", _partition.toString());
  _consumer.seekToBeginning(_partition);
 } else {
  // seek needs to go one past the last committed offset
  LOG.info("Offset found in consumer for partition {}. Seeking to one past what we found : {}",
    _partition.toString(), watermark.getLwm().getValue() + 1);
  _consumer.seek(_partition, watermark.getLwm().getValue() + 1);
 }
 _isStarted.set(true);
}

代码示例来源:origin: apache/storm

private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
  for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
    //Seek directly to the earliest retriable message for each retriable topic partition
    consumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
  }
}

代码示例来源:origin: apache/hive

consumer.seek(topicPartition, requestedStartOffset);
this.startOffset = consumer.position(topicPartition);
if (this.startOffset != requestedStartOffset) {

代码示例来源:origin: apache/nifi

private void rollback(final TopicPartition topicPartition) {
  OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
  if (offsetAndMetadata == null) {
    offsetAndMetadata = kafkaConsumer.committed(topicPartition);
  }
  final long offset = offsetAndMetadata.offset();
  kafkaConsumer.seek(topicPartition, offset);
}

代码示例来源:origin: apache/nifi

private void rollback(final TopicPartition topicPartition) {
  try {
    OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
    if (offsetAndMetadata == null) {
      offsetAndMetadata = kafkaConsumer.committed(topicPartition);
    }
    final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
    kafkaConsumer.seek(topicPartition, offset);
  } catch (final Exception rollbackException) {
    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
  }
}

代码示例来源:origin: apache/nifi

private void rollback(final TopicPartition topicPartition) {
  try {
    OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
    if (offsetAndMetadata == null) {
      offsetAndMetadata = kafkaConsumer.committed(topicPartition);
    }
    final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
    kafkaConsumer.seek(topicPartition, offset);
  } catch (final Exception rollbackException) {
    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
  }
}

代码示例来源:origin: apache/nifi

private void rollback(final TopicPartition topicPartition) {
  try {
    OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
    if (offsetAndMetadata == null) {
      offsetAndMetadata = kafkaConsumer.committed(topicPartition);
    }
    final long offset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();
    kafkaConsumer.seek(topicPartition, offset);
  } catch (final Exception rollbackException) {
    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
  }
}

代码示例来源:origin: apache/storm

consumer.seek(tp, committedOffset);

代码示例来源:origin: apache/kylin

log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.seek(topicPartition, watermark);
messages = consumer.poll(timeOut);
iterator = messages.iterator();

代码示例来源:origin: apache/incubator-gobblin

@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
 if (nextOffset > maxOffset) {
  return null;
 }
 this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
 this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
 ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
 return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
  @Override
  public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
   return new Kafka09ConsumerRecord<>(input);
  }
 });
}

代码示例来源:origin: linkedin/cruise-control

_metricConsumer.seek(tp, endOffsets.get(tp));
OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
if (offsetAndTimestamp != null) {
 _metricConsumer.seek(tp, offsetAndTimestamp.offset());
} else {
 _metricConsumer.seek(tp, endOffsets.get(tp));

代码示例来源:origin: apache/storm

consumer.seek(currBatchTp, seekOffset);

代码示例来源:origin: org.apache.kafka/kafka_2.12

private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
  final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    topicPartitionsAndTimes.put(topicPartition, timestamp);
  }
  final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
  }
}

代码示例来源:origin: org.apache.kafka/kafka

private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
  final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    topicPartitionsAndTimes.put(topicPartition, timestamp);
  }
  final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
  }
}

代码示例来源:origin: org.apache.kafka/kafka

public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long offset) {
  final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    topicPartitionsAndOffset.put(topicPartition, offset);
  }
  final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
    checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
  }
}

代码示例来源:origin: org.apache.kafka/kafka_2.11

public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
  final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
    checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
  }
}

代码示例来源:origin: org.apache.kafka/kafka_2.12

public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
  final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
  final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
    checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
  for (final TopicPartition topicPartition : inputTopicPartitions) {
    client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
  }
}

代码示例来源:origin: spring-projects/spring-kafka

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
  return (m, e, c) -> {
    this.listen3Exception = e;
    MessageHeaders headers = m.getHeaders();
    c.seek(new org.apache.kafka.common.TopicPartition(
            headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
            headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
        headers.get(KafkaHeaders.OFFSET, Long.class));
    return null;
  };
}

相关文章