public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
with:
/**
* A configuration container to represent a topic name, partition number and, optionally,
* an initial offset for it. The initial offset can be:
* <ul>
* <li>{@code null} - do nothing;</li>
* <li>positive (including {@code 0}) - seek to EITHER the absolute offset within the
* partition or an offset relative to the current position for this consumer, depending
* on {@link #isRelativeToCurrent()}.
* </li>
* <li>negative - seek to EITHER the offset relative to the current last offset within
* the partition: {@code consumer.seekToEnd() + initialOffset} OR the relative to the
* current offset for this consumer (if any), depending on
* {@link #isRelativeToCurrent()}.</li>
* </ul>
* Offsets are applied when the container is {@code start()}ed.
*
* @author Artem Bilan
* @author Gary Russell
*/
public class TopicPartitionInitialOffset {
/**
* Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
* and will replace the previous assignment (if there is one).
* <p>
* If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
* <p>
* Manual topic assignment through this method does not use the consumer's group management
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
* metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
* and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
* <p>
* If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
* assignment replaces the old one.
*
* @param partitions The list of partitions to assign this consumer
* @throws IllegalArgumentException If partitions is null or contains null or empty topics
* @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
* (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void assign(Collection<TopicPartition> partitions) {
1 Answer
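The ContainerProperties used to configure the KafkaMessageListenerContainer has the constructor shown at the top of this page, which takes TopicPartitionInitialOffset arguments. The Javadoc of that class is quoted above as well.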
So, this way you can specify which topic to consume, from which partition, and from which offset to start.
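To make the offset rules from the TopicPartitionInitialOffset Javadoc concrete, here is a minimal sketch in plain Java. It is not Spring Kafka code: the class InitialOffsetSketch, the method resolve, and the parameters currentPosition and endOffset are hypothetical names used only for illustration, and clamping the result at 0 is my own assumption rather than something the Javadoc states.

```java
import java.util.OptionalLong;

public class InitialOffsetSketch {

    /**
     * Computes the position to seek to, following the rules in the quoted Javadoc.
     * currentPosition and endOffset stand in for what a real consumer would report.
     */
    public static OptionalLong resolve(Long initialOffset, boolean relativeToCurrent,
                                       long currentPosition, long endOffset) {
        if (initialOffset == null) {
            // null: do nothing, keep whatever position the consumer already has
            return OptionalLong.empty();
        }
        long target;
        if (relativeToCurrent) {
            // positive or negative: relative to this consumer's current position
            target = currentPosition + initialOffset;
        } else if (initialOffset >= 0) {
            // non-negative: absolute offset within the partition
            target = initialOffset;
        } else {
            // negative: relative to the current end of the partition
            target = endOffset + initialOffset;
        }
        // Assumption: never seek before offset 0
        return OptionalLong.of(Math.max(0L, target));
    }

    public static void main(String[] args) {
        // Hypothetical partition state: consumer at 40, log end at 100
        System.out.println(resolve(5L, false, 40, 100).getAsLong());   // 5
        System.out.println(resolve(5L, true, 40, 100).getAsLong());    // 45
        System.out.println(resolve(-10L, false, 40, 100).getAsLong()); // 90
        System.out.println(resolve(-10L, true, 40, 100).getAsLong());  // 30
        System.out.println(resolve(null, false, 40, 100).isPresent()); // false
    }
}
```

In a real container these values come from the consumer itself when the container is started, so the sketch is only meant to show how the sign of the offset and the relativeToCurrent flag combine.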
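When we use partition-based configuration, we call the assign(Collection<TopicPartition>) method on the KafkaConsumer (its Javadoc is quoted above). So, this is the standard practice for consuming directly from specific partitions.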