在flink或任何其他系统中合并两种不同类型的数据流

fjaof16o  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(666)

我想将flink用于远程患者监控案例场景,其中包括各种传感器,如陀螺仪、加速计、ecg流、hr率流、rr率等。因此在这种场景中,我们不可能有相同的数据类型或输入率等,但我仍然想检测心律失常或其他医疗状况,包括在这些多个传感器上做cep
我知道的是,如果我想对这些传感器执行一些复杂的事件处理,那么我有两个选项需要在cep之前完成
加入差异流
合并差异流
前面我正在执行一个基于传感器时间戳的连接,但是它不会导致连接所有事件,因为diff流可以有不同的速率和不同的时间戳(以微秒为单位),因此时间戳完全相等的情况将是罕见的。
因此,我想使用选项2,即在执行cep之前执行合并。为此,我在flink文档中发现,我可以合并这两个流,但它们应该具有相同的数据类型,我尝试了相同的操作,但没有成功,因为我遇到了以下错误

Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<org.carleton.cep.monitoring.latest.Events.RRIntervalStreamEvent> and GenericType<org.carleton.cep.monitoring.latest.Events.qrsIntervalStreamEvent>
    at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)

现在让我们看看我是如何尝试执行合并的。基本上我有两个流类,它们的属性如下
rIntervalStreamEvent流

public Integer Sensor_id;
public Long time;
public Integer RRInterval;

qrsintervalstreamevent流

public Integer Sensor_id;
public Long time;
public Integer qrsInterval;

这两个流都有Generator类,它也以指定的速率发送相同数据类型中的事件。下面是我尝试合并它们的代码。

// getting qrs interval stream
   DataStream<qrsIntervalStreamEvent> qrs_stream_raw = envrionment.
                    addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");

// getting RR interval stream
 DataStream<RRIntervalStreamEvent> rr_stream_raw = envrionment.
                         addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");

//merging both streams
 DataStream<Tuple3<Integer,Long,Integer>> mergedStream;
            mergedStream = rr_stream_raw.union(new DataStream[]{qrs_stream_raw});

我必须使用 new DataStream[] 仅仅是使用 qrs_stream_raw 导致如下所示的错误。

有人能告诉我一个关于
我应该如何合并这两条流?
我应该如何合并两个以上的流?
是否有一个引擎可以合并两个以上具有不同结构的流,如果有,我应该使用哪个引擎

dzjeubhm

dzjeubhm1#

正如alex所指出的,我们可以使用两个流的相同数据类型,并可以将它们连接到flink中,另一种选择是使用siddhi或flink-siddhi扩展。但我只想在Flink做任何事
下面是我在我的程序中做的一些改变
第1步:使两个生成器类都返回公共类型

public class RR_interval_Gen extends RichParallelSourceFunction<Tuple3<Integer,Long, Integer>>

第二步:使两个流生成器都具有元组类型,然后合并两个流。

// getting qrs interval stream
    DataStream<Tuple3<Integer,Long,Integer>> qrs_stream_raw = envrionment.
            addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");

    // getting RR interval stream
         DataStream<Tuple3<Integer,Long,Integer>> rr_stream_raw = envrionment.
                 addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");

         //merging both streams
    DataStream<Tuple3<Integer,Long,Integer>> mergedStream = rr_stream_raw.union(qrs_stream_raw);

相关问题