我正在尝试用sparksql将avro消息写入kafka。有人能建议我如何用java实现它吗?我找到了一个scala参考代码,但没有找到java。
我试过了,但是抛出了一个错误,在哪里可以配置schema注册表。
aggr.selectExpr("CAST(order_id AS String) AS key", "to_avro(struct(*)) AS value").write().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "aggr_topic").save();
或者请将scala代码复制到java。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaURL)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryURL).as("key"),
from_avro($"value", "t-value", schemaRegistryURL).as("value"))
提前谢谢。
1条答案
按热度按时间vlf7wbxs1#
除了
val df
from_avro
只存在于databricks环境中writeStream
以及to_avro
,无论如何。另一种方法是使用foreachpartition将Dataframe转换为rdd,然后手动创建一个新的kafkaproducer来发送事件
你也可能对https://github.com/absaoss/abris