我试图理解如何将hbase数据库以10条记录为一批的形式摄取到apachebeam/dataflow中。
到目前为止,我已经尝试了下面的方法,它为每一行提供触发器。
PCollection<KV<Integer,String>> records = p.apply("read",HBaseIO.read()
.withConfiguration(conf)
.withTableId("Data")
.withScan(scan))
.apply(ParDo.of(new DoFn<Result, KV<Integer,String>>() {
@DoFn.ProcessElement
public void process(ProcessContext c){
Long ts = Bytes.toLong(c.element().getValue(Bytes.toBytes("timestamp"),Bytes.toBytes("timestamp")));
System.out.println(Long.toString(ts));
Integer pid = Bytes.toInt(c.element().getValue(Bytes.toBytes("patientid"),Bytes.toBytes("patientid")));
c.outputWithTimestamp(KV.of(pid,Long.toString(ts)),new Instant(ts));
}
@Override
public Duration getAllowedTimestampSkew(){
return Duration.standardDays(1);
}
}));
但我需要的是将它们按10行顺序排列,然后运行下游管道。
我试着用窗口化将数据分组到窗口中,因为我不断地丢失数据点(不包括在任何窗口中,或者有些窗口即使准备好了数据集也永远无法完成)
请提供您的意见和想法来解决这类问题。
谢谢您。
1条答案
按热度按时间ars1skjm1#
apache beam执行模型说:
束流管道通常集中在“令人尴尬的平行”问题上。正因为如此,API强调并行处理元素,这使得很难表达像“为pcollection中的每个元素分配一个序列号”这样的操作。这是有意为之的,因为这样的算法更容易受到可伸缩性问题的影响。
因此,您不应该对pcollections中的排序进行假设。