Error when writing a Hudi table to a MinIO S3 bucket through Flink SQL

h43kikqp · posted 2023-01-03 in Apache
**Describe the problem you faced**

I am trying to write a Hudi table to a MinIO S3 bucket through Flink SQL, but it fails. The Hudi table gets created, but it contains only the metadata directories. The .hoodie directory tree looks like this:

myminio/flink-hudi
└─ t1
   └─ .hoodie
      ├─ .aux
      │  ├─ .bootstrap
      │  │  ├─ .fileids
      │  │  └─ .partitions
      │  └─ ckp_meta
      ├─ .schema
      ├─ .temp
      └─ archived
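
For reference, a listing like the one above can be produced with the MinIO client; a minimal sketch, assuming an mc alias named myminio is already configured:

# List the bucket recursively, including files
mc tree --files myminio/flink-hudi
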
**To Reproduce**

Steps to reproduce the behavior:
1. Create the Flink Hudi table:

CREATE TABLE t1 (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 's3a://flink-hudi/t1',
  'table.type' = 'MERGE_ON_READ'
);

2. Insert data into the Hudi table:

INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
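
Once the insert job completes, the write can be sanity-checked from the same SQL client; a minimal sketch:

-- Read the table back to verify that data (not just metadata) was written
SELECT * FROM t1;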
**Environment Description**
  • Hudi version: 0.12.0
  • Hadoop version: 3.2.4
  • Flink version: 1.15.2
  • Storage (HDFS/S3/GCS..): MinIO S3
  • Running on Docker? (yes/no): no
**Additional context**

Dependencies added:

  • hadoop-aws-3.2.4.jar
  • aws-java-sdk-bundle-1.11.901.jar
  • flink-s3-fs-hadoop-1.15.2.jar
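
These jars have to be visible to Flink at runtime; a minimal sketch of one common layout, assuming $FLINK_HOME and $HADOOP_HOME point at the installations above (the target paths are illustrative, not confirmed by the post):

# hadoop-aws and the AWS SDK bundle go on the Hadoop classpath
cp hadoop-aws-3.2.4.jar aws-java-sdk-bundle-1.11.901.jar $HADOOP_HOME/share/hadoop/common/lib/

# flink-s3-fs-hadoop is loaded through Flink's plugin mechanism
mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop
cp flink-s3-fs-hadoop-1.15.2.jar $FLINK_HOME/plugins/s3-fs-hadoop/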

Properties in Hadoop core-site.xml:

<property>
  <name>fs.s3a.access.key</name>
  <value>xxx</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>xxx</value>
</property>
<property>
  <name>fs.s3a.endpoint</name>
  <value>xxx</value>
</property>
<property>
  <name>fs.s3a.path.style.access</name>
  <value>true</value>
</property>
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
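
If the MinIO endpoint is served over plain HTTP, S3A usually also needs SSL disabled; this property is a common addition in MinIO setups (an assumption here, since the endpoint value is redacted):

<property>
  <name>fs.s3a.connection.ssl.enabled</name>
  <value>false</value>
</property>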

flink-conf.yaml:

taskmanager.numberOfTaskSlots: 4

s3a.endpoint: xxx
s3a.access-key: xxx
s3a.secret-key: xxx
s3a.path.style.access: true

fs.hdfs.hadoopconf: /export/servers/hadoop-3.2.4/etc/hadoop

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3a://flink-state/checkpoint
execution.checkpointing.interval: 30000

classloader.check-leaked-classloader: false
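
Note that the buckets referenced above (flink-hudi and flink-state) must already exist in MinIO before Flink writes to them; a minimal sketch, again assuming an mc alias named myminio:

# Create the table bucket and the checkpoint bucket up front
mc mb myminio/flink-hudi
mc mb myminio/flink-state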

Launching Flink:

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/start-cluster.sh
./bin/sql-client.sh embedded -j /opt/flink/jars/hudi-flink1.15-bundle-0.12.0.jar shell
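
A quick way to confirm that the S3A classes actually ended up on the classpath (optional; --glob expands the wildcard entries that hadoop classpath prints):

$HADOOP_HOME/bin/hadoop classpath --glob | tr ':' '\n' | grep -E 'hadoop-aws|aws-java-sdk'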
**Stacktrace**
org.apache.hudi.exception.HoodieException: Exception while scanning the checkpoint meta files under path: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta
at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:169)
at org.apache.hudi.sink.meta.CkpMetadata.lastPendingInstant(CkpMetadata.java:175)
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.lastPendingInstant(AbstractStreamWriteFunction.java:243)
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.initializeState(AbstractStreamWriteFunction.java:151)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2344)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2226)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2160)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1961)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1940)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1940)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$15(HoodieWrapperFileSystem.java:365)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:364)
at org.apache.hudi.sink.meta.CkpMetadata.scanCkpMetadata(CkpMetadata.java:216)
at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:167)
... 18 more
**Expected behavior**

Writing the Hudi table to the S3 bucket succeeds.

mo49yndu1#

According to your directory tree, the table was created under the path s3a://flink-hudi/ rather than under t1, so when you try to insert data, Hudi does not find the metadata in the right place. Recreate the table with a trailing slash in the path:

CREATE TABLE t1 (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 's3a://flink-hudi/t1/',
  'table.type' = 'MERGE_ON_READ'
);
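
If t1 is still registered from the earlier attempt, drop it first so the corrected path takes effect; a minimal sketch, assuming the SQL client's default in-memory catalog:

-- Remove the old table definition before recreating it
DROP TABLE IF EXISTS t1;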
