使用scala(或java)通过kafka发布和使用case类的好方法是什么?
我在这里找到了包括答案在内的各种部分例子,但没有完整的例子。
我通过使用自定义Kafka序列化成功地实现了这一点,如下所示,但我想了解是否有更好的方法:
泛型序列化程序类:
package mypackage
import java.nio.charset.Charset
import java.util
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Serializer
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
class ObjectSerializer[T <: AnyRef] extends Serializer[T] {
override def configure(config: util.Map[String, _], isKey: Boolean): Unit = {
}
override def serialize(topic: String, data: T): Array[Byte] = {
if (data == null) {
return Array.empty[Byte]
} else {
try {
implicit val formats = DefaultFormats
val json = Serialization.write[T](data)
val bytes = json.getBytes(Charset.forName("utf-8"))
return bytes
} catch {
case ex: Exception =>
throw new SerializationException(ex)
}
}
}
override def close(): Unit = {
}
}
泛型反序列化程序基类:
package mypackage
import java.nio.charset.Charset
import java.util
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
class ObjectDeserializer[T <: AnyRef](tType: Class[T]) extends Deserializer[T] {
private val actualType = tType
override def configure(props: util.Map[String, _], isKey: Boolean): Unit = {
}
override def deserialize(s: String, bytes: Array[Byte]): T = {
if (bytes == null || bytes.length == 0) {
return null.asInstanceOf[T]
} else {
try {
val json = new String(bytes, Charset.forName("utf-8"))
implicit val formats = DefaultFormats
implicit val mf = Manifest.classType[T](actualType)
val data = Serialization.read[T](json)(formats, mf)
return data
} catch {
case ex: Exception =>
throw new SerializationException(ex)
}
}
}
override def close(): Unit = {
}
}
特定的反序列化程序类:
package mypackage
import payoneer.labs.techExamples.dataModel.MyDataA
class MyDataADeserializer extends ObjectDeserializer[MyDataA](classOf[MyDataA]) {
}
配置:
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ObjectSerializer[MyDataA]]
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[MyDataADeserializer]
然后使用 KafkaProducer[String, MyDataA]
以及 KafkaConsumer[String, MyDataA]
,假设我要使用的键是 String
我想用的值是 MyDataA
.
然后我可以使用:
producer.send(new ProducerRecord[String, MyDataA](topic, key, value))
并使用:
while (true) {
val messages = consumer.poll(1000).iterator().asScala
for (msg <- messages) {
val key : String = msg.key()
val value : MyDataA = msg.value()
}
}
相关问题:
将自定义java对象发送到kafka主题
发送到Kafka主题时序列化消息时出错
1条答案
按热度按时间kuarbcqp1#
根据我的Kafka经验,我可以建议你使用KafkaReact堆(https://github.com/reactor/reactor-kafka). 它还有一个很好的文档(http://projectreactor.io/docs/kafka/snapshot/reference/)