我有一个用flink构建的工作流,它由一个自定义源、一系列Map/平面Map和一个接收器组成。
自定义源代码的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。
然后,我有一系列的Map/平面图转换这样的对象,然后打印成文件使用自定义Flume。在flink的web ui中生成的执行图如下所示:
我有一个集群或两个worker,每个都有6个插槽(它们都有6个内核)。我把平行度设为12。从执行图中,我看到源代码的并行度为1,而工作流的其余部分的并行度为12。
当我运行工作流(在专用文件夹中有大约15k个文件)时,我使用htop监视我的工作人员的资源。所有的核心在大部分时间内都达到100%的利用率,但大约每30分钟左右,8-10个核心就会闲置2-3分钟。
我的问题如下:
我知道源代码运行的并行度为1,我认为从本地存储读取时这是正常的(我的文件位于每个工作进程中的同一目录中,因为我不知道将选择哪个工作进程来执行源代码)。真的正常吗?你能解释一下为什么会这样吗?
我的工作流程的其余部分是用parallelism 12来执行的,它看起来是正确的,因为通过检查任务管理器的日志,我从所有的槽(例如。, .... [Flat Map -> Map -> Map -> Sink: Unnamed (**3/12**)] INFO ....
, .... [Flat Map -> Map -> Map -> Sink: Unnamed (**5/12**)] INFO ....
等)。但我不明白的是,如果一个插槽正在执行源角色,而我的集群中有12个插槽,那么工作流的其余部分是如何由12个插槽执行的?一个槽是否同时作用于源和工作流其余部分的一个示例?如果是,如何分配此特定插槽的资源?有人能解释一下这个工作流程中的步骤吗?例如(这可能是错误的):
插槽1读取文件并将其转发到可用插槽(2到12)
插槽1将一个文件转发给它自己并停止读取,直到它完成它的工作
完成后,插槽1读取更多文件并将其转发到可用的插槽
我相信我上面所描述的是错误的,但我举一个例子来更好地解释我的问题
为什么大多数内核的空闲状态是每30分钟(或多或少)一次,持续3分钟左右?
2条答案
按热度按时间yshpjwxd1#
单一使用者设置将管道的总吞吐量限制为仅一个使用者的性能。此外,它还将重洗牌引入所有插槽—在本例中,使用者读取的所有数据也在该使用者插槽上序列化,这是一个额外的cpu负载。相反,将使用者并行性设置为map/flat map parallelsm将允许链接source->map操作并避免洗牌。
默认情况下,flink允许子任务共享槽,即使它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可能容纳作业的整个管道。因此,在您的示例中,插槽1同时具有consumer和map/flat map任务,而其他插槽仅具有map/flat map任务。有关更多详细信息,请参见此处:https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#task-插槽和资源。此外,您还可以实际查看webui上每个子任务的示例。
是否启用了检查点?如果是的,如果是30分钟,那么这可能就是状态快照的时间间隔。
ljo96ir52#
为了回答关于并行化你的阅读的具体问题,我将做以下。。。
通过扩展
RichSourceFunction
.在你的
open()
方法,调用getRuntimeContext().getNumberOfParallelSubtasks()
获取总并行度并调用getRuntimeContext().getIndexOfThisSubtask()
获取正在初始化的子任务的索引。在你的
run()
方法,在遍历文件时,获取hashCode()
将每个文件名的总并行度模化。如果这等于子任务的索引,则处理它。通过这种方式,您可以将工作分散到12个子任务上,而无需子任务尝试处理同一个文件。