java—flink有异步源函数吗?

eanckbw9  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(288)

我的源函数有一个频率控制,而我可以调整数据速率,它将数据刷新到下一个操作符。我用普罗米修斯+格拉法纳测量每个操作员的数据速率。然后我开始以每秒100次的速度生成数据。在grafana Jmeter 板上显示大约90 rec/sec。然后我将数据速率提高到200 rec/sec。然而,grafana Jmeter 板实际上显示12 rec/sec。我在想象背压在保存数据。但Flink Jmeter 盘并没有显示我有背压。
所以,当你查到Flink密码的时候 StreamSourceContexts.collect(T element) 那里有一个同步块。我想这是为了确保事件的有序性。但是,如果我打电话给 StreamSourceContexts.collect(T element) 在我的sourcefunction中使用未来?我是不是会在事件中经历混乱?是否有一个源函数允许我以异步方式推送事件?

@Override
    public void collect(T element) {
        synchronized (lock) {
            output.collect(reuse.replace(element));
        }
    }

我的源函数:

public class OrdersSource extends RichSourceFunction<Order> {
    @Override
    public void run(SourceContext<Order> sourceContext) {
        try {
            while (running) {
                generateOrderItem(sourceContext);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void generateOrderItem(SourceContext<Order> sourceContext) {
        try {
            InputStream stream = new FileInputStream(dataFilePath);
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));

            long startTime = System.nanoTime();
            String line = reader.readLine();
            while (line != null) {
                // I would like to put an async thread here
                // Thread newThread = new Thread(() -> {
                //     sourceContext.collect(getOrderItem(line));
                // });
                // newThread.start();
                sourceContext.collect(getOrderItem(line));

                // sleep in nanoseconds to have a reproducible data rate for the data source
                this.dataRateListener.busySleep(startTime);

                // get start time and line for the next iteration
                startTime = System.nanoTime();
                line = reader.readLine();
            }
            reader.close();
            reader = null;
            stream.close();
            stream = null;
        } catch (FileNotFoundException e) {
            System.err.println("Please make sure they are available at [" + dataFilePath + "].");
            System.err.println(
                    " Follow the instructions at [https://docs.deistercloud.com/content/Databases.30/TPCH%20Benchmark.90/Data%20generation%20tool.30.xml?embedded=true] in order to download and create them.");
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题