我在s3 bucket中有Avro文件,并试图流和解析到一个case类中。我有要解析的模式,但不知道如何继续。我使用s3.download从s3 bucket中下载并流式传输文件,然后将其转换为utf8string。请帮助,我如何分析与模式,我们有考虑到输入流我得到。
s3.download
vof42yt11#
我将根据您提出的使用模式通过Avro对消息进行序列化(反序列化)的问题来回答这个问题。我有要解析的架构,但不知道如何继续。假设你已经从s3.bucket下载了消息,然后我将使用我的例子,我在PostgreSQL上持久化消息,只是为了一个工作的例子,但是你可以假设你的s3.bucket连接。我正在使用com.sksamuel.avro4s库来创建我的Avro序列化器。下面是放入build.sbt的必要库:
build.sbt
val akkaVersion = "2.6.10" "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-persistence" % akkaVersion, "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4", "org.xerial.snappy" % "snappy-java" % "1.1.8.2", "org.postgresql" % "postgresql" % "42.2.2", "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0",
然后创建序列化器,在我的例子中是MyFirstAvroSerializer,扩展akka.serialization.Serializer,它有一个模式,在我的例子中是case类CompanyRegistry,基本上,您必须实现方法identifier,它必须有一个唯一的ID,toBinary和fromBinary来转换消息,includeManifest是假的因为我不需要清单。
MyFirstAvroSerializer
akka.serialization.Serializer
CompanyRegistry
identifier
toBinary
fromBinary
includeManifest
import akka.serialization.Serializer import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema} import com.typesafe.config.ConfigFactory import java.io.{ByteArrayInputStream, ByteArrayOutputStream} case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String) case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double) class MyFirstAvroSerializer extends Serializer { val schema = AvroSchema[CompanyRegistry] override def identifier: Int = 454874 override def toBinary(o: AnyRef): Array[Byte] = o match { case c: CompanyRegistry => val baos = new ByteArrayOutputStream() val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build() // schema avroOutputStream.write(c) avroOutputStream.flush() avroOutputStream.close() baos.toByteArray case _ => throw new IllegalArgumentException(s"we only support CompanyRegistry for Avro") } override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema) val companyRegistryIterator: Iterator[CompanyRegistry] = avroInputStream.iterator val companyRegistry = companyRegistryIterator.next() avroInputStream.close() companyRegistry } override def includeManifest: Boolean = false }
然后,您必须配置您的项目,以便在参与者之间交换akka消息时调用此序列化程序。通过添加特定的配置,在application.conf上配置它。在我的示例中是avroSerializable。您在serializers作用域下设置了MyFirstAvroSerializer,在serialization-bindings作用域下设置了case类。我还为Akka-remote进行了配置,但您可以忽略它。
application.conf
avroSerializable
serializers
serialization-bindings
avroSerializable { akka { actor { provider = remote #allow-java-serialization = off serializers { java = "akka.serialization.JavaSerializer" avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer" } serialization-bindings { "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro "java.io.Serializable" = java } } remote { artery { enabled = on transport = aeron-udp canonical.hostname = "localhost" } } } }
就像我一开始说的,我使用的是PostgreSQL。但是在你的例子中,它将是s3桶存储配置。我将离开这里只是为了完整,因为我在创建actor系统时调用了这个配置。
postgresStore { akka.persistence.journal.plugin = "jdbc-journal" akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store" akka.actor.allow-java-serialization = on # create JDBC configuration to Akka persistence akka-persistence-jdbc { shared-databases { slick { profile = "slick.jdbc.PostgresProfile$" db { numThreads = 10 driver = "org.postgresql.Driver" url = "jdbc:postgresql://localhost:5432/rtjvm" user = "docker" password = "docker" } } } } # dbinding the JDBC plugins with the configureation created above jdbc-journal { use-shared-db = "slick" } jdbc-snapshot-store { use-shared-db = "slick" } }
现在是时候创建actor系统和一个简单的actor SimplePersistentActor并通过网络发送消息了。SimplePersistentActor只是一个非常简单的actor,它接受我发送的消息,没有什么特别的。
SimplePersistentActor
object AvroSerialization_Persistence { def main(args: Array[String]): Unit = { val config = ConfigFactory.load().getConfig("postgresStore") .withFallback(ConfigFactory.load("avroSerializable")) val system = ActorSystem("postgresStoreSystem", config) val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"), "personAvroActor") val companyRegistryMsg = CompanyRegistry( "Google", Seq( BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars"), BankAccount("GB-4321", "google-bank", 0.5, "trillion pounds") ), "ads", 523895 ) simplePersistentActor ! companyRegistryMsg } }
1条答案
按热度按时间vof42yt11#
我将根据您提出的使用模式通过Avro对消息进行序列化(反序列化)的问题来回答这个问题。
我有要解析的架构,但不知道如何继续。
假设你已经从s3.bucket下载了消息,然后我将使用我的例子,我在PostgreSQL上持久化消息,只是为了一个工作的例子,但是你可以假设你的s3.bucket连接。
我正在使用com.sksamuel.avro4s库来创建我的Avro序列化器。下面是放入
build.sbt
的必要库:然后创建序列化器,在我的例子中是
MyFirstAvroSerializer
,扩展akka.serialization.Serializer
,它有一个模式,在我的例子中是case类CompanyRegistry
,基本上,您必须实现方法identifier
,它必须有一个唯一的ID,toBinary
和fromBinary
来转换消息,includeManifest
是假的因为我不需要清单。然后,您必须配置您的项目,以便在参与者之间交换akka消息时调用此序列化程序。通过添加特定的配置,在
application.conf
上配置它。在我的示例中是avroSerializable
。您在serializers
作用域下设置了MyFirstAvroSerializer
,在serialization-bindings
作用域下设置了case类。我还为Akka-remote进行了配置,但您可以忽略它。就像我一开始说的,我使用的是PostgreSQL。但是在你的例子中,它将是s3桶存储配置。我将离开这里只是为了完整,因为我在创建actor系统时调用了这个配置。
现在是时候创建actor系统和一个简单的actor
SimplePersistentActor
并通过网络发送消息了。SimplePersistentActor
只是一个非常简单的actor,它接受我发送的消息,没有什么特别的。