java在sparksql2.4.4Dataframe中生成avro类型消息到kafka

kmb7vmvb  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(409)

我正在尝试用sparksql将avro消息写入kafka。有人能建议我如何用java实现它吗?我找到了一个scala参考代码,但没有找到java。
我试过了,但是抛出了一个错误,在哪里可以配置schema注册表。

aggr.selectExpr("CAST(order_id AS String) AS key", "to_avro(struct(*)) AS value").write().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "aggr_topic").save();

或者请将scala代码复制到java。

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaURL)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryURL).as("key"),
    from_avro($"value", "t-value", schemaRegistryURL).as("value"))

提前谢谢。

vlf7wbxs

vlf7wbxs1#

除了
val df from_avro 只存在于databricks环境中 writeStream 以及 to_avro ,无论如何。
另一种方法是使用foreachpartition将Dataframe转换为rdd,然后手动创建一个新的kafkaproducer来发送事件
你也可能对https://github.com/absaoss/abris

相关问题