使用Flink 1.13.1和pyFlink以及用户定义的表聚合函数(UDTAGG),将Hive表作为源和接收器,我遇到了一个错误:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException:
Table sink 'myhive.mydb.flink_tmp_model' doesn't support consuming update changes
which is produced by node PythonGroupAggregate
这是接收器的SQL CREATE TABLE
table_env.execute_sql(
"""
CREATE TABLE IF NOT EXISTS flink_tmp_model (
run_id STRING,
model_blob BINARY,
roc_auc FLOAT
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
'sink.partition-commit.delay'='1 s',
'sink.partition-commit.policy.kind'='success-file'
)
"""
)
这是怎么了?
1条答案
按热度按时间ddhy6vgd1#
我想您正在执行一个流查询,该查询正在进行某种聚合,需要更新先前发出的结果。parquet/hive接收器不支持这一点--一旦写入结果,它们就是最终结果。
一种解决方案是以批处理模式执行查询。另一种解决方案是使用可以处理更新的接收器(或格式)。或者修改查询,使其只生成最终结果--例如,时间窗口聚合而不是无界聚合。