Java: How to aggregate data from 3 streams into a single combined object?

cnh2zyt3 posted on 2021-06-25 in Flink

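I have the following situation in an Apache Flink project.
Three streams of different objects, like:
Person -> String id, String firstName, String lastName (e.g. 101, John, Doe)
PersonDetail -> String id, String address, String city, String phoneNumber, long personId (e.g. 99, Stefansplatz 1, +43066012345678, 101)
PersonAddDetail -> String id, String addDetailType, Object addDetailValue, long personId (e.g. 77, 1, Hansi or 78, 2, 1234 or 80, 3, true)
I want to aggregate the objects from these streams into one new object (not sure the wording is right here) and put it into a new stream. The aggregation should be based on personId, and as an extra catch I need to filter the PersonAddDetail objects so that only specific addDetailType values remain (let's say I'm only interested in types 1 and 2).
The aggregated object should look like:
PersonReport -> long id, String firstName, String lastName, String address, String city, String phoneNumber, ArrayList details
My question is whether this is possible and, if so, how I can achieve it. Any input is welcome.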
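To pin down the result the streaming job is supposed to produce, here is a batch version over finite lists in plain Java (no Flink). It is only a sketch: the classes are hypothetical stand-ins built from the field lists above, Person.id is assumed to hold the same numeric value as personId, and the type filter keeps only "1" and "2".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the question's types, with the fields listed above.
class Person {
    final String id, firstName, lastName;

    Person(String id, String firstName, String lastName) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }
}

class PersonDetail {
    final String id, address, city, phoneNumber;
    final long personId;

    PersonDetail(String id, String address, String city, String phoneNumber, long personId) {
        this.id = id;
        this.address = address;
        this.city = city;
        this.phoneNumber = phoneNumber;
        this.personId = personId;
    }
}

class PersonAddDetail {
    final String id, addDetailType;
    final Object addDetailValue;
    final long personId;

    PersonAddDetail(String id, String addDetailType, Object addDetailValue, long personId) {
        this.id = id;
        this.addDetailType = addDetailType;
        this.addDetailValue = addDetailValue;
        this.personId = personId;
    }
}

class PersonReport {
    long id;
    String firstName, lastName, address, city, phoneNumber;
    final List<PersonAddDetail> details = new ArrayList<>();
}

class ReferenceAggregation {
    // Only these add-detail types are of interest, as stated in the question.
    static final Set<String> WANTED_TYPES = Set.of("1", "2");

    // Batch (finite-list) version of the desired result, keyed by person id.
    static Map<Long, PersonReport> aggregate(List<Person> persons,
                                             List<PersonDetail> details,
                                             List<PersonAddDetail> addDetails) {
        Map<Long, PersonReport> reports = new LinkedHashMap<>();
        for (Person p : persons) {
            PersonReport r = new PersonReport();
            r.id = Long.parseLong(p.id); // assumes Person.id holds the numeric person id
            r.firstName = p.firstName;
            r.lastName = p.lastName;
            reports.put(r.id, r);
        }
        for (PersonDetail d : details) {
            PersonReport r = reports.get(d.personId);
            if (r != null) {
                r.address = d.address;
                r.city = d.city;
                r.phoneNumber = d.phoneNumber;
            }
        }
        for (PersonAddDetail a : addDetails) {
            PersonReport r = reports.get(a.personId);
            // keep only add-details of the wanted types that belong to a known person
            if (r != null && WANTED_TYPES.contains(a.addDetailType)) {
                r.details.add(a);
            }
        }
        return reports;
    }
}
```

In this batch version, details whose person is missing are simply dropped. A streaming job cannot decide that immediately, because the person may still arrive later, which is why some form of state or window is needed.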


fafcakar1#

Thanks to @jeremy grand's comment, I came up with a solution myself, and I'd like to share my thoughts and code. I introduced a new class called PersonContainer:

import java.util.ArrayList;
import java.util.List;

public class PersonContainer {

  // key shared by all three input types (the person id)
  private String id;

  private Person person;
  private PersonDetail personDetail;
  private List<PersonAddDetail> personAddDetailList = new ArrayList<>();

  // getter names (getId, getPersonId) are assumed; adapt them to your classes
  public PersonContainer(Person person) {
    this.id = person.getId();
    this.person = person;
  }

  public PersonContainer(PersonDetail personDetail) {
    // personId is a long, while the key is a String
    this.id = String.valueOf(personDetail.getPersonId());
    this.personDetail = personDetail;
  }

  public PersonContainer(PersonAddDetail personAddDetail) {
    this.id = String.valueOf(personAddDetail.getPersonId());
    this.personAddDetailList.add(personAddDetail);
  }

  // copy everything the other (possibly partially merged) container carries into this one
  public PersonContainer merge(PersonContainer other) {
    if (other.person != null) {
      this.person = other.person;
    }
    if (other.personDetail != null) {
      this.personDetail = other.personDetail;
    }
    this.personAddDetailList.addAll(other.personAddDetailList);
    // always return a container; the reduce function should never return null
    return this;
  }

  public String getId() {
    return id;
  }

  public Person getPerson() {
    return person;
  }

  public PersonDetail getPersonDetail() {
    return personDetail;
  }

  public List<PersonAddDetail> getPersonAddDetailList() {
    return personAddDetailList;
  }

  // complete once the person, its detail and at least two add-details have been merged
  public boolean isComplete() {
    return person != null && personDetail != null && personAddDetailList.size() > 1;
  }
}

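This is the important part, because I first map the objects of the three input streams to this common object and then union the streams.
Below is what I did; the individual steps are described in the comments. In short, I map the three input streams to new streams of the newly introduced container. Then I union the three streams and, using the iteration pattern, key these objects by their id and merge them with my custom merge method. Finally, I defined a custom isComplete method to distinguish fully merged containers, which are mapped to the output, from containers that are not complete yet and are fed back into the merge process.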

//Filter PersonAddDetail to keep just the types needed
DataStream<PersonAddDetail> filteredPersonAddDetail = unfilteredPersonAddDetail.filter(new FilterFunction<PersonAddDetail>() {
  @Override
  public boolean filter(PersonAddDetail personAddDetail) throws Exception {
    return personAddDetail.getAddDetailType().matches("1|2");
  }
});

//map Person stream to common object
DataStream<PersonContainer> mappedPersonStream = personInputStream.map(new MapFunction<Person, PersonContainer>() {
  @Override
  public PersonContainer map(Person person) throws Exception {
    return new PersonContainer(person);
  }
});

//map PersonDetail stream to common object
DataStream<PersonContainer> mappedPersonDetailStream = personDetailInputStream.map(new MapFunction<PersonDetail, PersonContainer>() {
  @Override
  public PersonContainer map(PersonDetail personDetail) throws Exception {
    return new PersonContainer(personDetail);
  }
});

//map PersonAddDetail stream to common object
DataStream<PersonContainer> mappedPersonAddDetailStream = filteredPersonAddDetail.map(new MapFunction<PersonAddDetail, PersonContainer>() {
  @Override
  public PersonContainer map(PersonAddDetail personAddDetail) throws Exception {
    return new PersonContainer(personAddDetail);
  }
});

//union the three input streams to one single stream
DataStream<PersonContainer> combinedInput = mappedPersonStream.union(mappedPersonDetailStream, mappedPersonAddDetailStream);

// Iteration pattern is in place here and I'm going to recursively try to merge corresponding objects together
IterativeStream<PersonContainer> iteration = combinedInput.iterate();

// Group objects by their shared ID and then use reduce to merge them
DataStream<PersonContainer> iterationBody = iteration.keyBy(new KeySelector<PersonContainer, String>() {
  @Override
  public String getKey(PersonContainer personContainer) throws Exception {
    return personContainer.getId();
  }
})
    .reduce(new ReduceFunction<PersonContainer>() {
      @Override
      public PersonContainer reduce(PersonContainer personContainer, PersonContainer other) throws Exception {
        return personContainer.merge(other);
      }
    });

// use the container's isComplete method to check whether the merge is finished or we need to wait for further objects in the stream
DataStream<PersonContainer> containersNotCompleteYet = iterationBody.filter(new FilterFunction<PersonContainer>() {
  @Override
  public boolean filter(PersonContainer personContainer) throws Exception {
    return !personContainer.isComplete();
  }
});

// partially merged (or not yet merged) containers are fed back into the iteration
iteration.closeWith(containersNotCompleteYet);

// fully merged containers are processed further
DataStream<PersonContainer> completeContainers = iterationBody.filter(new FilterFunction<PersonContainer>() {
  @Override
  public boolean filter(PersonContainer personContainer) throws Exception {
    return personContainer.isComplete();
  }
});

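// finally the container is mapped to the correct output object
DataStream<PersonReport> personReport = completeContainers.map(new MapFunction<PersonContainer, PersonReport>() {
  @Override
  public PersonReport map(PersonContainer personContainer) throws Exception {
    // map personContainer to the final PersonReport here
    // (assumption: PersonReport has a no-arg constructor; fill it from
    // getPerson(), getPersonDetail() and getPersonAddDetailList())
    PersonReport report = new PersonReport();
    return report;
  }
});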

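This approach works well for me. The nice thing is that I can handle objects that arrive late in the stream (say, a PersonAddDetail arriving a few minutes after the other objects), and I don't need to define any kind of window. Anyway, thanks for your input.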
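The order independence described above comes from keyed state: keyBy(...).reduce(...) keeps one accumulated container per key and merges every new element into it, whenever that element arrives. The plain-Java sketch below models only that rolling reduce followed by the isComplete filter, without Flink and without the feedback edge; SimContainer and KeyedMergeSimulation are hypothetical names, and strings stand in for Person, PersonDetail and PersonAddDetail. In the pipeline above, a fed-back container passes through the same reduce again and is merged into the container already stored for its key, so it is worth checking that this does not duplicate entries in personAddDetailList or keep re-circulating containers that cannot complete yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for PersonContainer (hypothetical): strings replace the real types.
class SimContainer {
    final String id;
    String person;                                      // stands in for Person
    String personDetail;                                // stands in for PersonDetail
    final List<String> addDetails = new ArrayList<>();  // stands in for List<PersonAddDetail>

    private SimContainer(String id) {
        this.id = id;
    }

    static SimContainer ofPerson(String id, String person) {
        SimContainer c = new SimContainer(id);
        c.person = person;
        return c;
    }

    static SimContainer ofDetail(String id, String personDetail) {
        SimContainer c = new SimContainer(id);
        c.personDetail = personDetail;
        return c;
    }

    static SimContainer ofAddDetail(String id, String addDetail) {
        SimContainer c = new SimContainer(id);
        c.addDetails.add(addDetail);
        return c;
    }

    // Copies whatever the other container carries into this one and returns this.
    SimContainer merge(SimContainer other) {
        if (other.person != null) person = other.person;
        if (other.personDetail != null) personDetail = other.personDetail;
        addDetails.addAll(other.addDetails);
        return this;
    }

    // Same rule as PersonContainer.isComplete(): person, detail and at least two add-details.
    boolean isComplete() {
        return person != null && personDetail != null && addDetails.size() > 1;
    }
}

// Models keyBy(id).reduce(merge) followed by filter(isComplete): one accumulated value per key.
class KeyedMergeSimulation {
    private final Map<String, SimContainer> state = new HashMap<>();

    // Stores the container if the key is new, otherwise merges it into the stored one;
    // returns the accumulated container only if it is complete.
    Optional<SimContainer> process(SimContainer incoming) {
        SimContainer merged = state.merge(incoming.id, incoming, SimContainer::merge);
        return merged.isComplete() ? Optional.of(merged) : Optional.empty();
    }
}
```

Feeding the four containers for id "101" in any order yields a complete container only when the last one is processed, even if the Person arrives after all of its details; containers for other ids are accumulated separately.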


mcvgt66p2#

Your problem sounds like a join operation. You could do something like this:

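personDataStream.join(personDetailDataStream)
    // key both sides by the person id (getter names are assumed)
    .where(new KeySelector<Person, Long>() {
        public Long getKey(Person person) { return Long.parseLong(person.getId()); }
    })
    .equalTo(new KeySelector<PersonDetail, Long>() {
        public Long getKey(PersonDetail detail) { return detail.getPersonId(); }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(2)))
    .apply(new JoinFunction<Person, PersonDetail, PersonWithDetail>() {
        // PersonWithDetail and this constructor are hypothetical
        public PersonWithDetail join(Person person, PersonDetail detail) { return new PersonWithDetail(person, detail); }
    });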

Note that, in general, a join over unbounded (infinite) collections is not possible, so you need to bound it with windows.
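A windowed join only pairs elements that have the same key and fall into the same window. The plain-Java sketch below (no Flink; Timed and TumblingWindowJoinSketch are hypothetical names) models this for tumbling windows with zero offset and non-negative millisecond timestamps, emitting every matching left/right pair per window, as an inner join does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical keyed, timestamped element standing in for a Person or PersonDetail.
class Timed {
    final long key;
    final long timestampMillis;
    final String payload;

    Timed(long key, long timestampMillis, String payload) {
        this.key = key;
        this.timestampMillis = timestampMillis;
        this.payload = payload;
    }
}

class TumblingWindowJoinSketch {
    // Start of the tumbling window [start, start + size) containing the timestamp
    // (zero offset, non-negative timestamps assumed).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Inner join: pairs every left/right element with equal keys in the same window.
    static List<String> join(List<Timed> left, List<Timed> right, long sizeMillis) {
        List<String> out = new ArrayList<>();
        for (Timed l : left) {
            for (Timed r : right) {
                boolean sameKey = l.key == r.key;
                boolean sameWindow = windowStart(l.timestampMillis, sizeMillis)
                        == windowStart(r.timestampMillis, sizeMillis);
                if (sameKey && sameWindow) {
                    out.add(l.payload + "+" + r.payload);
                }
            }
        }
        return out;
    }
}
```

With a 2-second window, a person at 500 ms joins a detail at 1500 ms (both in window [0, 2000)), but not a detail at 2500 ms, which lands in the next window. This is the trade-off against the first answer, which avoids windows so that late objects can still be merged.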
