我正在使用Kafka接收器任务读取Kafka主题的记录。sinktask方法中的put()是从中获取所有记录的入口点。当前,当连接器启动时,它将把所有未提交的记录收集在一起。我希望worker任务一次获取一条记录。怎么做?
class CustomSinkTask extends SinkTask{
@Override
public void put(Collection<SinkRecord> records) {
System.out.println("Inside put method " );
if(records != null)
System.out.println("number of records fetched are:" + records.size());
}
}
2条答案
按热度按时间ax6ht2ek1#
您可以通过在kafka connect属性文件中将max poll records设置为所需的数字来实现这一点。确保在max.poll.records属性前面加上
consumer
. 要了解更多有关工作程序属性的信息,请参阅此页。consumer.max.poll.records=n
hgtggwj02#
您可以尝试将以下内容添加到worker属性文件