我正在尝试使用kinesis数据流创建一个实时情绪分析模型,并使用spark创建一个databricks笔记本。我注意到执行数据转换、构建模型、分析情绪和将数据发送到数据库的代码块在运行时只运行一次,但我希望这些代码块在我决定停止执行之前一直运行。在databricks中有没有一种方法可以让代码块连续运行,直到用户决定终止它们的执行?
我试着把我的笔记本当作一个作业来运行,但是处理spark流的代码块只会永远运行,永远不允许其他代码块完成etl过程。
这是我如何设置spark流的问题吗?下面是我如何设置它的代码:
kinesisDF = spark \
.readStream \
.format("kinesis") \
.option("streamName", kinesisStreamName)\
.option("region", kinesisRegion) \
.option("initialPosition", "latest") \
.option("format", "json") \
.option("awsAccessKey", awsAccessKeyId)\
.option("awsSecretKey", awsSecretKey) \
.option("inferSchema", "true") \
.load()
df = kinesisDF \
.writeStream \
.format("memory") \
.outputMode("append") \
.queryName("tweets") \
.start()
上面的代码块(.start)是我尝试将笔记本作为作业运行时连续运行的代码块,不允许其他代码块执行。p、 我对databricks和spark还很陌生
2条答案
按热度按时间cotxawn71#
我认为你需要遵循的方法为迪纳摩db作为Flume根据https://docs.databricks.com/spark/latest/structured-streaming/examples.html#write-在scala中使用foreach访问amazon dynamodb,在python中使用
foreach
.从db手册开始-关注
foreach
:streamingdf.writestream.foreach()允许您将流查询的输出写入任意位置。这就是线索。
iqjalb3h2#
在python中编写etl表单可以采用多种结构,这取决于专门的前提条件、业务目标、现有工具所擅长的库,以及设计人员认为他们必须在没有任何准备的情况下工作的程度。python的特性在于处理记录的信息结构和单词引用,这在etl任务中非常重要。
python具有足够的适应性,客户机可以用本地信息结构编写任何etl过程。例如,在python固有的数学模块的帮助下,筛选无效的质量非常简单:
import math data=[1.0,3.0,6.5,float('nan'),40.0,float('nan')]filtered=[]对于数据中的值:如果不是math.isnan(值):filtered.append(值)
在没有任何准备的情况下对整个etl过程进行编码并不是特别熟练,因此大多数etl代码最终都是纯python代码和远程描述的能力或文章的混合体,例如,那些来自前面引用的库的代码。例如,客户机可以利用pandas传递包含空值的行的整个Dataframe:
筛选=data.dropna()
python编程改进包(sdk)、应用程序编程接口(api)和不同的实用程序在某些阶段是可以访问的,其中一些可能有助于etl的编码。例如,anaconda stage是python对处理信息非常重要的模块和库的专用工具。它集成了自己的bundle-chief和云,便于共享代码草稿和python条件。
对于python中的大部分代码来说,重要的一部分是另外应用于etl编程的。例如,代码应该是“pythonic”——这意味着开发人员应该遵循一些明确的语言规则,使内容紧凑、清晰,并符合软件工程师的目标。文件是另外重要的,就像伟大的捆绑板和保持对条件的眼睛。