nifi在特定的公共头上连接两个csv流文件

vawmfj5a  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(381)

尝试以csv格式合并两个传入的流文件,并准备一个组合的csv输出,其中包含基于某个公共头值的数据。
需要基于“creation\u date”和“hour\u of \u day”合并行,并使用“source\u count”和“hive\u count”之间的计算差合并两个流文件。如果输入2缺少输入1的某一天的某个创建日期/小时,我需要在“差异”标题下加上“不可用”。
如何在nifi实现这一点?有没有一种方法可以使用mergecontent加入并对输出文件运行查询以生成差异?
任何帮助都将不胜感激。:)
源输入1

creation_Date,Hour_of_day,source_count
2018-08-16,3,19934
2018-08-16,0,3278
2018-08-16,1,10680
2018-08-16,4,19705
2018-08-16,2,14199
2018-08-16,5,672

源输入2

creation_date,hour_of_day,hive_count
2018-08-16,0,3279
2018-08-16,1,10680
2018-08-16,3,19933
2018-08-16,2,14199
2018-08-16,4,19700

组合输出

creation_Date,hour_of_day,source_count,hive_count,difference
2018-08-16,0,3278,3279,-1
2018-08-16,1,10680,10680,0
2018-08-16,2,14199,14199,0
2018-08-16,3,19934,19933,1
2018-08-16,4,19705,19700,5
2018-08-16,5,672,0,(NotAvailable)
kxeu7u2r

kxeu7u2r1#

apachenifi通常不会执行这些类型的流连接。
它可以做的连接更多的是查找连接,其中数据的一端是固定的,另一端来自传入流。这就是查找服务的工作原理,我相信有一个csv查找服务。
另一种方法是将数据插入数据库,然后使用executesql发出sql查询,将它们连接在一起,并将结果写入单个流文件。
最后,您可以使用流处理系统,如spark、flink、storm等,它们都具有真正的流连接。
mergecontent意味着将“like”数据合并在一起,因此,如果所有数据都是具有相同列的csv,并且您使用该数据创建了多个流文件,那么您可以将它们全部合并到一个大csv中。

ldfqzlk8

ldfqzlk82#

@奇怪的是,这种方法可能会有所帮助
使用queryrecord processor联接记录并生成具有不同属性的行:

select fa.t, fa.v a, fb.v b, fc.v c
 from (
   select t, v from FLOWFILE where m = 'a'
 ) fa
 left join (
   select t, v from FLOWFILE where m = 'b'
 ) fb on fa.t = fb.t
 left join (
   select t, v from FLOWFILE where m = 'c'
 ) fc on fa.t = fc.t

更多信息请访问https://gist.github.com/ijokarumawak/7e20af1cd222fb2adf13acb2b0f46aed

相关问题