gpt4 book ai didi

scala - 如何将 Scalding ValuePipe 加入 TypedPipe?

转载 作者:可可西里 更新时间:2023-11-01 16:51:34 26 4
gpt4 key购买 nike

我已经改编了 scalding KMeans 示例来执行 KModes。问题是当作业完成后,我需要将聚类记录与匹配的质心连接起来。 KMeans 代码使用 ValuePipe 来保存质心。因此,为了从 ValuePipe 中取出质心,我对其进行了平面映射。然后我像这样加入:

  HVKModes(500000,inputSets,10).waitFor(Config.default,mode) match {
case Success((a,centroids: ValuePipe[List[LabeledCentroid]], points: TypedPipe[LabeledVector])) => {
val joined = centroids
.flatMap {
cs : List[LabeledCentroid] => {
val t = TypedPipe.from(cs)
Iterator(points.join(t)) }
}
.values
.write(clusteredOutput)
}
case Failure(e) => sys.error("problem running job:" + e.toString)
}

问题是编译器在“值”行给出类型错误:

Cannot prove that com.twitter.scalding.typed.CoGrouped[Int,((String, Set[String]), Set[String])] <:< (Any, V).
[error] .values

我收集到错误表明它无法计算出我认为是值的 V。但是我应该怎么做呢?

最佳答案

我几乎做对了。我只需要加入平面映射的结果

  HVKModes(500000,inputSets,10).waitFor(Config.default,mode) match {
case Success((a,centroids: ValuePipe[List[LabeledCentroid]], points: TypedPipe[LabeledVector])) => {

val cs =
centroids
.flatMap {
cs : List[LabeledCentroid] => { cs.toIterator}
}

points
.join(cs)
.values
.write(clusteredOutput)
}
case Failure(e) => sys.error("problem running job:" + e.toString)
}

关于scala - 如何将 Scalding ValuePipe 加入 TypedPipe?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32704102/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com