- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
鉴于键/值对的数据源相对较小 (3,000-10,000),我尝试仅处理满足组阈值 (50-100) 的记录。因此,最简单的方法是按键对它们进行分组、过滤和展开——使用 FlatMap 或 ParDo。迄今为止,最大的一组只有 1,500 条记录。但这似乎是 Google Cloud Dataflow 生产中的一个严重瓶颈。
根据给定的列表
(1, 1)(1, 2)(1, 3)...(2, 1)(2, 2)(2, 3)...
运行一组转换以按键过滤和分组:
p | 'Group' >> beam.GroupByKey()
| 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50)
| 'Unwind' >> beam.FlatMap(lambda (key, values): values)
关于如何提高性能的任何想法?感谢您的帮助!
最佳答案
这是管道的一个有趣的极端情况。我相信您的问题在于您读取来自 GroupByKey
的数据的方式。让我简要介绍一下 GBK 的工作原理。
GroupByKey
,大数据系统是如何实现的所有大数据系统都实现了对同一键的多个元素进行操作的方法。这在 MapReduce 中称为 reduce,在其他大数据系统中称为 Group By Key 或 Combine。
当您执行 GroupByKey
转换时,Dataflow 需要将单个键的所有元素收集到同一台机器中。由于同一 key 的不同元素可能在不同的机器上处理,因此需要以某种方式对数据进行序列化。
这意味着当您读取来自 GroupByKey
的数据时,您正在访问 worker 的 IO(即不是从内存),因此您真的想避免读取随机数据太多次.
我认为您的问题在于 Filter
和 Unwind
都将分别从 shuffle 中读取数据(因此您将为每个列表读取数据两次)。你想要做的是只读取一次你的洗牌数据。您可以使用单个 FlatMap
来完成此操作,它既可以过滤又可以展开您的数据,而无需从 shuffle 中重复读取。像这样:
def unwind_and_filter((key, values)):
# This consumes all the data from shuffle
value_list = list(values)
if len(value_list) > 50:
yield value_list
p | 'Group' >> beam.GroupByKey()
| 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)
如果这有帮助,请告诉我。
关于python - 为什么 Apache Beam python 中的 GroupByKey 之后的 FlatMap 这么慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45889618/
我有一个对象Foo,其中包含Bar列表。这些类的描述如下: class Foo { String name; List bars = new ArrayList(); Foo(
根据 Mozilla 开发者网站: flatMap() 方法首先使用映射函数映射每个元素,然后将结果展平到一个新数组中。它与 map 后跟深度为 1 的 flat 相同,但 flatMap 通常非常有
我对无法找到该问题的现有问题感到非常惊讶。这是为什么,鉴于: val p: Int => Option[Int] = Some(_) List(1, 2, 3).flatMap(p) 我得到: :14
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 5 年前。 Improve th
我想知道两种平面映射情况之间是否存在显着差异。 案例 1: someCollection .stream() .map(CollectionElement::getAnotherCol
以下是flatMap的定义取自 scala.util.Success。 final case class Success[+T](value: T) extends Try[T] { def fl
我正在寻找一个函数来展平列表数组。首先,我在 RDD 系统上使用 Apach Spark 函数 flatMap 实现了我的解决方案,但我想在本地执行此操作。但是,我无法找到 的等价物 samples
我想知道是否存在忽略 flatMap 中的结果的函数(在 scala 或 cat 中) .例如。 Some("ignore this").ignoreArgumentFlatMap(Some("res
我正在学习 Scala 并解决了 99 个 Scala 问题。对于以下练习: 展平嵌套列表结构。示例: scala> flatten(List(List(1, 1), 2, List(3, List(
当编译器进入无限循环时,是否有人遇到过使用此类 flatMap 链(或什至更长)的问题。 let what = Future.init { (promise) in promise(.succ
有没有更好的函数方式来写 flatMap ? def flatMap[A,B](list: List[A])(f: A => List[B]): List[B] = list.map(x =>
我试图从两个 中变出笛卡尔积潜在无限然后我通过 limit() 限制的流. 到目前为止,这(大约)是我的策略: @Test void flatMapIsLazy() { Stream.
为什么以下声明对 .map() 有效但不适用于 .flatMap() ? val tupled = input.map(x => (x*2, x*3)) //Compilation error:
我正在寻找可以同时映射和展平 Lists 和 Maybes 的代码。我在 this topic 中发现了这样一个 flatMap 函数: flatMap :: (t -> [a]) -> [t] ->
考虑在某些大小写匹配上编写的 flatMap。例如: list.flatMap( v => v match { case Cond1 => if(something) Some
我无法使用ListKOf平面映射T -> Option。 例如 listOf(1,2,3).k().flatMap { i -> if (i % 2 == 0) Some(i) else None
有人可以解释我如何在RxJava中通过flatMap运算符传递onComplete信号吗? 如果对flatMap运算符进行注释,则可以获取1到10的数字列表,这意味着toList将收到onComple
我正在做一个在线类(class)并误读了一个问题(这就是为什么我认为可以发布这个问题,因为答案与类(class)中的问题无关!)。 data class Trip( val drive
给定作为数据类的二维坐标列表 data class Point(val x: Int, val y:Int) val points: List 和 TornadoFX(Kotlin 中的 JavaFX
这个问题已经有答案了: What is the difference between .stream() and Stream.of? (5 个回答) 已关闭 3 年前。 我有以下代码: List p
我是一名优秀的程序员,十分优秀!