我正在写一个简单的twitter程序,在这里我使用kafka阅读tweet,并希望使用avro进行序列化。到目前为止,我刚刚在scala中设置了twitter配置,现在想使用这个配置来阅读tweet。
如何导入程序中tweets.avsc文件中定义的以下avro模式?
{
"namespace": "tweetavro",
"type": "record",
"name": "Tweet",
"fields": [
{"name": "name", "type": "string"},
{"name": "text", "type": "string"}
]
}
我在网上看了一些例子 import tweetavro.Tweet
在scala中导入模式以便我们可以像
def main (args: Array[String]) {
val twitterStream = TwitterStream.getStream
twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
twitterStream.filter(filterUsOnly)
}
private def toTweet(s: Status): Tweet = {
new Tweet(s.getUser.getName, s.getText)
}
private def sendToKafka(t:Tweet) {
println(toJson(t.getSchema).apply(t))
val tweetEnc = toBinary[Tweet].apply(t)
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
kafkaProducer.send(msg)
}
我遵循同样的,并使用以下插件中 pom.xml
```
3条答案
按热度按时间lc8prwob1#
你也可以用avro4s。基于模式定义(或生成)case类。我们叫那个班吧
Tweet
. 然后创建一个AvroOutputStream
,它也将从case类推断模式,并用于序列化示例。然后我们可以写入一个字节数组,然后发送给Kafka。如:elcex8rz2#
您应该首先将该模式编译成一个类。我不确定在scala中是否有一个avro库,它已经可以用于生产,但您可以为java生成一个类,并在scala中使用它:
java -jar /path/to/avro-tools-1.7.7.jar compile schema tweet.avsc .
根据您的需要更改这一行,您应该会得到一个由这个工具生成的tweetavro.tweet类。然后你可以把它放到你的项目中,用你刚才描述的方式使用它。更多信息请点击此处
upd:仅供参考,scala似乎有一个图书馆,但我以前从未使用过
cld4siwp3#
我建议使用avrohugger。它是avro的scala case类的新成员,但是支持我所需要的一切,我非常喜欢它不是基于宏的,所以我可以看到生成的内容。
维护人员的工作非常出色,并且非常接受贡献和反馈。它没有,也可能永远不会像官方java代码gen那样功能丰富,但它将满足大多数人的需求。
目前,它缺少对联合(可选类型除外)和递归类型的支持。
sbt插件工作得非常好,如果您想快速了解它对avro模式的作用,有一个新的web界面:
https://avro2caseclass.herokuapp.com/
更多详情请参见:
https://github.com/julianpeeters/avrohugger