class MyListener extends ConsumerRebalanceListener {
...
}
api文档中的示例如下所示:
class SaveOffsetsOnRebalance(consumer: Consumer[_, _] ) extends ConsumerRebalanceListener {
def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
// save the offsets in an external store using some custom code not described
partitions.toScala.forEach(
saveOffsetInExternalStore(consumer.position(partition))
)
}
def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
// read the offsets from an external store using some custom code not described here
partitions.forEach(
consumer.seek(partition, readOffsetFromExternalStore(partition)))
}
}
2条答案
按热度按时间tcbh2hod1#
您可以直接扩展接口
api文档中的示例如下所示:
只需添加适当的导入
nwlqm0z12#
scala中有一些特性与java中的接口相对应。scala trait在内部转换为java接口。就像我们在java中实现接口一样,我们在scala中扩展traits的方式也是如此。所以您只需要扩展java接口,就好像它是scala特性一样,因为在引擎盖下两者是相同的。