将Dataframe发布到kafka

xbp102n0  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(339)

我遇到了一个非常琐碎的问题,但目前,我找不到解决办法。
假设我有一个sparkDataframe,它可以是非类型化的或强类型的,其实并不重要。
现在我想把它发布给Kafka,下面的代码非常有用:

df2.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
      .write.format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")
      .option("topic", "test").save()

然而,我希望发布使用更复杂的消毒器-在我的情况下一个自定义的。
我该怎么做?换句话说,我希望发布一个对象,而不是发布字符串。
我的数据源是vertica,我使用vertica连接器来使用事件。

polhcujo

polhcujo1#

您可以使用foreachpartition以自定义方式将数据发布到外部源。这样,如果使用foreach,您将为每个分区只创建一个连接,而不是记录。

相关问题