PyFlink错误/异常:“配置单元表不支持使用由节点PythonGroupAggregate生成的更新更改”

2fjabf4q  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(172)

使用Flink 1.13.1和pyFlink以及用户定义的表聚合函数(UDTAGG),将Hive表作为源和接收器,我遇到了一个错误:

pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException:
 Table sink 'myhive.mydb.flink_tmp_model' doesn't support consuming update changes 
 which is produced by node PythonGroupAggregate

这是接收器的SQL CREATE TABLE

table_env.execute_sql(
    """
    CREATE TABLE IF NOT EXISTS flink_tmp_model (
        run_id STRING,
        model_blob BINARY,
        roc_auc FLOAT
    )  PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
        'sink.partition-commit.delay'='1 s',
        'sink.partition-commit.policy.kind'='success-file'
    )
"""
)

这是怎么了?

ddhy6vgd

ddhy6vgd1#

我想您正在执行一个流查询,该查询正在进行某种聚合,需要更新先前发出的结果。parquet/hive接收器不支持这一点--一旦写入结果,它们就是最终结果。
一种解决方案是以批处理模式执行查询。另一种解决方案是使用可以处理更新的接收器(或格式)。或者修改查询,使其只生成最终结果--例如,时间窗口聚合而不是无界聚合。

相关问题