在scala中导入avro模式

zc0qhyus  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(482)

我正在写一个简单的twitter程序,在这里我使用kafka阅读tweet,并希望使用avro进行序列化。到目前为止,我刚刚在scala中设置了twitter配置,现在想使用这个配置来阅读tweet。
如何导入程序中tweets.avsc文件中定义的以下avro模式?

{
    "namespace": "tweetavro",
    "type": "record",
    "name": "Tweet",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "text", "type": "string"}
    ]
}

我在网上看了一些例子 import tweetavro.Tweet 在scala中导入模式以便我们可以像

def main (args: Array[String]) {
    val twitterStream = TwitterStream.getStream
    twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
    twitterStream.filter(filterUsOnly)
  }

  private def toTweet(s: Status): Tweet = {
    new Tweet(s.getUser.getName, s.getText)
  }

  private def sendToKafka(t:Tweet) {
    println(toJson(t.getSchema).apply(t))
    val tweetEnc = toBinary[Tweet].apply(t)
    val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
    kafkaProducer.send(msg)
  }

我遵循同样的,并使用以下插件中 pom.xml ```

lc8prwob

lc8prwob1#

你也可以用avro4s。基于模式定义(或生成)case类。我们叫那个班吧 Tweet . 然后创建一个 AvroOutputStream ,它也将从case类推断模式,并用于序列化示例。然后我们可以写入一个字节数组,然后发送给Kafka。如:

val tweet: Tweet= ... // the instance you want to serialize

val out = new ByteArrayOutputStream // we collect the serialized output in this
val avro = AvroOutputStream[Tweet](out) // you specify the type here as well
avro.write(tweet)
avro.close()

val bytes = out.toByteArray
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes)
kafkaProducer.send(msg)
elcex8rz

elcex8rz2#

您应该首先将该模式编译成一个类。我不确定在scala中是否有一个avro库,它已经可以用于生产,但您可以为java生成一个类,并在scala中使用它: java -jar /path/to/avro-tools-1.7.7.jar compile schema tweet.avsc . 根据您的需要更改这一行,您应该会得到一个由这个工具生成的tweetavro.tweet类。然后你可以把它放到你的项目中,用你刚才描述的方式使用它。
更多信息请点击此处
upd:仅供参考,scala似乎有一个图书馆,但我以前从未使用过

cld4siwp

cld4siwp3#

我建议使用avrohugger。它是avro的scala case类的新成员,但是支持我所需要的一切,我非常喜欢它不是基于宏的,所以我可以看到生成的内容。
维护人员的工作非常出色,并且非常接受贡献和反馈。它没有,也可能永远不会像官方java代码gen那样功能丰富,但它将满足大多数人的需求。
目前,它缺少对联合(可选类型除外)和递归类型的支持。
sbt插件工作得非常好,如果您想快速了解它对avro模式的作用,有一个新的web界面:
https://avro2caseclass.herokuapp.com/
更多详情请参见:
https://github.com/julianpeeters/avrohugger

相关问题