- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
在我的 Scalding hadoop 作业中,我在管道上有一些分组逻辑,然后我需要处理每个组:
val georecs : TypedPipe[GeoRecord] = getRecords
georecs.map( r => (getRegion(r),r) )
.groupBy(_._1)
.mapValueStream( xs => clusterRecords(xs) )
.values
.write(out)
在 clusterRecords 内部,我需要将传入的迭代器转换为 TypedPipe,以便我可以 1) 对其进行采样和 2) 取叉积:
//turn the iterator to a pipe so we can sample it
val sample = TypedPipe.from( xs.map( x => Centroid(x._2.coreActivity)).toIterable)
.sample(0.11)
.distinct
//turn the iterator to a pipe so we can take its cross product
val records : TypedPipe[GeoRecord] = TypedPipe.from(xs.map(_._2).toIterable)
records
.cross(sample) //cartesian product of records and centroids
.groupBy( _._2) // group By the user record so we get a list of pairs (user, centroid)
.minBy( x => score( x._1.coreActivity, x._2.core) ) //find the centroid with the lowest score for each Record
.values
.groupBy( x => x._2 ) //now groupBy centroid to get the clusters
.values
问题是 mapValueStream 期望映射函数返回一个迭代器,但我拥有的是一个 TypedPipe。我知道如何将迭代器变成管道,但反之则不然。我是否需要执行它,将其写入磁盘,然后再读回?
如果是这样,实现该目标的最佳方法是什么?
最佳答案
看起来您可以通过运行将管道转换为迭代器。这可以像这样完成:
val georecs : TypedPipe[GeoRecord] = getRecords
val i : Iterator[GeoRecord] = georecs
.toIterableExecution
.waitFor(this.scaldingConfig,this.mode)
.get
.toIterator
(类型检查,但尚未测试)
关于scala - 如何将 Scalding TypedPipe 转换为 Iterator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32507110/
有如下数据: pid recom-pid 1 1 1 2 1 3 2 1 2 2 2 4 2 5 需要做到: pid, recommendations 1 2,3
我正在 HDFS 上通过烫洗、聚合某些字段并通过 TSV 写入制表符分隔文件来重新读取文件。如何写出包含输出文件架构的文件?例如, UnpackedAvroSource(args("input"))
一个月前我在 Cascading 工作。现在我们正在尝试在 Scalding 中实现相同的功能。我有一个基本问题。 如何在 Scalding 中定义我的源和接收器模式? 以下是我们在级联中遵循的过程
我正在将一个项目从hadoop移植到Spark 2.1.0。以前,它使用twitter.scalding.addTrap处理类似以下的异常: https://github.com/scalding-i
我有以下代码,其中维护一个大列表:我在这里所做的是检查数据流并创建倒排索引。我使用twitter scalding api,dataTypePipe是TypedPipe的类型 lazy val c
我正在尝试将一个管道输出到不同的目录中,这样每个目录的输出将根据一些 ID 进行分桶。所以在普通的 map reduce 代码中,我会使用 MultipleOutputs 类,我会在 reducer
作为使用 Scalding 进行某些计算的最后一步,我想计算管道中列的多个平均值。但是下面的代码不起作用 myPipe.groupAll { _average('col1,'col2, 'col3)
我正在 Scalding 中编写 MapReduce 作业,但在编译对我来说看起来完全合法的代码时遇到了困难。 val persistenceBins = List[Int](1000 * 60 *
这些天我遇到了一个问题,我正在尝试使用 scalding 从多个文件中读取数据并使用单个文件创建输出。我的代码是这样的: def getFilesSource (paths: Seq[String])
有人可以给我指向一个链接,该链接解释了如何在 scalding 中读取和编写简单的案例类吗?是否有一些默认的序列化方案? 例如,我有创建 com.twitter.algebird.Moments 管道
如果你想从 Scalding 中的一个较小的字段创建一个超过 22 个字段的管道,你会受到 Scala 元组的限制,它不能超过 22 个项目。 有没有办法使用集合而不是元组?我想像以下示例中的内容,但
使用 Scalding 时,您可以提供一个函数。我想知道 scalding 如何将这些函数传递给远程 map/reduce 任务?这是使用 scala 中的东西还是可以用匿名对象完成的通用东西? 最佳
只需将连接字段作为缩减键发送,就可以很容易地通过单键连接数据集。但是通过多个键连接记录(其中至少一个键应该相同)对我来说并不那么容易。 示例我有日志,我想按用户参数对它们进行分组,我想通过 (ipAd
我有以下格式的数据: "header1","header2","header3",... "value11","value12","value13",... "value21","value22","
我正在尝试运行 Scalding 示例字数统计示例。我已按照此 github 链接执行步骤:- https://github.com/twitter/scalding/wiki/Getting-Sta
所以人们在压缩 Scalding Jobs 的输出时遇到了问题,包括我自己。谷歌搜索后,我在某个不起眼的论坛中得到了奇怪的答案,但没有适合人们复制和粘贴需求的答案。 我想要像 Tsv 这样的输出,但写
我有一本 Antonios Chalkiopoulos 的 Programming MapReduce with Scalding。在书中,他讨论了 Scalding 代码的外部操作设计模式。你可以在
我正在使用 Scalding,我们有大约 5.5GB 的输出文件大小。 (例如,对于 30 个 reducer ,有 30 个 5.5GB 文件)。有没有办法说,将输出文件限制为每个 512MB?我可
我已经改编了 scalding KMeans 示例来执行 KModes。问题是当作业完成后,我需要将聚类记录与匹配的质心连接起来。 KMeans 代码使用 ValuePipe 来保存质心。因此,为了从
我正在使用 Elephant Bird's 将序列化的 Thrift 记录写入文件可分割的 LZO 压缩。为此,我使用了他们的 ThriftBlockWriter 类。然后我的 Scalding 工作
我是一名优秀的程序员,十分优秀!