从hbase读取数据流/波束

nhn9ugyo  于 2021-06-08  发布在  Hbase
关注(0)|答案(1)|浏览(360)

我试图理解如何将hbase数据库以10条记录为一批的形式摄取到apachebeam/dataflow中。
到目前为止,我已经尝试了下面的方法,它为每一行提供触发器。

PCollection<KV<Integer,String>> records =  p.apply("read",HBaseIO.read()
                .withConfiguration(conf)
                .withTableId("Data")
                .withScan(scan))
                .apply(ParDo.of(new DoFn<Result, KV<Integer,String>>() {
                    @DoFn.ProcessElement
                    public void process(ProcessContext c){
                        Long ts = Bytes.toLong(c.element().getValue(Bytes.toBytes("timestamp"),Bytes.toBytes("timestamp")));
                        System.out.println(Long.toString(ts));
                        Integer pid = Bytes.toInt(c.element().getValue(Bytes.toBytes("patientid"),Bytes.toBytes("patientid")));
                        c.outputWithTimestamp(KV.of(pid,Long.toString(ts)),new Instant(ts));

                    }
                    @Override
                    public Duration getAllowedTimestampSkew(){
                        return Duration.standardDays(1);
                    }
                }));

但我需要的是将它们按10行顺序排列,然后运行下游管道。
我试着用窗口化将数据分组到窗口中,因为我不断地丢失数据点(不包括在任何窗口中,或者有些窗口即使准备好了数据集也永远无法完成)
请提供您的意见和想法来解决这类问题。
谢谢您。

ars1skjm

ars1skjm1#

apache beam执行模型说:
束流管道通常集中在“令人尴尬的平行”问题上。正因为如此,API强调并行处理元素,这使得很难表达像“为pcollection中的每个元素分配一个序列号”这样的操作。这是有意为之的,因为这样的算法更容易受到可伸缩性问题的影响。
因此,您不应该对pcollections中的排序进行假设。

相关问题