选择flink/kafka应用程序的延迟图

pkln4tw6  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(355)

我有一个应用程序,它接收来自kafka主题的tweet,有一个1秒的窗口,然后通过一个assyncio操作将这些tweet保存在cassandra上,该操作允许打开最多100个线程(asyncio操作符的最后一个参数),而无需对数据进行任何预处理:只需按tweet保存tweet,时间戳为保存时。
然后,我强调了flink应用程序发送了300万条tweet,并在grafana中绘制了一个图表,显示了数据库中保存了多少条tweet,但这个图表显示了一些选择,并不是一条连续的线,我不明白为什么。
所以你可以看到,在一分钟的时间间隔内,它可以节省7k,然后再到5k,再到2k。如果能帮我找出原因,我将不胜感激!谢谢!

o0lyfsai

o0lyfsai1#

首先,如果你想给Cassandra写信,我会用连接器。如果不是近乎不可能的话,正确地手动实现一次这样的操作是非常困难的。
其次,asyncio没有启动100个线程。实际上,它并没有为用户启动任何线程。你需要通过任何方式自己开始。通常,它使用外部系统的回调机制,其中库有自己的连接池。
如果要进行同步调用,则需要管理自己的线程池。我建议使用 Executors.newCachedThreadPool() 并将异步任务提交给它。asyncio只会帮助将异步结果合并回同步流。
第三,100个线程可能相当多,这取决于您的设置。还要注意,如果使用flink的scale-up(每个taskmanager使用多个插槽),那么使用的线程将成倍增加。

相关问题