我使用Delta Standalone library从Delta表中读取数据。我的目标是Assert特定数据由上游函数/服务处理和持久化。
我可以通过简单地检索所有文件来做到这一点,如下所示:
fun retrieveRecords(): List<RowRecord> {
val log = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
val snapshot = log.snapshot()
val iter = snapshot.open()
val rows = mutableListOf<RowRecord>()
while (iter.hasNext()) {
rows.add(iter.next())
}
return rows
}
然而,这并不能扩展。我在中看到scan()
操作看起来很有前途,它可以用于执行以下操作:
使用DeltaScan::getFiles访问与readPredicate的分区过滤器部分匹配的文件。这将返回表中元数据文件的内存优化迭代器。
要进一步筛选非分区列上的返回文件,请使用DeltaScan::getResidualPredicate获取未应用的输入 predicate 部分。
根据我的理解,我将能够使用这个scan()
操作,传递分区列并访问文件(本质上是path
)属性,并从那里进行一些转换。
我挣扎着:
1.在scan()
操作中将分区作为所需的Expression
类型传递;和
1.找到一种方法,然后将检索到的filepath
转换为RowRecords
来执行Assert。
val iter = log.snapshot().scan(
EqualTo(
Column("partitioned_col_1", StringType()),
Literal.of("partition_val_1"),
)
// Scan expects only a single Expression
// Multiple Expressions are not allowed
// ,
// EqualTo(
// Column("partitioned_col_2", StringType()),
// Literal.of("partition_val_2"),
// ),
// EqualTo(
// Column("partitioned_col_3", StringType()),
// Literal.of("partition_val_3"),
// )
).files
val paths = mutableListOf<String>()
while (iter.hasNext()) {
paths.add(iter.next().path)
}
// for path in paths:
// read and convert into RowRecord
我似乎在这里遗漏了几条关键的信息。任何建议都将不胜感激。
1条答案
按热度按时间yuvru6vn1#
你必须使用ClosableParquetDataIterator来迭代扫描的文件。它是内部包。所以你不能访问外部。这里是一个黑客访问它,测试。