
scala - Task not serializable in Flink


I am trying to run the PageRank Basic example in Flink with a slight modification (only in how the input file is read; everything else is the same), and I get a Task not serializable error. The following is part of the error output:

at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)



Below is my code:

object hpdb {

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val maxIterations = 10000
    val DAMPENING_FACTOR: Double = 0.85
    val EPSILON: Double = 0.0001

    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"

    // source and target ids
    val links = env.readCsvFile[Tuple2[Long, Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1, 4)).as('sourceId, 'targetId).toDataSet[Link]

    // page ids
    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id]

    val noOfPages = pages.count()

    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))

    val adjacencyLists = links
      // initialize lists: ._1 is the source id and ._2 is the target id
      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
      // concatenate lists
      .groupBy("sourceId").reduce {
        (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
      }

    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
      // **the output shows the error here**
      currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
            (page, adjacent, out: Collector[Page]) =>
              for (targetId <- adjacent.targetIds) {
                out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
              }
          }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
          // **the output shows the error here**
          .map { p =>
            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pages.count()))
          }

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }

        (newRanks, termination)
    }

    val result = finalRanks

    // emit result
    result.writeAsCsv(outpath, "\n", " ")

    env.execute()
  }
}

Any help in the right direction is highly appreciated. Thank you.

Best Answer

The problem is that you reference the DataSet pages from within a MapFunction. This is not possible, because a DataSet is only the logical representation of a data flow and cannot be accessed at runtime.

What you have to do to solve this problem is to assign the result of pages.count to a variable, e.g. val pagesCount = pages.count, and reference that variable in your MapFunction.

What pages.count actually does is trigger the execution of the data flow graph so that the number of elements in pages can be counted. The result is then returned to your program.
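A minimal, self-contained sketch of that pattern (a hypothetical standalone example, not the question's ppi.csv data): count() runs on the driver first, and only the resulting Long is captured by the closure. In the question's code this amounts to reusing the already computed noOfPages inside the dampening-factor map instead of calling pages.count() there.

import org.apache.flink.api.scala._

object CountOutsideClosure {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // hypothetical stand-in for the question's pages DataSet
    val pages: DataSet[Long] = env.fromElements(1L, 2L, 3L, 4L)

    // count() triggers execution of the data flow graph and returns a plain Long
    val pagesCount: Long = pages.count()

    // OK: the closure captures only the serializable Long pagesCount.
    // Calling pages.count() inside this map would pull the DataSet itself
    // into the closure and fail with "Task not serializable".
    val initialRanks = pages.map(id => (id, 1.0 / pagesCount))

    println(initialRanks.collect())
  }
}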

Regarding scala - Task not serializable in Flink, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/31315478/
