我已经创建了一个接收器Kafka连接转换数据到其他存储;我想把 auto.offset.reset
作为 latest
使用创建新连接器时 kafka connect rest api
; 我已经准备好了 consumer.auto.offset.reset: latest
在配置中; json { "name": "test_v14", "config": { "name": "test_v14", "consumer.auto.offset.reset": "latest", "connector.class": "...", ... } }
但当任务开始时,Kafka的消费者还是从最早的时候开始投票记录;还有其他的方法吗 auto.offset.reset
最新版本;
2条答案
按热度按时间vhipe2zx1#
Kafka2.3之前
consumer.auto.offset.reset
需要在connect-distributed*.properties
文件(工人)。它不能应用于任何特定的连接器,除非连接器类显式地创建并加载它自己的使用者对象,这些使用者对象在该属性中读取。
jaql4c8m2#
从ApacheKafka2.3开始,现在可以将其设置为连接器配置的一部分。
在工作集上:
然后在连接器中可以指定
有关详细信息,请参见:https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/