我遇到了一个非常琐碎的问题,但目前,我找不到解决办法。
假设我有一个sparkDataframe,它可以是非类型化的或强类型的,其实并不重要。
现在我想把它发布给Kafka,下面的代码非常有用:
df2.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
.write.format("kafka")
.option("kafka.bootstrap.servers", "host:9092")
.option("topic", "test").save()
然而,我希望发布使用更复杂的消毒器-在我的情况下一个自定义的。
我该怎么做?换句话说,我希望发布一个对象,而不是发布字符串。
我的数据源是vertica,我使用vertica连接器来使用事件。
1条答案
按热度按时间polhcujo1#
您可以使用foreachpartition以自定义方式将数据发布到外部源。这样,如果使用foreach,您将为每个分区只创建一个连接,而不是记录。