如何在databricks上运行etl管道(python)

vyswwuz2  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(453)

我正在尝试使用kinesis数据流创建一个实时情绪分析模型,并使用spark创建一个databricks笔记本。我注意到执行数据转换、构建模型、分析情绪和将数据发送到数据库的代码块在运行时只运行一次,但我希望这些代码块在我决定停止执行之前一直运行。在databricks中有没有一种方法可以让代码块连续运行,直到用户决定终止它们的执行?
我试着把我的笔记本当作一个作业来运行,但是处理spark流的代码块只会永远运行,永远不允许其他代码块完成etl过程。
这是我如何设置spark流的问题吗?下面是我如何设置它的代码:

kinesisDF = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", kinesisStreamName)\
  .option("region", kinesisRegion) \
  .option("initialPosition", "latest") \
  .option("format", "json") \
  .option("awsAccessKey", awsAccessKeyId)\
  .option("awsSecretKey", awsSecretKey) \
  .option("inferSchema", "true") \
  .load()
df = kinesisDF \
  .writeStream \
  .format("memory") \
  .outputMode("append") \
  .queryName("tweets")  \
  .start()

上面的代码块(.start)是我尝试将笔记本作为作业运行时连续运行的代码块,不允许其他代码块执行。p、 我对databricks和spark还很陌生

cotxawn7

cotxawn71#

我认为你需要遵循的方法为迪纳摩db作为Flume根据https://docs.databricks.com/spark/latest/structured-streaming/examples.html#write-在scala中使用foreach访问amazon dynamodb,在python中使用 foreach .
从db手册开始-关注 foreach :

from pyspark.sql.functions import *

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
       .selectExpr("value % 10 as key")
       .groupBy("key")
       .count()
       .toDF("key", "count")
       .writeStream
       .foreach(SendToDynamoDB_ForeachWriter())
      #.foreach(sendToDynamoDB_simple)  // alternative, use one or the other
       .outputMode("update")
       .start()'
)

streamingdf.writestream.foreach()允许您将流查询的输出写入任意位置。这就是线索。

iqjalb3h

iqjalb3h2#

在python中编写etl表单可以采用多种结构,这取决于专门的前提条件、业务目标、现有工具所擅长的库,以及设计人员认为他们必须在没有任何准备的情况下工作的程度。python的特性在于处理记录的信息结构和单词引用,这在etl任务中非常重要。
python具有足够的适应性,客户机可以用本地信息结构编写任何etl过程。例如,在python固有的数学模块的帮助下,筛选无效的质量非常简单:
import math data=[1.0,3.0,6.5,float('nan'),40.0,float('nan')]filtered=[]对于数据中的值:如果不是math.isnan(值):filtered.append(值)
在没有任何准备的情况下对整个etl过程进行编码并不是特别熟练,因此大多数etl代码最终都是纯python代码和远程描述的能力或文章的混合体,例如,那些来自前面引用的库的代码。例如,客户机可以利用pandas传递包含空值的行的整个Dataframe:
筛选=data.dropna()
python编程改进包(sdk)、应用程序编程接口(api)和不同的实用程序在某些阶段是可以访问的,其中一些可能有助于etl的编码。例如,anaconda stage是python对处理信息非常重要的模块和库的专用工具。它集成了自己的bundle-chief和云,便于共享代码草稿和python条件。
对于python中的大部分代码来说,重要的一部分是另外应用于etl编程的。例如,代码应该是“pythonic”——这意味着开发人员应该遵循一些明确的语言规则,使内容紧凑、清晰,并符合软件工程师的目标。文件是另外重要的,就像伟大的捆绑板和保持对条件的眼睛。

相关问题