- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在将多列从分类值转换为数值时遇到问题。我正在使用 PySpark,但我确定问题不是我使用的 spark 版本。使用一列时,没有问题,但在转换多列时遇到问题。这是代码,没有缺失值:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
categorical_columns= ['age','job', 'marital','education', 'default', 'housing', 'loan', 'poutcome', 'y']
indexers = [
StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
for c in categorical_columns
]
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
outputCol="{0}_encoded".format(indexer.getOutputCol()))
for indexer in indexers
]
# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")
pipeline = Pipeline(stages=indexers + encoders+[assembler])
model=pipeline.fit(df2)
transformed = model.transform(df2)
transformed.show(5)
输出是:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-48-452b475faf1a> in <module>
20
21 pipeline = Pipeline(stages=indexers + encoders+[assembler])
---> 22 model=pipeline.fit(df2)
23 transformed = model.transform(df2)
24 transformed.show(5)
E:\spark-2.4.2-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
E:\spark-2.4.2-bin-hadoop2.7\python\pyspark\ml\pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
E:\spark-2.4.2-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
E:\spark-2.4.2-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
293
294 def _fit(self, dataset):
--> 295 java_model = self._fit_java(dataset)
296 model = self._create_model(java_model)
297 return self._copyValues(model)
E:\spark-2.4.2-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
290 """
291 self._transfer_params_to_java()
--> 292 return self._java_obj.fit(dataset._jdf)
293
294 def _fit(self, dataset):
E:\spark-2.4.2-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
E:\spark-2.4.2-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
E:\spark-2.4.2-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1833.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 72, localhost, executor driver): java.io.FileNotFoundException: C:\Users\user\AppData\Local\Temp\blockmgr-11928db3-60f2-407b-b821-1338f779e3b5\0d\shuffle_30_0_0.data.6d622104-8179-4873-9b10-16afe2a61081 (The system cannot find the path specified)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:370)
at org.apache.spark.rdd.RDD.$anonfun$countByValue$1(RDD.scala:1214)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.countByValue(RDD.scala:1214)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:140)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:109)
at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: C:\Users\user\AppData\Local\Temp\blockmgr-11928db3-60f2-407b-b821-1338f779e3b5\0d\shuffle_30_0_0.data.6d622104-8179-4873-9b10-16afe2a61081 (The system cannot find the path specified)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
任何帮助将不胜感激
最佳答案
要将多列从分类值转换为数值,需要对每一列使用索引器和编码器,然后使用向量汇编器进行转换。在使用矢量汇编器之前,我还添加了一个最小-最大缩放器,如下所示:
stringIndexer = StringIndexer(inputCol="job", outputCol="job_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="job_index", outputCol="job_vec")
encoded = encoder.transform(indexed)
df2 = encoded
stringIndexer = StringIndexer(inputCol="marital", outputCol="marital_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="marital_index", outputCol="marital_vec")
encoded = encoder.transform(indexed)
df2 = encoded
stringIndexer = StringIndexer(inputCol="education", outputCol="education_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="education_index", outputCol="education_vec")
encoded = encoder.transform(indexed)
df2 = encoded
stringIndexer = StringIndexer(inputCol="default", outputCol="default_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="default_index", outputCol="default_vec")
encoded = encoder.transform(indexed)
df2 = encoded
stringIndexer = StringIndexer(inputCol="housing", outputCol="housing_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="housing_index", outputCol="housing_vec")
encoded = encoder.transform(indexed)
df2 = encoded
stringIndexer = StringIndexer(inputCol="loan", outputCol="loan_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="loan_index", outputCol="loan_vec")
encoded = encoder.transform(indexed)
df2 = encoded
stringIndexer = StringIndexer(inputCol="poutcome", outputCol="poutcome_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="poutcome_index", outputCol="poutcome_vec")
encoded = encoder.transform(indexed)
df2 = encoded
stringIndexer = StringIndexer(inputCol="y", outputCol="y_index")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)
encoder = OneHotEncoder(dropLast=False, inputCol="y_index", outputCol="y_vec")
encoded = encoder.transform(indexed)
df2 = encoded
df2.show(4)
cols = ['job', 'marital','education', 'default', 'housing', 'loan', 'poutcome', 'y']
for col in cols:
scaler = MinMaxScaler(inputCol=col+"_vec", outputCol=col+"_vec_scaled")
scalerModel = scaler.fit(df2)
scaledData = scalerModel.transform(df2)
df2 = scaledData
df2.show(4)
vecAssembler = VectorAssembler(inputCols=[ '`job_vec_scaled','marital_vec_scaled','education_vec_scaled', 'default_vec_scaled', `'housing_vec`_scaled', 'loan_vec_scaled', 'poutcome_vec_scaled'], outputCol='features')`
df3 = vecAssembler.transform(df2)
df3.show(4)
关于python - 如何使用 PySpark 执行一次热编码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55922787/
我在数据框中有一列月份数字,想将其更改为月份名称,所以我使用了这个: df['monthName'] = df['monthNumber'].apply(lambda x: calendar.mont
Pyspark 中是否有一个 input() 函数,我可以通过它获取控制台输入。如果是,请详细说明一下。 如何在 PySpark 中编写以下代码: directory_change = input("
我们正在 pyspark 中构建数据摄取框架,并想知道处理数据类型异常的最佳方法是什么。基本上,我们希望有一个拒绝表来捕获所有未与架构确认的数据。 stringDf = sparkSession.cr
我正在开发基于一组 ORC 文件的 spark 数据框的 sql 查询。程序是这样的: from pyspark.sql import SparkSession spark_session = Spa
我有一个 Pyspark 数据框( 原始数据框 )具有以下数据(所有列都有 字符串 数据类型): id Value 1 103 2
我有一台配置了Redis和Maven的服务器 然后我执行以下sparkSession spark = pyspark .sql .SparkSession .builder .master('loca
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有: +---+-------+-------+ | id| var1| var2| +---+-------+-------+ | a|[1,2,3]|[1,2,3]| | b|[2,
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有一个带有多个数字列的 pyspark DF,我想为每一列根据每个变量计算该行的十分位数或其他分位数等级。 这对 Pandas 来说很简单,因为我们可以使用 qcut 函数为每个变量创建一个新列,如
我有以下使用 pyspark.ml 包进行线性回归的代码。但是,当模型适合时,我在最后一行收到此错误消息: IllegalArgumentException: u'requirement failed
我有一个由 | 分隔的平面文件(管道),没有引号字符。示例数据如下所示: SOME_NUMBER|SOME_MULTILINE_STRING|SOME_STRING 23|multiline text
给定如下模式: root |-- first_name: string |-- last_name: string |-- degrees: array | |-- element: struc
我有一个 pyspark 数据框如下(这只是一个简化的例子,我的实际数据框有数百列): col1,col2,......,col_with_fix_header 1,2,.......,3 4,5,.
我有一个数据框 +------+--------------------+-----------------+---- | id| titulo |tipo | formac
我从 Spark 数组“df_spark”开始: from pyspark.sql import SparkSession import pandas as pd import numpy as np
如何根据行号/行索引值删除 Pyspark 中的行值? 我是 Pyspark(和编码)的新手——我尝试编码一些东西,但它不起作用。 最佳答案 您不能删除特定的列,但您可以使用 filter 或其别名
我有一个循环生成多个因子表的输出并将列名存储在列表中: | id | f_1a | f_2a | |:---|:----:|:-----| |1 |1.2 |0.95 | |2 |0.7
我正在尝试将 hql 脚本转换为 pyspark。我正在努力如何在 groupby 子句之后的聚合中实现 case when 语句的总和。例如。 dataframe1 = dataframe0.gro
我想添加新的 2 列值服务 arr 第一个和第二个值 但我收到错误: Field name should be String Literal, but it's 0; production_targe
我是一名优秀的程序员,十分优秀!