kotlin Delta Standalone -扫描特定数据

14ifxucb  于 2023-04-21  发布在  Kotlin
关注(0)|答案(1)|浏览(124)

我使用Delta Standalone library从Delta表中读取数据。我的目标是Assert特定数据由上游函数/服务处理和持久化。
我可以通过简单地检索所有文件来做到这一点,如下所示:

fun retrieveRecords(): List<RowRecord> {
    val log = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
    val snapshot = log.snapshot()
    val iter = snapshot.open()
    val rows = mutableListOf<RowRecord>()
    while (iter.hasNext()) {
        rows.add(iter.next())
    }
    return rows
}

然而,这并不能扩展。我在中看到scan()操作看起来很有前途,它可以用于执行以下操作:
使用DeltaScan::getFiles访问与readPredicate的分区过滤器部分匹配的文件。这将返回表中元数据文件的内存优化迭代器。
要进一步筛选非分区列上的返回文件,请使用DeltaScan::getResidualPredicate获取未应用的输入 predicate 部分。
根据我的理解,我将能够使用这个scan()操作,传递分区列并访问文件(本质上是path)属性,并从那里进行一些转换。
我挣扎着:
1.在scan()操作中将分区作为所需的Expression类型传递;和
1.找到一种方法,然后将检索到的filepath转换为RowRecords来执行Assert。

val iter = log.snapshot().scan(
    EqualTo(
        Column("partitioned_col_1", StringType()),
        Literal.of("partition_val_1"),
    )
    // Scan expects only a single Expression
    // Multiple Expressions are not allowed
    // ,
    // EqualTo(
    //    Column("partitioned_col_2", StringType()),
    //    Literal.of("partition_val_2"),
    // ),
    // EqualTo(
    //    Column("partitioned_col_3", StringType()),
    //    Literal.of("partition_val_3"),
    // )
).files

val paths = mutableListOf<String>()
while (iter.hasNext()) {
    paths.add(iter.next().path)
}

// for path in paths:
//     read and convert into RowRecord

我似乎在这里遗漏了几条关键的信息。任何建议都将不胜感激。

yuvru6vn

yuvru6vn1#

你必须使用ClosableParquetDataIterator来迭代扫描的文件。它是内部包。所以你不能访问外部。这里是一个黑客访问它,测试。

import io.delta.standalone.DeltaLog;
import io.delta.standalone.DeltaScan;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.expressions.And;
import io.delta.standalone.expressions.EqualTo;
import io.delta.standalone.expressions.Literal;
import io.delta.standalone.internal.util.{ConversionUtils, FileNames, JsonUtils}
import io.delta.standalone.internal.data.CloseableParquetDataIterator
    val deltaLog = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
    val snapshot = deltaLog.snapshot()
import scala.collection.JavaConverters;

var allFiles = snapshot.scan(new EqualTo(snapshot.getMetadata().getSchema.column("partitioned_col_1"), Literal.of(partition_val_1))).getFiles

import scala.collection.JavaConverters._;
    
var rowIterator = Class.forName("io.delta.standalone.internal.data.CloseableParquetDataIterator").getConstructor(classOf[Seq[(String, Map[String, String])]], snapshot.getMetadata.getSchema.getClass, classOf[java.util.TimeZone], classOf[org.apache.hadoop.conf.Configuration]).newInstance(JavaConverters.asScalaIteratorConverter(allFiles).asScala.toSeq.map {add => (DELTA_TABLE_LOCATION + "/" + add.getPath , add.getPartitionValues.asScala.toMap)}, snapshot.getMetadata.getSchema, null, new org.apache.hadoop.conf.Configuration()).asInstanceOf[CloseableIterator[io.delta.standalone.data.RowRecord]]
// your result here    
rowIterator.next

相关问题