问题上下文我成功连接到一个API,并向一个端点发送了一个请求,该端点以JSON数组的形式返回数据行。该Json文档随后被保存为Json数据集,用于下游解析和扁平化。数据最终被转换并保存为2列csv数据集。
这样做的目的是使用2列csv Dataset作为管道中ForEach活动的行数据输入源。ForEach的每次迭代将使用行property_id向enpoint发送API请求,该请求将返回单个属性的详细信息。然后,它将随后使用存储过程转换这些详细信息并将其保存到数据库的暂存区。
问题我是新的使用synapse和仍在学习什么是最好的模式,工具和技术是摄取,转换和存储数据的下游分析和机器学习工作负载。
在这种情况下,我不确定如何在数据集上执行循环活动。
支持材料原始API Json数据如下所示:Data pipeline - Input API Json data
数据流水线如下所示,以供参考:Propert Ingestion Pipeline
数据流接收器生成的csv数据集如下所示:Transform Etags Data Flow Output
我尝试使用csv数据集作为ForEach活动的输入,但不确定如何实现这一点。我希望可以遍历每一行并提取property_id和etag。并随后在每次迭代中使用唯一的property_id来调用API并返回在更新暂存区域时要使用的详细属性细节。
我还没有成功地访问每个迭代的相关数据。在线搜索显示了许多使用数组作为ForEach活动输入的示例,但是我不确定这是否适用于我的场景。
提前感谢您
1条答案
按热度按时间vhipe2zx1#
当您将接收器存储到csv文件时,您可以使用查找活动从csv文件中获取JSON数组。
在数据流活动之后,添加一个查找活动并给予接收器csv文件。您需要确保数据流将输出写入单个文件。为此,请转到数据流接收器设置,并在文件名选项中选择输出到单个文件,然后给予您的csv文件名。
在lookup活动中,给予csv数据集,并取消选中仅第一行。
Lookup活动将给予如下JSON数组的输出。
您可以使用下面的表达式将上面的查找数组提供给Foreach。
在ForEach中,您需要使用
@item().property_id
、@item().etag
来访问值。如果您的csv大小小于2 MB,您可以使用高速缓存接收器缓存从Dataflow本身获取JSON数组,而不是查找。
通过这个SO answer来了解它。
**注意:**数据流缓存接收器的限制为2 MB,查找活动输出限制为4 MB(5000行)。如果你的csv文件大小超过这个值,你需要使用两级管道来迭代。
因此,首先将您的csv存储为部分csv文件(使用分区号以获得较小的部分文件)。进入Flume设置,将文件名选项设置为默认。
现在,去优化和设置分区的数量.
这将在接收器文件夹中给予2个零件文件。在数据流活动之后,您需要使用“获取Meta数据”活动来获取部件文件列表。把这个给ForEach,然后在ForEach内部对每个文件进行查找。在它之后使用Execute管道,并将查找活动输出作为数组参数传递给子管道。在子管道内部,使用另一个ForEach并将数组参数赋予它。此ForEach将遍历文件中的每条记录。
通过@Pratik Lad的SO answer来了解它。