如何在scala中实现java接口(对于kafka)?

nzkunb0c  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(473)

如何实施 ConsumerRebalanceListener 使用scala?

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
}

在订阅主题时,新制作的scala重新平衡监听器的例子是什么?
在scala中实现java方法/接口的过程中尝试学习和总结。。
谢谢。

tcbh2hod

tcbh2hod1#

您可以直接扩展接口

class MyListener extends ConsumerRebalanceListener {
     ...
 }

api文档中的示例如下所示:

class SaveOffsetsOnRebalance(consumer: Consumer[_, _] ) extends ConsumerRebalanceListener {

   def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       // save the offsets in an external store using some custom code not described
   partitions.toScala.forEach(
         saveOffsetInExternalStore(consumer.position(partition))
   )
   }

   def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
       // read the offsets from an external store using some custom code not described here
       partitions.forEach(
          consumer.seek(partition, readOffsetFromExternalStore(partition)))
   }
 }

只需添加适当的导入

nwlqm0z1

nwlqm0z12#

scala中有一些特性与java中的接口相对应。scala trait在内部转换为java接口。就像我们在java中实现接口一样,我们在scala中扩展traits的方式也是如此。所以您只需要扩展java接口,就好像它是scala特性一样,因为在引擎盖下两者是相同的。

class SaveOffsetsOnRebalance extends ConsumerRebalanceListener {}

相关问题