如何将SQL查询的结果从Azure Synapse笔记本传递到Synapse管道中的下一个活动?

cnh2zyt3  于 2022-12-14  发布在  其他
关注(0)|答案(2)|浏览(143)

我在Synapse工作区中有一个主管道,其中有2项活动:
第1-笔记本活动
第二个-If条件活动
对于第一个(Synapse notebook、spark pool、pyspark),我有一个SQL单元格,如下所示:
它有一个使用连接的简单查询:

%%sql
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id

这将向我返回一些行(〈50行)
现在,我希望在Synapse管道的“If Condition”活动中访问少于50行的结果集,
如何执行此操作?
根据docs,我应该能够使用以下代码:

@activity(‘Notebook1’).output.status.Output.result.exitValue

但是我在synapse notebook输出中得到的exitValue是null。那么我如何在“If条件”中访问这个结果集呢?

4smxwvx5

4smxwvx51#

  • 您必须使用mssparkutils.notebook.exit从notebook返回值,才能使用@activity(‘Notebook1’).output.status.Output.result.exitValue从管道访问它。
  • 您可以使用spark.sql,而不是使用SQL单元,并使用df = spark.sql(Query)将结果存储在 Dataframe 中。
  • 您可以选择将整个 Dataframe 数据返回到管道,也可以使用dataframe.count()仅返回记录数(如果要验证记录数)。
mssparkutils.notebook.exit(str(df.count()))  #where df is the dataframe
  • 如果您要以对象数组的形式传回整个数据,以便数据可用于逐一查看和,以进行管缐中的其他作业,您可以使用下列程式码:
x = df.toPandas()
json = x.to_json(orient = 'records' )
mssparkutils.notebook.exit(json)

  • 当我运行带有笔记本活动的管道并将其存储在set变量中以显示输出时,它将给予以下结果:
@activity('Notebook1').output.status.Output.result.exitValue

  • 您可以使用@json方法将这个对象数组从字串(因为它是以字串传回)转换成实际的对象。
zy1mlcev

zy1mlcev2#

因此,我发现的一种方法是在创建Dataframe后,通过执行以下操作将输出显式附加到Notebook的exitValue:
以下是我在“Notebook1”中的4个单元格:
第一个
参考this将 Dataframe 列转换为列表

#Attaching the List to exitValue in Output
mssparkutils.notebook.exit(resultVariable)

然后,在“If Condition”活动的表达式中,我们可以使用以下语句来访问此结果:

activity(Notebook1).output.status.Output.result.exitValue

因此,通过这种方式,现在您可以在表达式中访问先前在exitValue中传递的List
这是我发现的一种方法。我很乐意知道是否有更简单的方法来做到这一点。

相关问题