我的源函数有一个频率控制,而我可以调整数据速率,它将数据刷新到下一个操作符。我用普罗米修斯+格拉法纳测量每个操作员的数据速率。然后我开始以每秒100次的速度生成数据。在grafana Jmeter 板上显示大约90 rec/sec。然后我将数据速率提高到200 rec/sec。然而,grafana Jmeter 板实际上显示12 rec/sec。我在想象背压在保存数据。但Flink Jmeter 盘并没有显示我有背压。
所以,当你查到Flink密码的时候 StreamSourceContexts.collect(T element)
那里有一个同步块。我想这是为了确保事件的有序性。但是,如果我打电话给 StreamSourceContexts.collect(T element)
在我的sourcefunction中使用未来?我是不是会在事件中经历混乱?是否有一个源函数允许我以异步方式推送事件?
@Override
public void collect(T element) {
synchronized (lock) {
output.collect(reuse.replace(element));
}
}
我的源函数:
public class OrdersSource extends RichSourceFunction<Order> {
@Override
public void run(SourceContext<Order> sourceContext) {
try {
while (running) {
generateOrderItem(sourceContext);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void generateOrderItem(SourceContext<Order> sourceContext) {
try {
InputStream stream = new FileInputStream(dataFilePath);
BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
long startTime = System.nanoTime();
String line = reader.readLine();
while (line != null) {
// I would like to put an async thread here
// Thread newThread = new Thread(() -> {
// sourceContext.collect(getOrderItem(line));
// });
// newThread.start();
sourceContext.collect(getOrderItem(line));
// sleep in nanoseconds to have a reproducible data rate for the data source
this.dataRateListener.busySleep(startTime);
// get start time and line for the next iteration
startTime = System.nanoTime();
line = reader.readLine();
}
reader.close();
reader = null;
stream.close();
stream = null;
} catch (FileNotFoundException e) {
System.err.println("Please make sure they are available at [" + dataFilePath + "].");
System.err.println(
" Follow the instructions at [https://docs.deistercloud.com/content/Databases.30/TPCH%20Benchmark.90/Data%20generation%20tool.30.xml?embedded=true] in order to download and create them.");
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!