我在Synapse工作区中有一个主管道,其中有2项活动:
第1-笔记本活动
第二个-If条件活动
对于第一个(Synapse notebook、spark pool、pyspark),我有一个SQL单元格,如下所示:
它有一个使用连接的简单查询:
%%sql
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id
这将向我返回一些行(〈50行)
现在,我希望在Synapse管道的“If Condition”活动中访问少于50行的结果集,
如何执行此操作?
根据docs,我应该能够使用以下代码:
@activity(‘Notebook1’).output.status.Output.result.exitValue
但是我在synapse notebook输出中得到的exitValue是null。那么我如何在“If条件”中访问这个结果集呢?
2条答案
按热度按时间4smxwvx51#
mssparkutils.notebook.exit
从notebook返回值,才能使用@activity(‘Notebook1’).output.status.Output.result.exitValue
从管道访问它。spark.sql
,而不是使用SQL单元,并使用df = spark.sql(Query)
将结果存储在 Dataframe 中。dataframe.count()
仅返回记录数(如果要验证记录数)。@json
方法将这个对象数组从字串(因为它是以字串传回)转换成实际的对象。zy1mlcev2#
因此,我发现的一种方法是在创建Dataframe后,通过执行以下操作将输出显式附加到Notebook的exitValue:
以下是我在“Notebook1”中的4个单元格:
第一个
参考this将 Dataframe 列转换为列表
然后,在“If Condition”活动的表达式中,我们可以使用以下语句来访问此结果:
因此,通过这种方式,现在您可以在表达式中访问先前在exitValue中传递的List
这是我发现的一种方法。我很乐意知道是否有更简单的方法来做到这一点。