java—使用ApacheFlink从web获取json元素

eulz3vhy  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(310)

在阅读了apache flink的几个文档页面(官方文档,dataartisans)以及官方存储库中提供的示例之后,我不断看到一些示例,它们将这些示例用作简化已下载文件的数据源,并始终连接到localhost。
我正在尝试使用apacheflink下载包含动态数据的json文件。我的意图是尝试建立一个url,在那里我可以访问json文件作为apache flink的输入源,而不是用另一个系统下载它并用apache flink处理下载的文件。
有没有可能与apache flink建立这个网络连接?

bnlyeluc

bnlyeluc1#

您可以定义要下载的url作为输入 DataStream 然后从 MapFunction . 下面的代码演示了这一点:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html");

inputURLs.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        URL url = new URL(s);
        InputStream is = url.openStream();

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));

        StringBuilder builder = new StringBuilder();
        String line;

        try {
            while ((line = bufferedReader.readLine()) != null) {
                builder.append(line + "\n");
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        try {
            bufferedReader.close();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        return builder.toString();
    }
}).print();

env.execute("URL download job");

相关问题