DataFrame: converting a Scala class to a DataFrame with SparkSQL

This post walks through a complete Spark job that turns a Scala case class into a DataFrame. The job loads device IDs (`suuid`) from Parquet, calls a UDF that returns a JSON string of user tags for each ID, maps every row into a `UserTag` case class, converts the result with `toDF()`, and writes the final table to HBase through the Phoenix Spark connector. The listing is split into three parts below: the schema, the driver, and the helper functions.

First, the imports and the `UserTag` case class that defines the target schema, one field per output column:
```scala
import java.text.DecimalFormat

import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created by silentwolf on 2016/6/3.
 */
case class UserTag(SUUID: String,
                   MAN: Float,
                   WOMAN: Float,
                   AGE10_19: Float,
                   AGE20_29: Float,
                   AGE30_39: Float,
                   AGE40_49: Float,
                   AGE50_59: Float,
                   GAME: Float,
                   MOVIE: Float,
                   MUSIC: Float,
                   ART: Float,
                   POLITICS_NEWS: Float,
                   FINANCIAL: Float,
                   EDUCATION_TRAINING: Float,
                   HEALTH_CARE: Float,
                   TRAVEL: Float,
                   AUTOMOBILE: Float,
                   HOUSE_PROPERTY: Float,
                   CLOTHING_ACCESSORIES: Float,
                   BEAUTY: Float,
                   IT: Float,
                   BABY_PRODUCT: Float,
                   FOOD_SERVICE: Float,
                   HOME_FURNISHING: Float,
                   SPORTS: Float,
                   OUTDOOR_ACTIVITIES: Float,
                   MEDICINE: Float)
```
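`toDF()` infers the DataFrame schema from this case class by reflection: each field name becomes a column name and each `Float` becomes a `FloatType` column. (A case class this wide needs Scala 2.11 or later, which lifted the old 22-field limit.) The pattern in isolation looks like the minimal, runnable sketch below; `Person`, the sample rows, and the `local[*]` master are my own illustration, not part of the original post:

```scala
// Minimal sketch of the case-class -> DataFrame pattern (Spark 1.x API,
// matching the post; all names here are illustrative).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

object ToDfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("toDF-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope

    val df = sc.parallelize(Seq(Person("ann", 31), Person("bob", 24))).toDF()
    df.registerTempTable("people") // Spark 1.x; createOrReplaceTempView in 2.x
    sqlContext.sql("select name from people where age > 30").show()
  }
}
```

With the schema in place, here is the driver: it registers the temp table and the UDFs, packs each row into a `UserTag`, and writes the result out through Phoenix.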
```scala
object UserTagTable {

  // The original listing logged against UserOverviewFirst.getClass, a class not
  // defined anywhere in the post; logging against this object fixes the reference.
  val LOG = LoggerFactory.getLogger(UserTagTable.getClass)

  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  def main(args: Array[String]) {
    val startTime = System.currentTimeMillis()
    val conf: com.typesafe.config.Config = ConfigFactory.load()
    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)

    // Both arguments are required. (The original checked args.length == 0 but
    // then read args(1), so a single-argument call would have crashed.)
    if (args.length < 2) {
      println("Usage: <appkey> <lastdate>, e.g. myAppKey 2016-04-11")
    } else {
      val appkey = args(0)
      val lastdate = args(1)

      val df1: DataFrame = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate)
      df1.registerTempTable("suuidTable")

      // Register the helpers as SQL-callable UDFs.
      sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
      sqlContext.udf.register("intToString", (b: Long) => intToString(b))

      import sqlContext.implicits._

      // Key step: pair each distinct suuid from the temp table with the JSON
      // string produced by the taginfo UDF, and pack both into a UserTag.
      sqlContext.sql("select distinct(suuid) AS suuid, taginfo(suuid) from suuidTable group by suuid")
        .map { case Row(suuid: String, taginfo: String) =>
          val taginfoObj = JSON.parseObject(taginfo)
          UserTag(suuid.toString,
            taginfoObj.getFloat("man"),
            taginfoObj.getFloat("woman"),
            taginfoObj.getFloat("age10_19"),
            taginfoObj.getFloat("age20_29"),
            taginfoObj.getFloat("age30_39"),
            taginfoObj.getFloat("age40_49"),
            taginfoObj.getFloat("age50_59"),
            taginfoObj.getFloat("game"),
            taginfoObj.getFloat("movie"),
            taginfoObj.getFloat("music"),
            taginfoObj.getFloat("art"),
            taginfoObj.getFloat("politics_news"),
            taginfoObj.getFloat("financial"),
            taginfoObj.getFloat("education_training"),
            taginfoObj.getFloat("health_care"),
            taginfoObj.getFloat("travel"),
            taginfoObj.getFloat("automobile"),
            taginfoObj.getFloat("house_property"),
            taginfoObj.getFloat("clothing_accessories"),
            taginfoObj.getFloat("beauty"),
            taginfoObj.getFloat("IT"),
            taginfoObj.getFloat("baby_Product"),
            taginfoObj.getFloat("food_service"),
            taginfoObj.getFloat("home_furnishing"),
            taginfoObj.getFloat("sports"),
            taginfoObj.getFloat("outdoor_activities"),
            taginfoObj.getFloat("medicine"))
        }.toDF().registerTempTable("resultTable")

      val resultDF = sqlContext.sql(
        s"select '$appkey' AS APPKEY, '$lastdate' AS DATE, SUUID, MAN, WOMAN, AGE10_19, AGE20_29, AGE30_39, " +
          "AGE40_49, AGE50_59, GAME, MOVIE, MUSIC, ART, POLITICS_NEWS, FINANCIAL, EDUCATION_TRAINING, HEALTH_CARE, TRAVEL, AUTOMOBILE, " +
          "HOUSE_PROPERTY, CLOTHING_ACCESSORIES, BEAUTY, IT, BABY_PRODUCT, FOOD_SERVICE, HOME_FURNISHING, SPORTS, OUTDOOR_ACTIVITIES, " +
          "MEDICINE from resultTable WHERE SUUID IS NOT NULL")

      // Write to the USER_TAGS table in HBase through the Phoenix Spark connector.
      resultDF.write.mode(SaveMode.Overwrite).options(
        Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))
      ).format("org.apache.phoenix.spark").save()
    }
  }
```
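That completes the driver, though the object is not closed yet; its helper methods follow below. One thing worth flagging: everything here is the Spark 1.x API (`SQLContext`, `registerTempTable`, and a `map` on a DataFrame that yields an RDD). On Spark 2.x the same setup would go through `SparkSession` instead, roughly as in this sketch, which is my own translation and not code from the original post:

```scala
// Approximate Spark 2.x equivalent of the setup above (my own translation,
// not code from the original post).
import org.apache.spark.sql.SparkSession

object SparkTwoSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UserTagTable").getOrCreate()
    // In 2.x, map on a DataFrame returns a Dataset and needs an Encoder,
    // which this import provides.
    import spark.implicits._

    val df1 = UserTagTable.loadDataFrame(spark.sqlContext, args(0), "2016-04-10", args(1))
    df1.createOrReplaceTempView("suuidTable") // replaces registerTempTable
    spark.udf.register("taginfo", (a: String) => UserTagTable.userTagInfo(a))
  }
}
```

Now the helper functions the driver relies on: `userTagInfo` fabricates the tag JSON from random numbers (demo data standing in for a real tagging pipeline), and `loadDataFrame` reads the source Parquet data: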
```scala
  def intToString(suuid: Long): String = {
    suuid.toString
  }

  // Builds a random tag profile as a JSON string: the gender shares sum to 1,
  // the five age buckets sum to 1, and every interest score is an independent
  // random value in [0, 1), rounded to two decimal places.
  def userTagInfo(num1: String): String = {
    val de = new DecimalFormat("0.00")
    def rand(scale: Double = 1.0): Float = de.format(math.random * scale).toFloat

    val man = rand()
    val woman = de.format(1 - man).toFloat

    val age10_19 = rand(0.2)
    val age20_29 = rand(0.2)
    val age30_39 = rand(0.2)
    val age40_49 = rand(0.2)
    val age50_59 = de.format(1 - age10_19 - age20_29 - age30_39 - age40_49).toFloat

    val game = rand(); val movie = rand(); val music = rand(); val art = rand()
    val politics_news = rand(); val financial = rand(); val education_training = rand()
    val health_care = rand(); val travel = rand(); val automobile = rand()
    val house_property = rand(); val clothing_accessories = rand(); val beauty = rand()
    val IT = rand(); val baby_Product = rand(); val food_service = rand()
    val home_furnishing = rand(); val sports = rand(); val outdoor_activities = rand()
    val medicine = rand()

    // The original built this string from dozens of "+" concatenations; an
    // interpolated string produces the same JSON.
    s"""{"man":$man,"woman":$woman,"age10_19":$age10_19,"age20_29":$age20_29,""" +
      s""""age30_39":$age30_39,"age40_49":$age40_49,"age50_59":$age50_59,""" +
      s""""game":$game,"movie":$movie,"music":$music,"art":$art,""" +
      s""""politics_news":$politics_news,"financial":$financial,""" +
      s""""education_training":$education_training,"health_care":$health_care,""" +
      s""""travel":$travel,"automobile":$automobile,"house_property":$house_property,""" +
      s""""clothing_accessories":$clothing_accessories,"beauty":$beauty,"IT":$IT,""" +
      s""""baby_Product":$baby_Product,"food_service":$food_service,""" +
      s""""home_furnishing":$home_furnishing,"sports":$sports,""" +
      s""""outdoor_activities":$outdoor_activities,"medicine":$medicine}"""
  }

  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    ctx.read.parquet(path)
      .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")
  }
}
```
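Because `userTagInfo` depends on nothing but `java.text.DecimalFormat` and random numbers, you can sanity-check the JSON it emits without a cluster. This harness is my own addition, and since the values are random, the ones in the comment are only indicative:

```scala
// Local check of the UDF's output shape; no Spark needed.
object TagInfoCheck extends App {
  val json = UserTagTable.userTagInfo("any-suuid")
  println(json) // e.g. {"man":0.42,"woman":0.58, ... ,"medicine":0.71}

  // The map step in the driver relies on these fields parsing back as Floats.
  val obj = com.alibaba.fastjson.JSON.parseObject(json)
  println(obj.getFloat("game"))
}
```

The job itself is meant for a cluster: package it and launch it with `spark-submit`, passing the appkey and the end date as the two program arguments that `main` expects.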
That is the whole example of converting a Scala class to a DataFrame with SparkSQL; I hope it serves as a useful reference.

Original post: https://blog.csdn.net/silentwolfyh/article/details/51966952