我如何在akka流中解析avro byteString流

j91ykkif  于 2022-11-23  发布在  其他
关注(0)|答案(1)|浏览(123)

我在s3 bucket中有Avro文件,并试图流和解析到一个case类中。我有要解析的模式,但不知道如何继续。
我使用s3.download从s3 bucket中下载并流式传输文件,然后将其转换为utf8string。
请帮助,我如何分析与模式,我们有考虑到输入流我得到。

vof42yt1

vof42yt11#

我将根据您提出的使用模式通过Avro对消息进行序列化(反序列化)的问题来回答这个问题。
我有要解析的架构,但不知道如何继续。
假设你已经从s3.bucket下载了消息,然后我将使用我的例子,我在PostgreSQL上持久化消息,只是为了一个工作的例子,但是你可以假设你的s3.bucket连接。
我正在使用com.sksamuel.avro4s库来创建我的Avro序列化器。下面是放入build.sbt的必要库:

val akkaVersion = "2.6.10"
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4",
  "org.xerial.snappy" % "snappy-java" % "1.1.8.2",
  "org.postgresql" % "postgresql" % "42.2.2",
  "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0",

然后创建序列化器,在我的例子中是MyFirstAvroSerializer,扩展akka.serialization.Serializer,它有一个模式,在我的例子中是case类CompanyRegistry,基本上,您必须实现方法identifier,它必须有一个唯一的ID,toBinaryfromBinary来转换消息,includeManifest是假的因为我不需要清单。

import akka.serialization.Serializer
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}
import com.typesafe.config.ConfigFactory

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String)
case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double)

class MyFirstAvroSerializer extends Serializer {
  val schema = AvroSchema[CompanyRegistry]
  override def identifier: Int = 454874
  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case c: CompanyRegistry =>
      val baos = new ByteArrayOutputStream()
      val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build() // schema
      avroOutputStream.write(c)
      avroOutputStream.flush()
      avroOutputStream.close()
      baos.toByteArray
    case _ => throw new IllegalArgumentException(s"we only support CompanyRegistry for Avro")
  }
  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema)
    val companyRegistryIterator: Iterator[CompanyRegistry] = avroInputStream.iterator
    val companyRegistry = companyRegistryIterator.next()
    avroInputStream.close()
    companyRegistry
  }
  override def includeManifest: Boolean = false
}

然后,您必须配置您的项目,以便在参与者之间交换akka消息时调用此序列化程序。通过添加特定的配置,在application.conf上配置它。在我的示例中是avroSerializable。您在serializers作用域下设置了MyFirstAvroSerializer,在serialization-bindings作用域下设置了case类。我还为Akka-remote进行了配置,但您可以忽略它。

avroSerializable {
  akka {
    actor {
      provider = remote
      #allow-java-serialization = off
      serializers {
        java = "akka.serialization.JavaSerializer"
        avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer"
      }
      serialization-bindings {
        "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro
        "java.io.Serializable" = java
      }
    }
    remote {
      artery {
        enabled = on
        transport = aeron-udp
        canonical.hostname = "localhost"
      }
    }
  }
}

就像我一开始说的,我使用的是PostgreSQL。但是在你的例子中,它将是s3桶存储配置。我将离开这里只是为了完整,因为我在创建actor系统时调用了这个配置。

postgresStore {
  akka.persistence.journal.plugin = "jdbc-journal"
  akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store"
  akka.actor.allow-java-serialization = on
  # create JDBC configuration to Akka persistence
  akka-persistence-jdbc {
    shared-databases {
      slick {
        profile = "slick.jdbc.PostgresProfile$"
        db {
          numThreads = 10
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/rtjvm"
          user = "docker"
          password = "docker"
        }
      }
    }
  }
  # dbinding the JDBC plugins with the configureation created above
  jdbc-journal {
    use-shared-db = "slick"
  }
  jdbc-snapshot-store {
    use-shared-db = "slick"
  }
}

现在是时候创建actor系统和一个简单的actor SimplePersistentActor并通过网络发送消息了。SimplePersistentActor只是一个非常简单的actor,它接受我发送的消息,没有什么特别的。

object AvroSerialization_Persistence {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load().getConfig("postgresStore")
      .withFallback(ConfigFactory.load("avroSerializable"))
    val system = ActorSystem("postgresStoreSystem", config)
    val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"), "personAvroActor")
    val companyRegistryMsg = CompanyRegistry(
      "Google",
      Seq(
        BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars"),
        BankAccount("GB-4321", "google-bank", 0.5, "trillion pounds")
      ),
      "ads",
      523895
    )
    simplePersistentActor ! companyRegistryMsg
  }
}

相关问题