How to get the latest version of records in a Hudi table with Apache Spark

bq9c1y66 · posted 2023-02-09 in Apache

I have a Spark streaming job that listens to a Kinesis stream and writes the records to a Hudi table. Say, for example, I add these two records to the Hudi table:

| user_id | name  | timestamp |
| ------- | ----- | --------- |
| 1       | name1 | 1.1.1     |
| 2       | name2 | 1.1.1     |

Initially the table should reflect exactly that. But what happens if I then edit the second record so that its name becomes name2_new?
What I expect is to end up with three records, each with its own timestamp, like this:

| user_id | name      | timestamp |
| ------- | --------- | --------- |
| 1       | name1     | 1.1.1     |
| 2       | name2     | 1.1.1     |
| 2       | name2_new | 2.2.2     |

When I query the table in Athena I do get the expected three records. But for additional analytics I only want the latest version of each record, which (since I updated record 2) should look like this:

| user_id | name      | timestamp |
| ------- | --------- | --------- |
| 1       | name1     | 1.1.1     |
| 2       | name2_new | 2.2.2     |

Is there a way I can get only the latest version of each record?
Below is the Glue job I use to build the Hudi table:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
import datetime
from awsglue import DynamicFrame

import boto3

## @params: [JOB_NAME, database_name, kinesis_table_name, starting_position_of_kinesis_iterator, hudi_table_name, window_size, s3_path_hudi, s3_path_spark]
args = getResolvedOptions(sys.argv,
                          ["JOB_NAME", "database_name", "kinesis_table_name", "starting_position_of_kinesis_iterator",
                           "hudi_table_name", "window_size", "s3_path_hudi", "s3_path_spark"])

spark = SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer').config(
    'spark.sql.hive.convertMetastoreParquet', 'false').getOrCreate()

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

database_name = args["database_name"]
kinesis_table_name = args["kinesis_table_name"]
hudi_table_name = args["hudi_table_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_path_spark = args["s3_path_spark"]

print("***********")
print(f"""
database_name = {database_name}
kinesis_table_name = {kinesis_table_name}
hudi_table_name = {hudi_table_name}
s3_path_hudi = {s3_path_hudi}
s3_path_spark = {s3_path_spark}
""")
# can be set to "latest", "trim_horizon" or "earliest"
starting_position_of_kinesis_iterator = args["starting_position_of_kinesis_iterator"]

# The amount of time to spend processing each batch
window_size = args["window_size"]

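# read the Kinesis stream registered in the Glue Data Catalog as a streaming DataFrame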
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database=database_name,
    table_name=kinesis_table_name,
    transformation_ctx="DataSource0",
    additional_options={"inferSchema": "true", "startingPosition": starting_position_of_kinesis_iterator}
)

# config
commonConfig = {
    'path': s3_path_hudi
}

hudiWriteConfig = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.precombine.field': 'timestamp',
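    # NOTE: because the timestamp is part of the record key, every update gets a new key,
    # so all versions are kept as separate rows instead of being overwritten on upsert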
    'hoodie.datasource.write.recordkey.field': 'user_id,timestamp',
    #'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE,month:SIMPLE,day:SIMPLE',
    #'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    #'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'DATE_STRING',
    #'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
    #'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
}

hudiGlueConfig = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    #'hoodie.datasource.write.hive_style_partitioning': 'false',
    #'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    #'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
}

combinedConf = {
    **commonConfig,
    **hudiWriteConfig,
    **hudiGlueConfig
}

# ensure the incoming records match the table's current schema: brand-new columns are fine,
# but a column that exists in the current schema and not in the incoming batch must be added (as null) before inserting
def evolveSchema(kinesis_df, table, forcecast=False):
    try:
        # get existing table's schema
        print("in evolve schema")
        kinesis_df.show(truncate=False)
        
        glue_catalog_df = spark.sql("SELECT * FROM " + table + " LIMIT 0")
        # sanitize for hudi specific system columns
        #columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
        #                   '_hoodie_partition_path', '_hoodie_file_name']
        
        columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
                           '_hoodie_file_name']
        glue_catalog_df_sanitized = glue_catalog_df.drop(*columns_to_drop)
        if kinesis_df.schema != glue_catalog_df_sanitized.schema:
            # union with the empty catalog frame adds any columns missing from the batch as nulls
            return kinesis_df.unionByName(glue_catalog_df_sanitized, allowMissingColumns=True)
        return kinesis_df
    except Exception as e:
        # e.g. the table does not exist yet on the first run; fall back to the incoming schema
        print(e)
        return kinesis_df

def processBatch(data_frame, batchId):
    
    print("data frame is")
    data_frame.show(truncate=False)
    
    schema_main = StructType([
        StructField('data', StringType(), True),
        StructField('metadata', StringType(), True)
    ])

    schema_data = StructType([
        StructField("user_id", IntegerType(), True),
        StructField("firstname", StringType(), True),
        StructField("lastname", StringType(), True),
        StructField("address", StringType(), True),
        StructField("email", StringType(), True)
    ])

    schema_metadata = StructType([
        StructField("timestamp", StringType(), True),
        StructField("record-type", StringType(), True),
        StructField("operation", StringType(), True),
        StructField("partition-key-type", StringType(), True),
        StructField("schema-name", StringType(), True),
        StructField("table-name", StringType(), True),
        StructField("transaction-id", StringType(), True)
    ])

    # unpack the Glue-inferred temporary JSON column into the data/metadata structs,
    # then flatten to the business columns plus the CDC timestamp
    data_frame = data_frame.withColumn("$json$data_infer_schema$_temporary$", from_json("$json$data_infer_schema$_temporary$", schema_main)).select(col("$json$data_infer_schema$_temporary$.*")). \
        withColumn("data", from_json("data", schema_data)). \
        withColumn("metadata", from_json("metadata", schema_metadata)).select(col("data.*"), col("metadata.timestamp"))
    
    print("data frame is")
    data_frame.show(truncate=False)
    
    #column_headers = list(data_frame.columns)
    #print("The Column Header :", column_headers)
    #data_frame.printSchema()
    
    if (data_frame.count() > 0):
        kinesis_dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_kinesis_data_frame")
        #print("dynamoic frame is")
        #kinesis_dynamic_frame.show(truncate=False)
        #print('d')
        kinesis_data_frame = kinesis_dynamic_frame.toDF()
        print("kinesis is")
        kinesis_data_frame.show(truncate=False)
        
        kinesis_data_frame = evolveSchema(kinesis_data_frame, database_name + '.' + hudi_table_name, False)
       
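        # write the micro-batch to the Hudi table through the custom Spark (Hudi) connector,
        # using the combined path / Hudi write / Hive sync options defined above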
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(kinesis_data_frame, glueContext, "kinesis_data_frame"),
            connection_type="custom.spark",
            connection_options=combinedConf
        )

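# process the Kinesis stream in micro-batches of window_size, checkpointing progress to S3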
glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark
    }
)

job.commit()

vwoqyblh1#

Usually in this kind of application we upsert into the table and, when the key of a new record matches an existing one, keep the record with the greatest timestamp. Since you also need the full history of updates, there are two options to achieve what you are looking for:
1. Create a second table that contains only the latest version of each record, either by duplicating the job sink (kinesis -> full_history_table and kinesis -> last_state_table) or by streaming mini-batches off the first table with an incremental query (kinesis -> full_history_table -> last_state_table); see the sketch after the SQL below. In this case you can serve both kinds of queries without aggregating data at read time.
1. Aggregate the table on every query: if the queries are infrequent, you can aggregate the data on the fly with a window function. You can create a view and run your queries against it:

SELECT user_id, name, timestamp
FROM (
    SELECT t.*,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp DESC) AS rn
    FROM TABLE_NAME t
) t
WHERE rn=1;
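
For the first option, here is a minimal sketch of the incremental-query variant in PySpark; the S3 paths, the last_state_table name and the begin-instant value are hypothetical placeholders, not part of your job. New commits are read from the full-history table with a Hudi incremental query and upserted into a second table keyed only on user_id, so each upsert replaces the previous version of that user instead of adding a row:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .getOrCreate())

# read every commit made after the last instant this job processed (placeholder value)
last_processed_instant = "20230209000000"
increments = (spark.read.format("hudi")
              .option("hoodie.datasource.query.type", "incremental")
              .option("hoodie.datasource.read.begin.instanttime", last_processed_instant)
              .load("s3://your-bucket/full_history_table/"))  # hypothetical path

# upsert into the latest-state table: user_id alone is the record key and timestamp is the
# precombine field, so the newest version wins whenever the same user_id arrives again
(increments.write.format("hudi")
 .option("hoodie.table.name", "last_state_table")
 .option("hoodie.datasource.write.operation", "upsert")
 .option("hoodie.datasource.write.recordkey.field", "user_id")
 .option("hoodie.datasource.write.precombine.field", "timestamp")
 .mode("append")
 .save("s3://your-bucket/last_state_table/"))  # hypothetical path

The last processed instant would have to be persisted between runs (for example next to the Spark checkpoint), and the same result can also be reached by simply writing each micro-batch of the existing Glue job to both tables.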
