gpt4 book ai didi

apache-spark - 如何从Zeppelin中的控制台流接收器获取输出?

转载 作者:行者123 更新时间:2023-12-04 03:36:03 28 4
gpt4 key购买 nike

从Zeppelin运行时,我正在努力让console接收器与PySpark Structured Streaming一起使用。基本上,我看不到任何结果打印到屏幕上或发现的任何日志文件中。

我的问题:是否有人使用将PySpark结构化流与接收器结合使用而产生在Apache Zeppelin中可见的输出的有效示例?理想情况下,它也将使用套接字源,因为这很容易测试。

我正在使用:

  • Ubuntu 16.04
  • spark-2.2.0-bin-hadoop2.7
  • 齐柏林飞艇0.7.3-bin-全部
  • Python3

  • 我的代码基于 structured_network_wordcount.py example。从PySpark shell( ./bin/pyspark --master local[2])运行时,它可以工作;我看到每个批次的表格。

    %pyspark
    # structured streaming
    from pyspark.sql.functions import *
    lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .option('includeTimestamp', 'true')\
    .load()

    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
    )

    # Group the data by window and word and compute the count of each group
    windowedCounts = words.groupBy(
    window(words.timestamp, '10 seconds', '1 seconds'),
    words.word
    ).count().orderBy('window')

    # Start running the query that prints the windowed word counts to the console
    query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()

    print("Starting...")
    query.awaitTermination(20)

    我希望看到每个批次的结果的打印输出,但我只会看到 Starting...,然后是 False,即 query.awaitTermination(20)的返回值。

    在上面的运行过程中,我正在一个单独的终端中向 nc -lk 9999 netcat session 中输入一些数据。

    最佳答案

    对于基于笔记本的交互式工作流,控制台接收器不是一个不错的选择。即使在可以捕获输出的Scala中,它也需要在同一段落中调用awaitTermination(或等效命令),从而有效地阻止了音符。

    %spark

    spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
    .option("includeTimestamp", "true")
    .load()
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
    .awaitTermination() // Block execution, to force Zeppelin to capture the output

    链式 awaitTermination可以用独立调用 代替,在同一段中也可以:

    %spark

    val query = df
    .writeStream
    ...
    .start()

    query.awaitTermination()

    没有它,齐柏林飞艇没有理由等待任何输出。 PySpark只是在此之上添加了另一个问题-间接执行。因此,即使阻止查询也无济于事。

    此外,从流中连续输出可能会导致浏览笔记时出现渲染问题和内存问题(可能可以通过 InterpreterContext或REST API使用Zeppelin显示系统来实现更明智的行为,其中输出被覆盖或定期清除) 。

    使用Zeppelin进行测试的更好选择是 memory sink。这样,您可以启动查询而不会阻塞:

    %pyspark

    query = (windowedCounts
    .writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("some_name")
    .start())

    并在另一段中按需查询结果:

    %pyspark

    spark.table("some_name").show()

    它可以与 reactive streams或类似的解决方案结合使用,以提供基于间隔的更新。

    尽管PySpark不支持查询监听器,并且需要一些代码才能将东西粘合在一起,但也可以将 StreamingQueryListener与Py4j回调一起使用,以将 rxonQueryProgress事件耦合在一起。 Scala界面:

    package com.example.spark.observer

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    trait PythonObserver {
    def on_next(o: Object): Unit
    }

    class PythonStreamingQueryListener(observer: PythonObserver)
    extends StreamingQueryListener {
    override def onQueryProgress(event: QueryProgressEvent): Unit = {
    observer.on_next(event)
    }
    override def onQueryStarted(event: QueryStartedEvent): Unit = {}
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
    }

    构建一个jar,调整构建定义以反射(reflect)所需的Scala和Spark版本:

    scalaVersion := "2.11.8"  

    val sparkVersion = "2.2.0"

    libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %% "spark-streaming" % sparkVersion
    )

    将其放在Spark类路径上,修补 StreamingQueryManager:

    %pyspark

    from pyspark.sql.streaming import StreamingQueryManager
    from pyspark import SparkContext

    def addListener(self, listener):
    jvm = SparkContext._active_spark_context._jvm
    jlistener = jvm.com.example.spark.observer.PythonStreamingQueryListener(
    listener
    )
    self._jsqm.addListener(jlistener)
    return jlistener


    StreamingQueryManager.addListener = addListener

    启动回调服务器:

    %pyspark

    sc._gateway.start_callback_server()

    并添加监听器:

    %pyspark

    from rx.subjects import Subject

    class StreamingObserver(Subject):
    class Java:
    implements = ["com.example.spark.observer.PythonObserver"]

    observer = StreamingObserver()
    spark.streams.addListener(observer)

    最后,您可以使用 subscribe并阻止执行:

    %pyspark

    (observer
    .map(lambda p: p.progress().name())
    # .filter() can be used to print only for a specific query
    .subscribe(lambda n: spark.table(n).show() if n else None))
    input() # Block execution to capture the output

    开始流式查询后,应该执行最后一步。

    也可以跳过 rx并使用最少的观察者,如下所示:

    class StreamingObserver(object):
    class Java:
    implements = ["com.example.spark.observer.PythonObserver"]

    def on_next(self, value):
    try:
    name = value.progress().name()
    if name:
    spark.table(name).show()
    except: pass

    它提供的控制比 Subject少(一个警告是,这可能会干扰其他代码向stdout的打印,并且只能由 removing listener停止。使用 Subject,您可以在完成后轻松地 dispose subscribed观察者),但是否则应该工作大致相同。

    请注意,任何阻止操作都足以捕获来自监听器的输出,而不必在同一单元中执行。例如

    %pyspark

    observer = StreamingObserver()
    spark.streams.addListener(observer)



    %pyspark

    import time

    time.sleep(42)

    将以类似的方式工作,在定义的时间间隔内打印表格。

    为了完整起见,您可以实现 StreamingQueryManager.removeListener

    关于apache-spark - 如何从Zeppelin中的控制台流接收器获取输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47357418/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com