I have a Spark streaming job that listens to a Kinesis stream and writes the records to a Hudi table. What I want to do is, for example, add these two records to the Hudi table:
| user_id | name  | timestamp |
| ------- | ----- | --------- |
| 1       | name1 | 1.1.1     |
| 2       | name2 | 1.1.1     |
This is what the Hudi table should initially contain. But what happens if I then edit the second record so that its name becomes name2_new?
What I expect is to end up with three records, each with its own timestamp:
| user_id | name      | timestamp |
| ------- | --------- | --------- |
| 1       | name1     | 1.1.1     |
| 2       | name2     | 1.1.1     |
| 2       | name2_new | 2.2.2     |
When I query this in Athena I do get the expected three records. But for additional analysis I also want only the latest version of each record, which (since I updated record 2) should look like this:
| user_id | name      | timestamp |
| ------- | --------- | --------- |
| 1       | name1     | 1.1.1     |
| 2       | name2_new | 2.2.2     |
Is there any way I can get only the latest versions?
Below is the Glue job I use to create the Hudi table:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
import datetime
from awsglue import DynamicFrame
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,
                          ["JOB_NAME", "database_name", "kinesis_table_name", "starting_position_of_kinesis_iterator",
                           "hudi_table_name", "window_size", "s3_path_hudi", "s3_path_spark"])
spark = SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer').config(
    'spark.sql.hive.convertMetastoreParquet', 'false').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
database_name = args["database_name"]
kinesis_table_name = args["kinesis_table_name"]
hudi_table_name = args["hudi_table_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_path_spark = args["s3_path_spark"]
print("***********")
print(f"""
database_name {database_name}
kinesis_table_name = {kinesis_table_name}
hudi_table_name ={hudi_table_name}
s3_path_hudi = {s3_path_hudi}
s3_path_spark = {s3_path_spark}
""")
# can be set to "latest", "trim_horizon" or "earliest"
starting_position_of_kinesis_iterator = args["starting_position_of_kinesis_iterator"]
# The amount of time to spend processing each batch
window_size = args["window_size"]
data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database=database_name,
    table_name=kinesis_table_name,
    transformation_ctx="DataSource0",
    additional_options={"inferSchema": "true", "startingPosition": starting_position_of_kinesis_iterator}
)
# config
commonConfig = {
    'path': s3_path_hudi
}
hudiWriteConfig = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    # the record key includes the timestamp, so every update arrives as a new record
    # instead of overwriting the previous version of the same user_id
    'hoodie.datasource.write.recordkey.field': 'user_id,timestamp',
    #'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE,month:SIMPLE,day:SIMPLE',
    #'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    #'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'DATE_STRING',
    #'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
    #'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
}
hudiGlueConfig = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    #'hoodie.datasource.write.hive_style_partitioning': 'false',
    #'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    #'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
}
combinedConf = {
    **commonConfig,
    **hudiWriteConfig,
    **hudiGlueConfig
}
# ensure the incoming record matches the table's current schema; brand-new columns are fine,
# but a column that exists in the current schema and is missing from the incoming record must be added before inserting
def evolveSchema(kinesis_df, table, forcecast=False):
    try:
        # get existing table's schema
        print("in evolve schema")
        kinesis_df.show(truncate=False)
        glue_catalog_df = spark.sql("SELECT * FROM " + table + " LIMIT 0")
        # sanitize for hudi specific system columns
        #columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
        #                   '_hoodie_partition_path', '_hoodie_file_name']
        columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
                           '_hoodie_file_name']
        glue_catalog_df_sanitized = glue_catalog_df.drop(*columns_to_drop)
        if (kinesis_df.schema != glue_catalog_df_sanitized.schema):
            merged_df = kinesis_df.unionByName(glue_catalog_df_sanitized, allowMissingColumns=True)
            return merged_df
    except Exception as e:
        print(e)
    # fall back to the incoming frame when the schemas already match or the table does not exist yet
    return kinesis_df
def processBatch(data_frame, batchId):
    print("data frame is")
    data_frame.show(truncate=False)
    schema_main = StructType(
        [
            StructField('data', StringType(), True),
            StructField('metadata', StringType(), True)
        ]
    )
    schema_data = StructType(
        [
            StructField("user_id", IntegerType(), True),
            StructField("firstname", StringType(), True),
            StructField("lastname", StringType(), True),
            StructField("address", StringType(), True),
            StructField("email", StringType(), True)
        ]
    )
    schema_metadata = StructType(
        [
            StructField("timestamp", StringType(), True),
            StructField("record-type", StringType(), True),
            StructField("operation", StringType(), True),
            StructField("partition-key-type", StringType(), True),
            StructField("schema-name", StringType(), True),
            StructField("table-name", StringType(), True),
            StructField("transaction-id", StringType(), True)
        ]
    )
    # unwrap the raw Kinesis payload, then keep the data fields plus the metadata timestamp
    data_frame = (
        data_frame
        .withColumn("$json$data_infer_schema$_temporary$",
                    from_json("$json$data_infer_schema$_temporary$", schema_main))
        .select(col("$json$data_infer_schema$_temporary$.*"))
        .withColumn("data", from_json("data", schema_data))
        .withColumn("metadata", from_json("metadata", schema_metadata))
        .select(col("data.*"), col("metadata.timestamp"))
    )
    print("data frame is")
    data_frame.show(truncate=False)
    #column_headers = list(data_frame.columns)
    #print("The Column Header :", column_headers)
    #data_frame.printSchema()
    if (data_frame.count() > 0):
        kinesis_dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_kinesis_data_frame")
        #print("dynamic frame is")
        #kinesis_dynamic_frame.show(truncate=False)
        kinesis_data_frame = kinesis_dynamic_frame.toDF()
        print("kinesis is")
        kinesis_data_frame.show(truncate=False)
        kinesis_data_frame = evolveSchema(kinesis_data_frame, database_name + '.' + hudi_table_name, False)
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(kinesis_data_frame, glueContext, "kinesis_data_frame"),
            connection_type="custom.spark",
            connection_options=combinedConf
        )
glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark
    }
)
job.commit()
1 Answer
Usually in this kind of application you would upsert into the table and, when an incoming record's key matches an existing one, keep only the record with the greatest timestamp.
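In Hudi terms, that upsert-and-keep-latest behaviour is what you get when the record key identifies only the entity (user_id) and the precombine field is the timestamp, unlike the config in the question where the timestamp is part of the record key. Here is a minimal sketch of such a write config for a separate "last state" sink; the table name and path are placeholders, and the option keys are the same Hudi options already used in the question's job:

# Sketch only: write config for a hypothetical last_state_table sink. Because the
# record key is just user_id, an update overwrites the previous row, and the
# precombine field resolves conflicts by keeping the record with the greatest timestamp.
last_state_write_config = {
    'className': 'org.apache.hudi',
    'path': 's3://my-bucket/hudi/last_state_table/',   # placeholder path
    'hoodie.table.name': 'last_state_table',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field': 'user_id',
    'hoodie.datasource.write.precombine.field': 'timestamp',
}

That is the kind of config a duplicated sink (option 1 below) would use for the last_state_table.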
However, since you need the history of all updates, there are two options to achieve what you are looking for:

1. Create a new table that holds the latest version of each record, either by duplicating the job sink (`kinesis -> full_history_table` and `kinesis -> last_state_table`) or by using incremental queries (`kinesis -> full_history_table -> last_state_table`), i.e. streaming mini-batches on top of the first table. In this case you get the results of both queries without having to aggregate the data.
2. Aggregate the table at query time: if the queries are infrequent, you can aggregate the data on the fly with window functions. You can create a view and apply your queries to it:
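A minimal sketch of that idea (not the original answer's code), assuming the full-history table is exposed as hudi_table with columns user_id, name and timestamp; the view name, table name and S3 path below are placeholders. The SQL string shows what an Athena/Glue view could look like, and the PySpark code performs the same window aggregation directly:

# Sketch only: keep the newest version of each user_id by ranking the rows of each
# key by timestamp (descending) and selecting the first one.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# SQL form, usable as an Athena/Glue view over the full-history table.
latest_view_sql = """
CREATE OR REPLACE VIEW hudi_table_latest AS
SELECT user_id, name, "timestamp"
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY "timestamp" DESC) AS rn
    FROM hudi_table
) ranked
WHERE rn = 1
"""

# PySpark form of the same aggregation (requires the Hudi Spark bundle on the classpath).
spark = SparkSession.builder.getOrCreate()
history_df = spark.read.format("hudi").load("s3://my-bucket/hudi/full_history_table/")  # placeholder path
latest_window = Window.partitionBy("user_id").orderBy(col("timestamp").desc())
latest_df = (history_df
             .withColumn("rn", row_number().over(latest_window))
             .filter(col("rn") == 1)
             .drop("rn"))
latest_df.show(truncate=False)

Using ROW_NUMBER (rather than a MAX(timestamp) self-join) guarantees exactly one row per user_id even if two versions happen to share the same timestamp.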