How do I write data from Apache Flink to Azure Blob Storage?

8gsdolmq · asked 2021-06-21 · in Flink

I am trying to write data to Azure Blob Storage with Flink's StreamingFileSink, running the job from the IntelliJ IDE, but I get the following errors:

Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
Caused by: java.lang.ClassNotFoundException: org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/fs/azure/common/hadoop/HadoopRecoverableWriter
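The `NoClassDefFoundError` for `HadoopRecoverableWriter` usually means the `flink-azure-fs-hadoop` jar is not visible on the runtime classpath. When the job runs inside IntelliJ rather than on a Flink cluster, the `plugins/` directory mechanism is not active, so the filesystem jar must be an ordinary dependency of the project. A sketch of the Gradle declaration, assuming Flink 1.11.2 (the version should match your other Flink artifacts):

```groovy
// build.gradle — sketch; 'compile' works on older Gradle, use 'implementation' on Gradle 7+
dependencies {
    // puts the wasb/wasbs filesystem (and its shaded Hadoop classes) on the IDE classpath
    compile group: 'org.apache.flink', name: 'flink-azure-fs-hadoop', version: '1.11.2'
}
```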

Below is my code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class BlobSample {

    public static void main(String[] args) throws Exception {

        //System.setProperty("hadoop.home.dir", "/");

        Configuration cfg = new Configuration();
        cfg.setString("fs.azure.account.key.azurekey.blob.core.windows.net",
                "azure_blob_key");
        //cfg.setBoolean("recursive.file.enumeration", true);
        FileSystem.initialize(cfg, null);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("hello");

        DataStream<String> output = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

       // output.writeAsText("wasbs://container@myazure.blob.core.windows.net/output");

        String outputPath = "wasbs://container@azurekey.blob.core.windows.net/rawsignals";

        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(100)
                                .build())
                .build();

        output.addSink(sink);

        env.execute("BlobStorage");

    }
}
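One detail worth noting in the code above: `FileSystem.initialize(cfg, null)` passes no `PluginManager`, so filesystems are discovered only from the main classpath, never from a Flink `plugins/` directory. A hedged sketch of plugin-aware initialization, where `<account>` and `<key>` are placeholders for your storage account name and key:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;

public class FsInitSketch {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        // placeholder account/key — replace with your storage account name and key
        cfg.setString("fs.azure.account.key.<account>.blob.core.windows.net", "<key>");
        // load filesystem factories from the plugins directory as well as the classpath
        FileSystem.initialize(cfg, PluginUtils.createPluginManagerFromRootFolder(cfg));
    }
}
```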

I also tried using writeAsText, but that did not work either. I added HADOOP_HOME to my environment variables, and added this dependency to build.gradle: `compile group: 'org.apache.flink', name: 'flink-azure-fs-hadoop', version: '1.11.2'`. I also added the Azure key to flink-conf.yaml, but it still does not work. Please help me solve this problem.
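For reference, a flink-conf.yaml entry only takes effect when Flink actually loads that configuration (e.g. on a cluster); when running from the IDE with a hand-built `Configuration`, the key set programmatically is the one that counts. A sketch of the yaml entry, with `<account>` and `<key>` as placeholders:

```yaml
# flink-conf.yaml — sketch; replace <account> and <key> with real values
fs.azure.account.key.<account>.blob.core.windows.net: <key>
```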

No answers yet.
