gpt4 book ai didi

apache-spark - 使用 Scala 转换 PySpark RDD

转载 作者:行者123 更新时间:2023-12-04 18:03:08 24 4
gpt4 key购买 nike

TL;DR - 我在 PySpark 应用程序中有一个看起来像字符串 DStream 的东西。我想以 DStream[String] 的形式发送它到 Scala 库。不过,Py4j 不会转换字符串。

我正在开发一个 PySpark 应用程序,该应用程序使用 Spark Streaming 从 Kafka 中提取数据。我的消息是字符串,我想在 Scala 代码中调用一个方法,将它传递给 DStream[String]实例。但是,我无法在 Scala 代码中接收正确的 JVM 字符串。在我看来,Python 字符串没有转换为 Java 字符串,而是被序列化了。

我的问题是:如何从 DStream 中获取 Java 字符串。目的?

这是我想出的最简单的 Python 代码:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

我在 PySpark 中运行这段代码,将路径传递给我的 JAR:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

在 Scala 方面,我有:
package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}

现在,假设我将一些数据发送到 Kafka:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
println Scala 代码中的语句打印如下内容:
[B@758aa4d9

我希望得到 foo bar反而。

现在,如果我替换简单的 println在 Scala 代码中使用以下语句:
rdd.foreach(v => println(v.getClass.getCanonicalName))

我得到:
java.lang.ClassCastException: [B cannot be cast to java.lang.String

这表明字符串实际上是作为字节数组传递的。

如果我只是尝试将此字节数组转换为字符串(我知道我什至没有指定编码):
      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}

我得到的东西看起来像(特殊字符可能被剥离):
�]qXfoo barqa.

这表明 Python 字符串已被序列化(腌制?)。我怎样才能检索到正确的 Java 字符串呢?

最佳答案

长话短说,没有支持的方式来做这样的事情。不要在生产中尝试这个。你已经被警告过了。

一般来说,Spark 不会将 Py4j 用于驱动程序上的一些基本 RPC 调用,并且不会在任何其他机器上启动 Py4j 网关。当需要时(主要是 MLlib 和 SQL 的某些部分),Spark 使用 Pyrolite序列化在 JVM 和 Python 之间传递的对象。

API 的这一部分是私有(private)的 (Scala) 或内部的 (Python),因此不适合一般用途。虽然理论上您可以按批处理访问它:

package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
def go(rdd: JavaRDD[Any]) = {
rdd.rdd.collect {
case s: String => s
}.take(5).foreach(println)
}
}

完整流:

object PythonDStreamHelper {
def go(stream: JavaDStream[Any]) = {
stream.dstream.transform(_.collect {
case s: String => s
}).print
}
}

或将单个批处理公开为 DataFrames (可能是最不邪恶的选择):

object PythonDataFrameHelper {
def go(df: DataFrame) = {
df.show
}
}

并按如下方式使用这些包装器:

from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)])

# Reserialize RDD as Java RDD<Object> and pass
# to Scala sink (only for output)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
_to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This is the only option which allows further transformations
# on DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
q.transform(lambda rdd: RDD( # Reserialize but keep as Python RDD
_to_java_object_rdd(rdd), ssc.sparkContext
))._jdstream
)

# Convert to DataFrame and pass to Scala sink.
# Arguably there are relatively few moving parts here.
q.foreachRDD(lambda rdd:
ssc._jvm.dummy.PythonDataFrameHelper.go(
rdd.map(lambda x: (x, )).toDF()._jdf
)
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()

这是不支持的,未经测试的,因此除了使用 Spark API 进行实验之外,其他任何东西都没有用。

关于apache-spark - 使用 Scala 转换 PySpark RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39458465/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com