- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有 RDD[CassadraRow] 到 scala 中的 List[CassandraRow]。在下面的代码中我遇到内存泄漏问题:
val rowKeyRdd: Array[CassandraRow] =
sc.cassandraTable(keyspace, table).select("customer_id", "uniqueaddress").collect()
val clientPartitionKeys = rowKeyRdd.map(x => ClientPartitionKey(
x.getString("customer_id"), x.getString("uniqueaddress"))).toList
val clientRdd: RDD[CassandraRow] =
sc.parallelize(clientPartitionKeys).joinWithCassandraTable(keyspace, table)
.where("eventtime >= ?", startDate)
.where("eventtime <= ?", endDate)
.map(x => x._2)
clientRdd.cache()
我已经删除了cache(),但仍然遇到问题。
org.jboss.netty.channel.socket.nio.AbstractNioSelector
WARNING: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
at org.jboss.netty.buffer.HeapChannelBuffer.<init>(HeapChannelBuffer.java:42)
at org.jboss.netty.buffer.BigEndianHeapChannelBuffer.<init>(BigEndianHeapChannelBuffer.java:34)
at org.jboss.netty.buffer.ChannelBuffers.buffer(ChannelBuffers.java:134)
at org.jboss.netty.buffer.HeapChannelBufferFactory.getBuffer(HeapChannelBufferFactory.java:68)
at org.jboss.netty.buffer.AbstractChannelBufferFactory.getBuffer(AbstractChannelBufferFactory.java:48)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:80)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ERROR 2016-02-12 07:54:48 akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError:超出 GC 开销限制
如何避免内存泄漏。我尝试过每个核心 8GB。并且表包含数百万条记录。
最佳答案
在这一行中,您的变量名称表明您有一个 RDD,但实际上,因为您使用的是 collect()
,所以它不是一个 RDD,正如您的类型声明所示,它是一个数组:
val rowKeyRdd: Array[CassandraRow] =
sc.cassandraTable(keyspace, table).select("customer_id", "uniqueaddress").collect()
这会将所有数据从工作线程提取到驱动程序中,因此工作线程的内存量(每个核心 8GB)不是问题,驱动程序中没有足够的内存来处理此收集。
由于您对这些数据所做的只是映射它,然后将其重新并行化回 RDD,因此您应该在不调用 collect()
的情况下映射它。我没有尝试下面的代码,因为我无权访问您的数据集,但它应该大致正确:
val rowKeyRdd: RDD[CassandraRow] =
sc.cassandraTable(keyspace, table).select("customer_id", "uniqueaddress")
val clientPartitionKeysRDD = rowKeyRdd.map(x => ClientPartitionKey(
x.getString("customer_id"), x.getString("uniqueaddress")))
val clientRdd: RDD[CassandraRow] =
clientPartitionKeysRDD.joinWithCassandraTable(keyspace, table)
.where("eventtime >= ?", startDate)
.where("eventtime <= ?", endDate)
.map(x => x._2)
clientRdd.cache()
关于java - 如何在scala中不使用collec()将RDD[CassandraRow]转换为List[CassandraRow],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35358055/
我有 RDD[CassadraRow] 到 scala 中的 List[CassandraRow]。在下面的代码中我遇到内存泄漏问题: val rowKeyRdd: Array[CassandraRo
由于从 Cassandra 查询数据有限制,我尝试使用 Spark 批量读取数据并将其存储在 RDD 中。 然后我使用 union 函数添加所有 RDD 。 这是我的代码。 private void
我正在尝试从 RDD[cassandraRow] 创建一个 Dataframe .. 但我不能因为 createDataframe( RDD[Row] ,schema: StructType) 需要
我有一个 CassandraRow 对象,其中包含行的值。我从一张 table 上读到的。我想将同一个对象写入另一个表。但后来我得到了这个错误: requirement failed: Columns
我是一名优秀的程序员,十分优秀!