Spark:3.0.0
斯卡拉:2.12
汇合
我有Spark结构化流的工作,并寻找一个写Dataframe到Kafka在protbuf格式的例子。
我从postgresql读取消息,在完成所有转换之后,有一个带有key和value的Dataframe:
root
|-- key: string (nullable = true)
|-- value: binary (nullable = false)
将消息推送到Kafka的代码:
val kafkaOptions = Seq(
KAFKA_BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer",
"schema.registry.url" -> "http://localhost:8081",
"topic" -> "test_users"
)
tDF
.write
.format(KAFKA)
.options(kafkaOptions.toMap)
.save()
二进制格式的消息已发布,但我无法反序列化,因为合流中没有架构
有没有一个lib可以简单地为我做一些事情?或者我可以参考的示例代码。
暂无答案!
目前还没有任何答案,快来回答吧!