
csv - PCA in PySpark with irregular execution

I am using PySpark to run PCA on a csv file. I am seeing some strange behavior: my code sometimes runs perfectly, but sometimes it returns this error:

 File "C:/spark/spark-2.1.0-bin-hadoop2.7/bin/pca_final2.py", line 25, in <module>
columns = (fileObj.first()).split(';')
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1361, in first
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1343, in take
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\context.py", line 965, in runJob
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error

Here is my code:
#########################! importing libraries !########################
from __future__ import print_function
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.mllib.feature import Normalizer
import timeit
########################! main script !#################################
sc = SparkContext("local", "pca-app")
sqlContext = SQLContext(sc)
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()
    start = timeit.default_timer()
    fileObj = sc.textFile('bigiris.csv')
    data = fileObj.map(lambda line: [float(k) for k in line.split(';')])
    columns = (fileObj.first()).split(';')
    df = spark.createDataFrame(data, columns)
    df.show()
    vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    pipeline = Pipeline(stages=[vecAssembler, pca])
    model = pipeline.fit(df)
    result = model.transform(df).select("pcaFeatures")
    stop = timeit.default_timer()
    result.show(truncate=False)
    time = stop - start
    print("this operation takes ", (time), " seconds")
    spark.stop()

Why do I get this irregular execution, and what should I add to fix the problem?

Best Answer

You are not filtering out the header row when creating data. Assuming your column names are strings, this causes the error, because the column names cannot be converted to float values. See the modified portion of the script below, which uses filter to remove the header.

fileObj = sc.textFile('e:/iris.data.txt')
header = fileObj.first()
data = fileObj.filter(lambda x: x != header).map(lambda line: [float(k) for k in line.split(';')])
columns = header.split(';')
df = spark.createDataFrame(data, columns)
df.show()
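
As a side note (not part of the original answer), Spark's built-in CSV reader can also load the file directly into a DataFrame, which avoids parsing the header by hand. Below is a minimal sketch, assuming a semicolon-delimited file with a header row and numeric columns, and reusing the 'bigiris.csv' path from the question:

from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("PCAExample").getOrCreate()
# header=True consumes the header row and uses it for column names;
# inferSchema=True parses the numeric columns as doubles instead of strings.
df = spark.read.csv('bigiris.csv', sep=';', header=True, inferSchema=True)
vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = Pipeline(stages=[vecAssembler, pca]).fit(df)
model.transform(df).select("pcaFeatures").show(truncate=False)
spark.stop()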

Regarding csv - PCA in PySpark with irregular execution, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42790037/
