我在一个apache flink项目中遇到了以下情况。
3个不同对象的流,如
person->string id,string firstname,string lastname(即101,john,doe)
persondetail->string id,string address,string city,string phonenumber,long personid(即99,stefansplatz 1,+43066012345678,101)
personadddetail->string id,string adddetailtype,object adddetailvalue,long personid(即77,1,hansi或78,2,1234或80,3,true)
我想将这些流中的对象聚合到一个新的对象中(不确定这里的措辞是否正确),然后将其放入一个新的流中。聚合应该基于personid,作为额外的捕获,我需要只过滤出具有特定adddetailtype的personadddetail(假设我只对类型为1和2的对象感兴趣)。
聚合对象应该看起来像
personreport->long id,string firstname,string lastname,string address,string city,string phonenumber,arraylist详细信息
现在的问题是,如果这是可能的,如果是的话,我如何才能做到这一点。欢迎每一个输入。
2条答案
按热度按时间fafcakar1#
感谢@jeremy grand comment,我自己想出了一个解决方案,我想和大家分享我的想法和代码。我介绍了一个叫做personcontainer的新类
这是很重要的一部分,因为我将首先将三个输入流的对象Map到这个公共对象,然后合并这些流。
下面是我所做的,我在评论中描述了单个步骤。简而言之,我将三个输入流Map到新引入的容器的新流。然后我对这三个流进行联合,并使用迭代模式为这些对象设置键,然后使用我的自定义合并方法合并它们。最后,我定义了一个定制的complete方法来区分最终Map到输出的完全合并容器和反馈到合并过程中的尚未完成的容器。
这种方法对我很有效,好的是我可以处理流中延迟到达的对象(比如说,personadddetail在其他对象之后的几分钟内到达),并且我不需要定义某种类型的窗口。不管怎样,谢谢你的意见
mcvgt66p2#
你的问题听起来像是个问题
join
操作。你可以这样做:请注意,一般来说,在无界(无限)集合上不可能执行联接操作,因此需要将其绑定到windows中。