我试图把Kafka和苍鹭的拓扑结构结合起来。但是,我找不到任何使用最新版本heron(0.17.5)的示例。关于如何实现自定义Kafka喷口和Kafka插销,有什么可以分享的例子或建议吗?
编辑1:
我相信kafkaspout和kafkabolt在heron中被故意弃用,让位于新的streamlet api。我现在想看看是否可以使用streamlet api构建kafkasource和kafkasink。然而,当我试图在源代码中创建一个kafkaconsumer时,我得到了下面的异常。
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.consumer.KafkaConsumer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.twitter.heron.api.utils.Utils.serialize(Utils.java:97)
编辑2:
修复了上述问题。我正在初始化 KafkaConsumer
在构造函数中,这是错误的。在中初始化相同的 setup()
方法修复了它。
1条答案
按热度按时间j13ufse21#
我使用streamletapi为heron实现了这一点。我在这里贴同样的。希望它能帮助其他面临同样问题的人。
Kafka来源