- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
下面的 Spark 代码似乎没有对文件 example.txt
执行任何操作
val conf = new org.apache.spark.SparkConf()
.setMaster("local")
.setAppName("filter")
.setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
.set("spark.executor.memory", "2g");
val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("C:\\example.txt")
dataFile.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
dataFile.print()
打印文件的前 10 个元素
15/03/12 12:23:53 INFO JobScheduler: Started JobScheduler
15/03/12 12:23:54 INFO FileInputDStream: Finding new files took 105 ms
15/03/12 12:23:54 INFO FileInputDStream: New files at time 1426163034000 ms:
15/03/12 12:23:54 INFO JobScheduler: Added jobs for time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Starting job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
-------------------------------------------
Time: 1426163034000 ms
-------------------------------------------
15/03/12 12:23:54 INFO JobScheduler: Finished job streaming job 1426163034000 ms.0 from job set of time 1426163034000 ms
15/03/12 12:23:54 INFO JobScheduler: Total delay: 0.157 s for time 1426163034000 ms (execution: 0.006 s)
15/03/12 12:23:54 INFO FileInputDStream: Cleared 0 old files that were older than 1426162974000 ms:
15/03/12 12:23:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:55 INFO FileInputDStream: Finding new files took 2 ms
15/03/12 12:23:55 INFO FileInputDStream: New files at time 1426163035000 ms:
15/03/12 12:23:55 INFO JobScheduler: Added jobs for time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Starting job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
-------------------------------------------
Time: 1426163035000 ms
-------------------------------------------
15/03/12 12:23:55 INFO JobScheduler: Finished job streaming job 1426163035000 ms.0 from job set of time 1426163035000 ms
15/03/12 12:23:55 INFO JobScheduler: Total delay: 0.011 s for time 1426163035000 ms (execution: 0.001 s)
15/03/12 12:23:55 INFO MappedRDD: Removing RDD 1 from persistence list
15/03/12 12:23:55 INFO BlockManager: Removing RDD 1
15/03/12 12:23:55 INFO FileInputDStream: Cleared 0 old files that were older than 1426162975000 ms:
15/03/12 12:23:55 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/03/12 12:23:56 INFO FileInputDStream: Finding new files took 3 ms
15/03/12 12:23:56 INFO FileInputDStream: New files at time 1426163036000 ms:
example.txt
是格式:
gdaeicjdcg,194,155,98,107
jhbcfbdigg,73,20,122,172
ahdjfgccgd,28,47,40,178
afeidjjcef,105,164,37,53
afeiccfdeg,29,197,128,85
aegddbbcii,58,126,89,28
fjfdbfaeid,80,89,180,82
print
文件指出:
val conf = new org.apache.spark.SparkConf()
.setMaster("local[2]")
.setAppName("filter")
.setSparkHome("C:\\spark\\spark-1.2.1-bin-hadoop2.4")
.set("spark.executor.memory", "2g");
val ssc = new StreamingContext(conf, Seconds(1))
val dataFile: DStream[String] = ssc.textFileStream("file:///c:/data/")
dataFile.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
c:\\data
添加新文件时dir(与现有数据文件具有相同的格式),它们不会被处理。我假设
dataFile.print
应该打印前 10 行到控制台?
最佳答案
您误解了 textFileStream
的用法.以下是 Spark 文档中的描述:
创建一个输入流来监控与 Hadoop 兼容的文件系统中的新文件并将它们作为文本文件读取(使用键作为 LongWritable,值作为文本和输入格式作为 TextInputFormat)。
因此,首先,您应该将目录传递给它,其次,运行接收器的节点应该可以使用该目录,因此最好为此目的使用 HDFS。然后当你放一个 新品 文件放入此目录,它将被函数 print()
处理并为其打印前 10 行
更新:
我的代码:
[alex@sparkdemo tmp]$ pyspark --master local[2]
Python 2.6.6 (r266:84292, Nov 22 2013, 12:16:22)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
s15/03/12 06:37:49 WARN Utils: Your hostname, sparkdemo resolves to a loopback address: 127.0.0.1; using 192.168.208.133 instead (on interface eth0)
15/03/12 06:37:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.0
/_/
Using Python version 2.6.6 (r266:84292, Nov 22 2013 12:16:22)
SparkContext available as sc.
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 30)
>>> dataFile = ssc.textFileStream('file:///tmp')
>>> dataFile.pprint()
>>> ssc.start()
>>> ssc.awaitTermination()
-------------------------------------------
Time: 2015-03-12 06:40:30
-------------------------------------------
-------------------------------------------
Time: 2015-03-12 06:41:00
-------------------------------------------
-------------------------------------------
Time: 2015-03-12 06:41:30
-------------------------------------------
1 2 3
4 5 6
7 8 9
-------------------------------------------
Time: 2015-03-12 06:42:00
-------------------------------------------
关于scala - Scala Apache Spark中DStream的输出内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29009870/
我有一些 Scala 代码,它用两个不同版本的类型参数化函数做了一些漂亮的事情。我已经从我的应用程序中简化了很多,但最后我的代码充满了形式 w(f[Int],f[Double]) 的调用。哪里w()是
如果我在同一目录中有两个单独的未编译的 scala 文件: // hello.scala object hello { def world() = println("hello world") }
val schema = df.schema val x = df.flatMap(r => (0 until schema.length).map { idx => ((idx, r.g
环境: Play 2.3.0/Scala 2.11.1/IntelliJ 13.1 我使用 Typesafe Activator 1.2.1 用 Scala 2.11.1 创建一个新项目。项目创建好后
我只是想知道如何使用我自己的类扩展 Scala 控制台和“脚本”运行程序,以便我可以通过使用实际的 Scala 语言与其通信来实际使用我的代码?我应将 jar 放在哪里,以便无需临时配置即可从每个 S
我已经根据 README.md 文件安装了 ensime,但是,我在低级 ensime-server 缓冲区中出现以下错误: 信息: fatal error :scala.tools.nsc.Miss
我正在阅读《Scala 编程》一书。在书中,它说“一个函数文字被编译成一个类,当在运行时实例化时它是一个函数值”。并且它提到“函数值是对象,因此您可以根据需要将它们存储在变量中”。 所以我尝试检查函数
我有 hello world scala native 应用程序,想对此应用程序运行小型 scala 测试我使用通常的测试命令,但它抛出异常: NativeMain.scala object Nati
有few resources在网络上,在编写与代码模式匹配的 Scala 编译器插件方面很有指导意义,但这些对生成代码(构建符号树)没有帮助。我应该从哪里开始弄清楚如何做到这一点? (如果有比手动构建
我是 Scala 的新手。但是,我用 创建了一个中等大小的程序。斯卡拉 2.9.0 .现在我想使用一个仅适用于 的开源库斯卡拉 2.7.7 . 是吗可能 在我的 Scala 2.9.0 程序中使用这个
有没有办法在 Scala 2.11 中使用 scala-pickling? 我在 sonatype 存储库中尝试了唯一的 scala-pickling_2.11 工件,但它似乎不起作用。我收到消息:
这与命令行编译器选项无关。如何以编程方式获取代码内的 Scala 版本? 或者,Eclipse Scala 插件 v2 在哪里存储 scalac 的路径? 最佳答案 这无需访问 scala-compi
我正在阅读《Scala 编程》一书,并在第 6 章中的类 Rational 实现中遇到了一些问题。 这是我的 Rational 类的初始版本(基于本书) class Rational(numerato
我是 Scala 新手,我正在尝试开发一个使用自定义库的小项目。我在库内创建了一个mysql连接池。这是我的库的build.sbt organization := "com.learn" name :
我正在尝试运行一些 Scala 代码,只是暂时打印出“Hello”,但我希望在 SBT 项目中编译 Scala 代码之前运行 Scala 代码。我发现在 build.sbt 中有以下工作。 compi
Here链接到 maven Scala 插件使用。但没有提到它使用的究竟是什么 Scala 版本。我创建了具有以下配置的 Maven Scala 项目: org.scala-tools
我对 Scala 还很陌生,请多多包涵。我有一堆包裹在一个大数组中的 future 。 future 已经完成了查看几 TB 数据的辛勤工作,在我的应用程序结束时,我想总结上述 future 的所有结
我有一个 scala 宏,它依赖于通过包含其位置的静态字符串指定的任意 xml 文件。 def myMacro(path: String) = macro myMacroImpl def myMacr
这是我的功能: def sumOfSquaresOfOdd(in: Seq[Int]): Int = { in.filter(_%2==1).map(_*_).reduce(_+_) } 为什么我
这个问题在这里已经有了答案: Calculating the difference between two Java date instances (45 个答案) 关闭 5 年前。 所以我有一个这
我是一名优秀的程序员,十分优秀!