akka kafka自定义序列化程序

u91tlkcl  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(461)

我正在使用 Akka Kafka(Scala),并希望发送自定义对象。

/** Skeleton of a custom serializer for a sequence of MyCustomType values.
  * Every method body is still the `???` placeholder, which throws
  * NotImplementedError when the method is called.
  */
class TweetsSerializer extends Serializer[Seq[MyCustomType]] {

// Receives the configuration map; `isKey` presumably tells whether this instance
// serializes record keys (true) or values (false). Not implemented yet.
override def configure(configs: util.Map[String, _], isKey: Boolean):   Unit = ???

// Should turn `data` into the byte array for a record on `topic`. Not implemented yet.
override def serialize(topic: String, data: Seq[MyCustomType]): Array[Byte] = ???

// Not implemented yet.
override def close(): Unit = ???

}

如何正确编写自己的序列化程序?我该怎么处理 config 这个字段?

vhmi4jdf

vhmi4jdf1#

我会使用 StringSerializer,也就是说,我会先把所有类型转换成字符串,然后再发送它们。不管怎样:

// Payload type used by the example: wraps a single Int field `a`.
case class MyCustomType(a: Int)

  /**
    * Serializer for a sequence of MyCustomType values.
    *
    * Each element's `a` field is rendered as decimal text, the pieces are concatenated
    * without a separator, and the resulting string is encoded with the configured
    * charset (default "UTF8").
    *
    * NOTE(review): because no separator is written, element boundaries cannot be
    * recovered from the bytes (Seq(1, 23) and Seq(12, 3) both yield "123"). The format
    * is kept as it was; confirm whether consumers need a delimiter.
    */
  class TweetsSerializer extends Serializer[Seq[MyCustomType]] {

    // Charset name used by serialize; configure may replace it.
    private var encoding = "UTF8"

    /**
      * Picks the charset name from the configuration.
      *
      * Looks up "key.serializer.encoding" or "value.serializer.encoding" depending on
      * `isKey`, and falls back to "serializer.encoding" when that entry is absent.
      * A value that is not a String leaves the current encoding untouched.
      *
      * @param configs configuration entries
      * @param isKey   selects the key-specific or the value-specific property name
      */
    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
      val propertyName =
        if (isKey) "key.serializer.encoding" else "value.serializer.encoding"

      // java.util.Map#get returns null for a missing key; Option(...) maps that to None
      // so the fallback lookup only happens when the specific property is absent.
      val configured: Option[Any] =
        Option(configs.get(propertyName)).orElse(Option(configs.get("serializer.encoding")))

      configured.foreach {
        case name: String => encoding = name
        case _            => () // non-String value: keep the current encoding
      }
    }

    /**
      * Encodes `data` with the configured charset.
      *
      * @param topic unused by this implementation
      * @param data  values to encode; null is passed through as null
      * @return the encoded bytes, or null when `data` is null
      * @throws SerializationException when the configured charset name is not supported
      */
    override def serialize(topic: String, data: Seq[MyCustomType]): Array[Byte] =
      if (data == null) null
      else
        try data.map(_.a.toString).mkString("").getBytes(encoding) // honour the configured charset
        catch {
          case e: UnsupportedEncodingException =>
            // Keep the original exception as the cause so the failure stays diagnosable.
            throw new SerializationException(
              "Error when serializing string to byte[] due to unsupported encoding " + encoding,
              e)
        }

    /** Nothing to release: this serializer holds no resources. */
    override def close(): Unit = ()

  }

}

/**
  * Small runnable example: builds a KafkaProducer whose values are serialized by
  * TweetsSerializer, sends one record synchronously, and then closes the producer.
  */
object testCustomKafkaSerializer extends App {

  // Producer settings: broker address plus the key/value serializer class names.
  // NOTE(review): nothing in this object needs the value to be implicit; the modifier is
  // kept only so that code importing it keeps compiling.
  implicit val producerConfig: Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[TweetsSerializer].getName)
    props
  }

  lazy val kafkaProducer = new KafkaProducer[String, Seq[MyCustomType]](producerConfig)

  // Sends `data` to the hard-coded topic "outTopic" with `id` as the record key, then
  // blocks on the Java Future returned by send until the send has completed.
  private def publishToKafka(id: String, data: Seq[MyCustomType]) =
    kafkaProducer
      .send(new ProducerRecord("outTopic", id, data))
      .get()

  val input = MyCustomType(1)

  // The first argument is used as the record key (see publishToKafka), not as the topic.
  // The producer is closed even when the send fails so its resources are released.
  try publishToKafka("customSerializerTopic", Seq(input))
  finally kafkaProducer.close()

}

相关问题