consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
我从定义到Kafka都得到了上面的代码。我不知道怎么把它们变成Kotlin。我试过了,但失败了。
欢迎任何评论。谢谢
更新
我皈依了
object:OffsetCommitCallback() {
override fun onComplete(offsets:Map<TopicPartition, OffsetAndMetadata>, e:Exception) {
if (e != null) log.error("Commit failed for offsets {}", offsets, e)
}
}
但是我得到了 This class does not have a constructor
.
更新
似乎以下工作:
kafkaConsumer.commitAsync(mapOf(k to currentOffsets.get(k)), object:OffsetCommitCallback {
override fun onComplete(offsets:Map<TopicPartition, OffsetAndMetadata>, e:Exception) {
e?.let {
log.error("Commit failed for offsets {}", offsets, e)
}
}
})
2条答案
按热度按时间jjjwad0x1#
看起来像
OffsetCommitCallback
是函数接口,因此您应该能够在函数调用中使用lambda(sam转换):ztigrdn82#
看起来像
OffsetCommitCallback
是一个接口,而不是一个类,因此创建它的匿名示例不会使用括号:而不是
还要注意,假设原始代码正在检查
e
是null
,您可能需要在kotlin方法签名中使用可为null的类型: