Apache Spark 从数据仓库中的Delta Live表流式传输到Kafka示例

sqxo8psd  于 2022-11-16  发布在  Apache
关注(0)|答案(2)|浏览(161)

我有下面的实时表

我想把它写进一个流,然后再写回我的Kafka源代码。
我在apache spark文档中看到我可以使用writeStream(我已经使用readStream将其从我的Kafka流中取出),但是我如何将表转换成它需要的介质,以便它可以使用它呢?
我对Kafka和数据世界都是相当陌生的,所以这里欢迎任何进一步的解释。

writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()

提前致谢,

我在apache spark文档中看到我可以使用writeStream(我已经用readStream把它从我的Kafka流中取出来了),但是我如何把表转换成它所需要的媒体,以便它可以使用它呢?我对kafka和数据世界都是相当陌生的,所以这里欢迎任何进一步的解释。

writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
7cjasjjr

7cjasjjr1#

截至 * 现在 *,Delta Live Tables只能以Delta表的形式写入数据-无法以其他格式写入。您可以通过创建由两个任务组成的Databricks工作流(具有依赖性或不具有依赖性,取决于是否触发管道)来实施解决方案:

  • 将执行实际数据处理的DLT管道
  • 一个任务(使用笔记本最简单的方法),它将读取DLT生成的表作为流,并将其内容写入Kafka,如下所示:
df = spark.readStream.format("delta").table("database.table_name")
(df.write.format("kafka").option("kafka....", "")
  .trigger(availableNow=True) # if it's not continuous
  .start()
)

P.S.如果您的Databricks客户有解决方案架构师或客户成功工程师,您可以将此要求告知他们,以便确定产品优先级。

jv4diomz

jv4diomz2#

转换在读取流进程启动后完成

read_df = spark.readStream.format('kafka') ... .... # other options

processed_df = read_df.withColumn('some column', some_calculation )

processed_df.writeStream.format('parquet') ... .... # other options
                         .start()

Spark文档很有帮助,也很详细,但是有些文章不适合初学者。你可以在YouTube上看看或者阅读一些文章来帮助你入门,比如this one

相关问题