I am trying to write data from the IntelliJ IDE to Azure Blob Storage using Flink's StreamingFileSink, but I get the following errors:
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
Caused by: java.lang.ClassNotFoundException: org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/fs/azure/common/hadoop/HadoopRecoverableWriter
Here is my code:
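import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;

public class BlobSample {
    public static void main(String[] args) throws Exception {
        //System.setProperty("hadoop.home.dir", "/");

        // Pass the storage account key to Flink's file system layer.
        Configuration cfg = new Configuration();
        cfg.setString("fs.azure.account.key.azurekey.blob.core.windows.net",
                "azure_blob_key");
        //cfg.setBoolean("recursive.file.enumeration", true);
        FileSystem.initialize(cfg, null);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("hello");
        DataStream<String> output = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

        // output.writeAsText("wasbs://container@myazure.blob.core.windows.net/output");
        String outputPath = "wasbs://container@azurekey.blob.core.windows.net/rawsignals";
        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(100) // part size is given in bytes
                                .build())
                .build();
        output.addSink(sink);
        env.execute("BlobStorage");
    }
}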
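I also tried writeAsText, but that did not work either. I added HADOOP_HOME to my environment variables and added this dependency to build.gradle: compile group: 'org.apache.flink', name: 'flink-azure-fs-hadoop', version: '1.11.2'. I also added the Azure key to flink-conf.yaml, but it still does not work. Please help me solve this problem.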
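To narrow down which of the errors above is the actual root cause, a small diagnostic along these lines could be run with the same classpath and run configuration as the job. This is only a sketch that uses the JDK alone: the class name `ClasspathProbe` and its two helper methods are hypothetical, and the probed class name is copied verbatim from the stack trace. It reports whether that class is visible to the class loader and whether a Hadoop home is configured; it does not fix anything.

```java
public class ClasspathProbe {

    // Returns true if the named class can be located by this class's
    // class loader. The class is looked up without being initialized.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Returns true if either the hadoop.home.dir system property or the
    // HADOOP_HOME environment variable is set to a non-empty value.
    public static boolean hadoopHomeConfigured() {
        String prop = System.getProperty("hadoop.home.dir");
        String env = System.getenv("HADOOP_HOME");
        return (prop != null && !prop.isEmpty()) || (env != null && !env.isEmpty());
    }

    public static void main(String[] args) {
        // Class name taken from the ClassNotFoundException in the stack trace.
        String writer = "org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter";
        System.out.println("HadoopRecoverableWriter visible: " + isOnClasspath(writer));
        System.out.println("Hadoop home configured: " + hadoopHomeConfigured());
    }
}
```

If the first line prints false when launched from IntelliJ, the class is not on the classpath of that run configuration, regardless of what build.gradle declares. If the second prints false, the environment variable is not reaching the JVM that the IDE starts.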