This is a newbie question, because I can't seem to find a simple way to do this.
I am working with an airline dataset plus weather data, predicting delays of more than 15 minutes.
Airline dataset (2007 and 2008): http://stat-computing.org/dataexpo/2009/the-data.html
Weather:
wget ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2007.csv.gz -O /tmp/weather_2007.csv.gz
wget ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2008.csv.gz -O /tmp/weather_2008.csv.gz
My code comes from https://github.com/neil90/spark_airline_delays/blob/master/spark_airplane.ipynb, but I modified it for Spark 2.3:
df_airline_2007 = sqlContext.read.format("csv").option("header", "true").load("/ACMEAirDB/2007/2007.csv")
df_weather_2007 = sqlContext.read.format("csv").option("header", "false").load("/ACMEAirDB/weather_2007/weather_2007.csv")
df_airline_2008 = sqlContext.read.format("csv").option("header", "true").load("/ACMEAirDB/2008/2008.csv")
df_weather_2008 = sqlContext.read.format("csv").option("header", "false").load("/ACMEAirDB/weather_2008/weather_2008.csv")
df_airline_raw = df_airline_2007.unionAll(df_airline_2008)
df_weather_raw = df_weather_2007.unionAll(df_weather_2008)
#Function to combine year, month, day into a date string so the airline data can join onto the weather data
def to_date(year, month, day):
    dt = "%04d%02d%02d" % (year, month, day)
    return dt
sqlContext.udf.register("to_date", to_date)
#Function to discretize time of day in the airline data
def discretize_tod(val):
    hour = int(val[:2])
    if hour < 8:
        return 0
    if hour < 16:
        return 1
    return 2
sqlContext.udf.register("discretize_tod", discretize_tod)
df_airline_raw.registerTempTable("df_airline_raw")
df_weather_raw.registerTempTable("df_weather_raw")
#Create Final Airline transformation
df_airline = sqlContext.sql("""SELECT
Year as year, Month as month, DayofMonth as day, DayOfWeek as dow,
CarrierDelay as carrier, Origin as origin, Dest as dest, Distance as distance,
discretize_tod(DepTime) as tod, CASE WHEN DepDelay >= 15 THEN 1 ELSE 0 END as delay,
to_date(cast(Year as int), cast(Month as int), cast(DayofMonth as int)) As date
FROM df_airline_raw
WHERE Cancelled = 0 AND Origin = 'ORD'""")
#Create Base Weather Transformation Table
df_weather = sqlContext.sql("""SELECT
_C0 AS station,
_C1 As date,
_C2 As metric,
_C3 As value,
_C4 As t1,
_C5 As t2,
_C6 As t3,
_C7 As time
FROM df_weather_raw
""")
# df_weather.show(10)
#Register the base weather table
df_weather.registerTempTable("df_weather")
#Create DFs for Weather Tmin and Tmax Values
df_weather_tmin = sqlContext.sql("""SELECT
date,
value as temp_min
FROM df_weather
WHERE station = 'USW00094846'
AND metric = 'TMIN'""")
df_weather_tmax = sqlContext.sql("""SELECT
date,
value as temp_max
FROM df_weather
WHERE station = 'USW00094846'
AND metric = 'TMAX'""")
#Join Airline with Weather Tmin and Tmax Dataframes
df_airline_tmin = df_airline.join(df_weather_tmin,
                                  df_weather_tmin.date == df_airline.date,
                                  "inner").drop(df_weather_tmin.date)
df_airline_tmin_and_tmax = df_airline_tmin.join(df_weather_tmax,
                                                df_weather_tmax.date == df_airline_tmin.date,
                                                "inner").drop(df_weather_tmax.date)
df_airline_tmin_and_tmax.registerTempTable("df_airline_tmin_and_tmax")
df_all = sqlContext.sql("""SELECT
delay,
year,
month,
day,
dow,
cast (tod AS int) tod,
distance,
temp_min,
temp_max
FROM df_airline_tmin_and_tmax""")
#Cache Dataframe because we split it later on
df_all.cache()
#Random Forest classification
#import necessary libraries
from pyspark.mllib.regression import LabeledPoint
# from pyspark.mllib.tree import DecisionTree, RandomForest
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.linalg import DenseVector
#Create labeledPoint Parser
def parseDF(row):
    values = [row.delay, row.month, row.day, row.dow, row.tod, row.distance, row.temp_min, row.temp_max]
    return LabeledPoint(values[0], DenseVector(values[1:]))
#Convert Dataframes to LabeledPoint for modeling
train_data = df_all.filter("year=2007").rdd.map(parseDF)
test_data = df_all.filter("year=2008").rdd.map(parseDF)
#Train Model
modelRF = RandomForest.trainClassifier(train_data, numClasses=2, categoricalFeaturesInfo={},
                                       numTrees=500, impurity='gini', maxDepth=5)
#Apply the RF model to the test data
predictionsRF = modelRF.predict(test_data.map(lambda x: x.features))
predictionsAndLabelsRFRDD = predictionsRF.zip(test_data.map(lambda lp: lp.label))
predictionsAndLabelsRF = predictionsAndLabelsRFRDD.collect()
import pandas as pd
#Helper to build a confusion matrix and compute accuracy
def confusion_matrix(predAndLabel):
    y_actual = pd.Series([x for x, y in predAndLabel], name='Actual')
    y_pred = pd.Series([y for x, y in predAndLabel], name='Predicted')
    matrix = pd.crosstab(y_actual, y_pred)
    accuracy = float(matrix[0][0] + matrix[1][1]) / (matrix[0][0] + matrix[0][1] + matrix[1][0] + matrix[1][1])
    return matrix, accuracy
#RandomForest Confusion Matrix and Model Accuracy
df_confusion_RF, accuracy_RF = confusion_matrix(predictionsAndLabelsRF)
print('RF Confusion Matrix:')
print(df_confusion_RF)
print('\nRF Model Accuracy: {0}'.format(accuracy_RF))
I get the following correct output:
RF Confusion Matrix:
Predicted 0.0 1.0
Actual
0.0 237594 93003
1.0 2300 2433
RF Model Accuracy: 0.715793397549
So my question is: now that I have the model predictionsRF, how do I apply it to, say, a single "real world" record?
Here is my newbie attempt:
df_validation = sqlContext.sql("""SELECT
1 delay,
2008 year,
6 month,
19 day,
4 dow,
1 tod,
925 distance,
111 temp_min,
272 temp_max
""")
validation_data = df_validation.rdd.map(parseDF)
df_validation.show(1)
validationsRF = modelRF.predict(validation_data.map(lambda x: x.features))
validationsAndLabelsRFRDD = validationsRF.zip(validation_data.map(lambda lp: lp.label))
validationsAndLabelsRF = validationsAndLabelsRFRDD.collect()
print(validationsRF.collect())
1. Am I correct in using validationsRF.collect() as the predicted delay result?
2. How do I remove the delay column from df_validation without getting the error below?
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 284.0 failed 4 times, most recent failure: Lost task 0.3 in stage 284.0 (TID 4544, ip-172-31-40-184.us-west-2.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 229, in main
process()
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<stdin>", line 5, in parseDF
File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 1561, in __getattr__
raise AttributeError(item)
AttributeError: delay
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:204)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.GeneratedMethodAccessor184.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 229, in main
process()
File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<stdin>", line 5, in parseDF
File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 1561, in __getattr__
raise AttributeError(item)
Best Answer
How do I remove the delay column from df_validation without getting errors (below)?

Don't assume it exists inside your parseDF function. The concrete failure comes from:
values = [row.delay, ...]
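A minimal sketch of that fix, if you keep the mllib model from above: drop delay before converting to an RDD, and map with a parser that builds feature vectors only, so nothing ever reads row.delay. parseFeatures is a hypothetical helper (not in the original notebook); its feature order matches the training order in parseDF:

#Hypothetical helper: build the feature vector only, no label needed
def parseFeatures(row):
    #Same feature order as parseDF: month, day, dow, tod, distance, temp_min, temp_max
    return DenseVector([row.month, row.day, row.dow, row.tod,
                        row.distance, row.temp_min, row.temp_max])

validation_features = df_validation.drop("delay").rdd.map(parseFeatures)
validationsRF = modelRF.predict(validation_features)
print(validationsRF.collect())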
But honestly, just switch to the ML Pipeline API (pyspark.ml).
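For reference, a minimal sketch of what the pipeline version could look like; the column names come from df_all above, but the casts and parameter choices are my assumptions, not code from the original notebook:

from pyspark.sql.functions import col
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

#The raw CSVs load every column as string, so cast the modeling columns to double
feature_cols = ["month", "day", "dow", "tod", "distance", "temp_min", "temp_max"]
df_numeric = df_all.select(*[col(c).cast("double").alias(c)
                             for c in ["delay", "year"] + feature_cols])

#Assemble the feature columns into a single vector and train on the 2007 data
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
rf = RandomForestClassifier(labelCol="delay", featuresCol="features",
                            numTrees=500, maxDepth=5, impurity="gini")
model = Pipeline(stages=[assembler, rf]).fit(df_numeric.filter("year = 2007"))

#The fitted PipelineModel can be saved and reloaded for later scoring:
#model.save("/tmp/rf_pipeline"); model = PipelineModel.load("/tmp/rf_pipeline")

#Scoring a new record needs no label column and no manual LabeledPoint parsing
new_record = df_validation.drop("delay") \
    .select(*[col(c).cast("double").alias(c) for c in feature_cols])
model.transform(new_record).select("prediction").show()

This also addresses question 1: transform returns a DataFrame with a prediction column, so you read the predicted delay straight from it instead of collecting a zipped RDD.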
Regarding apache-spark - How to actually apply a saved RF model and make predictions in Spark2?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50712956/