**Describe the problem you faced**

I tried to write a Hudi table into a MinIO S3 bucket via Flink SQL, but it failed. The Hudi table was created, but it contains only the metadata directories. The `.hoodie` directory tree looks like this:
```
myminio/flink-hudi
└─ t1
   └─ .hoodie
      ├─ .aux
      │  ├─ .bootstrap
      │  │  ├─ .fileids
      │  │  └─ .partitions
      │  └─ ckp_meta
      ├─ .schema
      ├─ .temp
      └─ archived
```
**To Reproduce**

Steps to reproduce the behavior:

1. Create the Flink Hudi table:

```sql
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 's3a://flink-hudi/t1',
  'table.type' = 'MERGE_ON_READ'
);
```
2. Insert data into the Hudi table:

```sql
INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
```
**Environment Description**

- Hudi version: 0.12.0
- Hadoop version: 3.2.4
- Flink version: 1.15.2
- Storage (HDFS/S3/GCS ..): MinIO S3
- Running on Docker? (yes/no): no
**Additional context**

Dependencies added:

- hadoop-aws-3.2.4.jar
- aws-java-sdk-bundle-1.11.901.jar
- flink-s3-fs-hadoop-1.15.2.jar

Properties in Hadoop's core-site.xml:
```xml
<property>
  <name>fs.s3a.access.key</name>
  <value>xxx</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>xxx</value>
</property>
<property>
  <name>fs.s3a.endpoint</name>
  <value>xxx</value>
</property>
<property>
  <name>fs.s3a.path.style.access</name>
  <value>true</value>
</property>
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
```
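One detail worth noting in the config above: Hadoop selects a `FileSystem` implementation per URI scheme via the key `fs.<scheme>.impl`. The table path uses the `s3a://` scheme, whose key is `fs.s3a.impl` (which already defaults to `S3AFileSystem` in Hadoop 3.x), while the property set above is `fs.s3.impl`, which only governs plain `s3://` paths. A minimal sketch of that scheme-to-property mapping (the helper name is hypothetical):

```python
def impl_property_for(path: str) -> str:
    """Return the Hadoop property key that selects the FileSystem
    implementation for the given path's URI scheme."""
    scheme = path.split("://", 1)[0]
    return f"fs.{scheme}.impl"

# The table path uses the s3a:// scheme, so fs.s3a.impl applies,
# not the fs.s3.impl property set in core-site.xml above.
print(impl_property_for("s3a://flink-hudi/t1"))   # fs.s3a.impl
print(impl_property_for("s3://some-bucket/t1"))   # fs.s3.impl
```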
flink-conf.yaml:

```yaml
taskmanager.numberOfTaskSlots: 4
s3a.endpoint: xxx
s3a.access-key: xxx
s3a.secret-key: xxx
s3a.path.style.access: true
fs.hdfs.hadoopconf: /export/servers/hadoop-3.2.4/etc/hadoop
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3a://flink-state/checkpoint
execution.checkpointing.interval: 30000
classloader.check-leaked-classloader: false
```
Starting Flink:

```shell
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/start-cluster.sh
./bin/sql-client.sh embedded -j /opt/flink/jars/hudi-flink1.15-bundle-0.12.0.jar shell
```
**Stacktrace**
```
org.apache.hudi.exception.HoodieException: Exception while scanning the checkpoint meta files under path: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta
  at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:169)
  at org.apache.hudi.sink.meta.CkpMetadata.lastPendingInstant(CkpMetadata.java:175)
  at org.apache.hudi.sink.common.AbstractStreamWriteFunction.lastPendingInstant(AbstractStreamWriteFunction.java:243)
  at org.apache.hudi.sink.common.AbstractStreamWriteFunction.initializeState(AbstractStreamWriteFunction.java:151)
  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
  at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://flink-hudi/t1/.hoodie/.aux/ckp_meta
  at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2344)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2226)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2160)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1961)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1940)
  at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1940)
  at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$15(HoodieWrapperFileSystem.java:365)
  at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
  at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:364)
  at org.apache.hudi.sink.meta.CkpMetadata.scanCkpMetadata(CkpMetadata.java:216)
  at org.apache.hudi.sink.meta.CkpMetadata.load(CkpMetadata.java:167)
  ... 18 more
```
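The root cause in the trace is an ordinary missing-directory failure: `S3AFileSystem.listStatus` finds no object under `…/.hoodie/.aux/ckp_meta`, and `CkpMetadata.scanCkpMetadata` propagates it. A minimal local sketch of the same failure mode, using Python's `os` module as a stand-in for the filesystem API (the function name mirrors, but is not, the Hudi method):

```python
import os
import tempfile

def scan_ckp_metadata(ckp_dir):
    """Stand-in for CkpMetadata.scanCkpMetadata: list the checkpoint meta
    files under ckp_dir. A missing directory raises FileNotFoundError,
    just as S3AFileSystem.listStatus fails for a nonexistent S3 prefix."""
    return sorted(os.listdir(ckp_dir))

with tempfile.TemporaryDirectory() as root:
    # The ckp_meta directory was never materialized under the table path.
    missing = os.path.join(root, ".hoodie", ".aux", "ckp_meta")
    try:
        scan_ckp_metadata(missing)
    except FileNotFoundError as err:
        print("scan failed, missing path:", err.filename)
```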
**Expected behavior**

Writing the Hudi table to the S3 bucket succeeds.
**1 Answer**
According to your directory tree, the table was created at the path `s3a://flink-hudi/` rather than under `t1`, so when you try to insert data, Hudi does not find its metadata in the expected location.
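If that diagnosis is right, a listing of the bucket's object keys would show `.hoodie` at the bucket root instead of under `t1/`. A minimal sketch of that check over a key listing (the keys below are hypothetical; a real listing would come from the MinIO client or an S3 SDK):

```python
def hoodie_parents(keys):
    """Return the distinct parent prefixes under which a .hoodie directory
    sits: '' means the bucket root, 't1' the expected table path."""
    parents = set()
    for key in keys:
        anchored = "/" + key  # anchor so a root-level .hoodie also matches
        if "/.hoodie/" in anchored:
            parents.add(anchored.split("/.hoodie/", 1)[0].lstrip("/"))
    return parents

# Hypothetical listing matching the diagnosis: metadata at the bucket root.
print(hoodie_parents([".hoodie/hoodie.properties"]))      # {''}
# Healthy layout: metadata under the table path.
print(hoodie_parents(["t1/.hoodie/hoodie.properties"]))   # {'t1'}
```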