gpt4 book ai didi

java - 在 Apache Flink 中将多个数据集传输到下一次迭代

转载 作者:行者123 更新时间:2023-12-01 18:06:10 25 4
gpt4 key购买 nike

在 Flink 提供的原始 Kmeans 聚类示例中,每个点在每次迭代中都会分配给一个新的质心,并且有关该点分配给哪个质心的信息不会保留到下一次迭代中。我的目标是将这些信息带到下一次迭代。

我尝试的第一个解决方案是在循环之前将每个点分配给一个 ID 为 0 的不存在的质心,然后在迭代过程中更新此数据集。这是我在常规循环中完成的方式,但是我现在意识到在 Flink 中使用迭代功能与使用常规循环并不完全相同。其代码如下所示。

DataSet<Tuple2<Integer, Point>> clusteredPoints = nullClusteredPoints;

IterativeDataSet<Centroid> loop = centroids.iterate(iterations);

// Asssigning each point to the nearest centroid
clusteredPoints = clusteredPoints
// compute closest centroid for each point
.map(new SelectNearestCenter())
.withBroadcastSet(loop, "centroids");

DataSet<Centroid> newCentroids = clusteredPoints
// count and sum point coordinates for each centroid
.map(new CountAppender())
.groupBy(0).reduce(new CentroidAccumulator())
// compute new centroids from point counts and coordinate sums
.map(new CentroidAverager());

// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

我的期望是在每次迭代中使用数据集clusteredPoints,然后在最后一次迭代之后该数据集将包含最终的聚类点。但是,当尝试执行此操作时,会发生以下异常。

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

我尝试的另一个解决方案是使用增量迭代并将点的数据集放入解决方案集中,以便提供给下一次迭代。这也不起作用,因为根据下面的异常(exception)情况,解决方案集上允许的唯一操作是 Join 和 CoGroup。

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Error: The only operations allowed on the solution set are Join and CoGroup.

我尝试的第三个解决方案是在每次迭代开始时从磁盘读取点的数据集,并在迭代结束时将它们写入磁盘(可能效率非常低)。但是,写入磁盘会返回 DataSink,因此此解决方案也会出现上面显示的第一个异常。

我可以尝试更好的解决方案吗?或者 Flink 迭代不支持这样的用例?

最佳答案

Flink 的迭代目前仅支持一个移动数据集,这样它就可以获得良好的运行时属性,其中所有静态数据都保存在内存中,而移动数据集则通过流式传输。理论上,Flink 可以支持更多,但很多情况下这些好的特性无法保持。

就您的情况而言,您可以通过将两个数据集连接到一个 centroidWithPoints = clusters 来解决该问题,其中对于每个质心,您还存储一个点列表。

或者,您可以使用标记联合,将两个数据集合并为一个,然后在下一次迭代开始时将其拆分。

关于java - 在 Apache Flink 中将多个数据集传输到下一次迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60554090/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com