在使用flink提供的原始kmeans聚类示例中,每个点在每次迭代中都被分配到一个新的质心,而关于该点被分配到哪个质心的信息不会被带入下一次迭代。我的目标是将这些信息带到下一个迭代中。
我尝试的第一个解决方案是在循环之前将每个点分配给一个id为0的不存在的质心,然后在迭代中更新这个数据集。这是我在常规循环中所采用的方法,但是我现在意识到在flink中使用迭代特性与使用常规循环并不完全相同。代码如下所示。
DataSet<Tuple2<Integer, Point>> clusteredPoints = nullClusteredPoints;
IterativeDataSet<Centroid> loop = centroids.iterate(iterations);
// Asssigning each point to the nearest centroid
clusteredPoints = clusteredPoints
// compute closest centroid for each point
.map(new SelectNearestCenter())
.withBroadcastSet(loop, "centroids");
DataSet<Centroid> newCentroids = clusteredPoints
// count and sum point coordinates for each centroid
.map(new CountAppender())
.groupBy(0).reduce(new CentroidAccumulator())
// compute new centroids from point counts and coordinate sums
.map(new CentroidAverager());
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
我的期望是数据集 clusteredPoints
将在每次迭代中使用,然后在最后一次迭代之后,此数据集将由最终的聚集点组成。但是,当尝试执行此操作时,会发生以下异常。
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?
我尝试的另一个解决方案是使用delta迭代,并将点的数据集放在解决方案集中,以便提供给下一个迭代。根据下面的异常,这也不起作用,因为解决方案集上唯一允许的操作是join和cogroup。
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Error: The only operations allowed on the solution set are Join and CoGroup.
我尝试的第三种解决方案是在每次迭代开始时从磁盘读取点的数据集,并在迭代结束时将它们写入磁盘(可能效率很低)。但是,写入磁盘会返回一个datasink,因此上面显示的第一个异常也会出现在这个解决方案中。
我可以尝试更好的解决方案吗?或者flink迭代不支持这样的用例?
1条答案
按热度按时间1tuwyuhd1#
flink的迭代目前只支持一个移动的数据集,因此它获得了很好的运行时属性,所有静态数据都保存在内存中,移动的数据集通过流传输。从理论上讲,flink可以支持更多,但在很多情况下,这些好的属性无法保持。
在您的例子中,您可以通过将两个数据集合并为一个来解决这个问题
centroidWithPoints
=clusters
,其中,对于每个质心,还存储点列表。或者,您可以使用一个带标记的联合,将两个数据集合并为一个数据集,然后在下一次迭代开始时将其拆分。