
DataFrame: Converting a Scala Class to a DataFrame via Spark SQL


The approach: load the source data and register it as a temp table, parse the per-row JSON produced by a UDF, wrap the values in a case class, and call toDF() on the mapped result to get a DataFrame back. A minimal sketch of the core conversion comes first; the full example follows.
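Here is that minimal sketch, using a hypothetical Person class and the same Spark 1.x SQLContext API as the full example below: define a case class, import sqlContext.implicits._, and call toDF() on an RDD of that class.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical minimal example: any Scala case class becomes a DataFrame
// once sqlContext.implicits._ is in scope.
case class Person(name: String, age: Int)

object MinimalToDF {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("toDF-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope for RDDs of case classes

    val df = sc.parallelize(Seq(Person("Ann", 31), Person("Bob", 25))).toDF()
    df.registerTempTable("people") // Spark 1.x temp-table API, as used below
    sqlContext.sql("select name from people where age > 30").show()
  }
}

The full example from the original post follows.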

import java.text.DecimalFormat
import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
 
/**
  * Created by silentwolf on 2016/6/3.
  */
 
case class UserTag(SUUID: String,
                   MAN: Float,
                   WOMAN: Float,
                   AGE10_19: Float,
                   AGE20_29: Float,
                   AGE30_39: Float,
                   AGE40_49: Float,
                   AGE50_59: Float,
                   GAME: Float,
                   MOVIE: Float,
                   MUSIC: Float,
                   ART: Float,
                   POLITICS_NEWS: Float,
                   FINANCIAL: Float,
                   EDUCATION_TRAINING: Float,
                   HEALTH_CARE: Float,
                   TRAVEL: Float,
                   AUTOMOBILE: Float,
                   HOUSE_PROPERTY: Float,
                   CLOTHING_ACCESSORIES: Float,
                   BEAUTY: Float,
                   IT: Float,
                   BABY_PRODUCT: Float,
                   FOOD_SERVICE: Float,
                   HOME_FURNISHING: Float,
                   SPORTS: Float,
                   OUTDOOR_ACTIVITIES: Float,
                   MEDICINE: Float)
 
object UserTagTable {

  val LOG = LoggerFactory.getLogger(UserTagTable.getClass)

  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"
 
  def main(args: Array[String]) {

    val startTime = System.currentTimeMillis()

    val conf: com.typesafe.config.Config = ConfigFactory.load()

    val sc = new SparkContext()

    val sqlContext = new SQLContext(sc)

    var df1: DataFrame = null

    if (args.length == 0) {
      println("Usage: appkey, StartTime: 2016-04-10, StartEnd: 2016-04-11")
    }
    else {

      val appkey = args(0)

      val lastdate = args(1)

      df1 = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate)

      df1.registerTempTable("suuidTable")

      // Register the UDFs used in the SQL below.
      sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
      sqlContext.udf.register("intToString", (b: Long) => intToString(b))
      import sqlContext.implicits._

      // *** Key step ***: map each suuid from the temp table, together with the
      // JSON produced by the UDF, into a UserTag instance.
      sqlContext.sql("select distinct(suuid) AS suuid, taginfo(suuid) from suuidTable group by suuid").map { case Row(suuid: String, taginfo: String) =>
        val taginfoObj = JSON.parseObject(taginfo)
        UserTag(suuid.toString,
          taginfoObj.getFloat("man"),
          taginfoObj.getFloat("woman"),
          taginfoObj.getFloat("age10_19"),
          taginfoObj.getFloat("age20_29"),
          taginfoObj.getFloat("age30_39"),
          taginfoObj.getFloat("age40_49"),
          taginfoObj.getFloat("age50_59"),
          taginfoObj.getFloat("game"),
          taginfoObj.getFloat("movie"),
          taginfoObj.getFloat("music"),
          taginfoObj.getFloat("art"),
          taginfoObj.getFloat("politics_news"),
          taginfoObj.getFloat("financial"),
          taginfoObj.getFloat("education_training"),
          taginfoObj.getFloat("health_care"),
          taginfoObj.getFloat("travel"),
          taginfoObj.getFloat("automobile"),
          taginfoObj.getFloat("house_property"),
          taginfoObj.getFloat("clothing_accessories"),
          taginfoObj.getFloat("beauty"),
          taginfoObj.getFloat("IT"),
          taginfoObj.getFloat("baby_Product"),
          taginfoObj.getFloat("food_service"),
          taginfoObj.getFloat("home_furnishing"),
          taginfoObj.getFloat("sports"),
          taginfoObj.getFloat("outdoor_activities"),
          taginfoObj.getFloat("medicine")
        )
      }.toDF().registerTempTable("resultTable")
 
      val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE, SUUID, MAN, WOMAN, AGE10_19, AGE20_29, AGE30_39," +
        " AGE40_49, AGE50_59, GAME, MOVIE, MUSIC, ART, POLITICS_NEWS, FINANCIAL, EDUCATION_TRAINING, HEALTH_CARE, TRAVEL, AUTOMOBILE," +
        " HOUSE_PROPERTY, CLOTHING_ACCESSORIES, BEAUTY, IT, BABY_PRODUCT, FOOD_SERVICE, HOME_FURNISHING, SPORTS, OUTDOOR_ACTIVITIES," +
        " MEDICINE from resultTable WHERE SUUID IS NOT NULL")

      // Persist the result to the USER_TAGS table via the Phoenix-Spark connector.
      resultDF.write.mode(SaveMode.Overwrite).options(
        Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))
      ).format("org.apache.phoenix.spark").save()

    }
  }
 
  def intToString(suuid: Long): String = {
    suuid.toString
  }
 
  // Generates random tag weights for demo purposes and returns them as a JSON string.
  def userTagInfo(num1: String): String = {

    val de = new DecimalFormat("0.00")
    val man = de.format(math.random).toFloat
    val woman = de.format(1 - man).toFloat

    val age10_19 = de.format(math.random * 0.2).toFloat
    val age20_29 = de.format(math.random * 0.2).toFloat
    val age30_39 = de.format(math.random * 0.2).toFloat
    val age40_49 = de.format(math.random * 0.2).toFloat
    val age50_59 = de.format(1 - age10_19 - age20_29 - age30_39 - age40_49).toFloat

    val game = de.format(math.random).toFloat
    val movie = de.format(math.random).toFloat
    val music = de.format(math.random).toFloat
    val art = de.format(math.random).toFloat
    val politics_news = de.format(math.random).toFloat

    val financial = de.format(math.random).toFloat
    val education_training = de.format(math.random).toFloat
    val health_care = de.format(math.random).toFloat
    val travel = de.format(math.random).toFloat
    val automobile = de.format(math.random).toFloat

    val house_property = de.format(math.random).toFloat
    val clothing_accessories = de.format(math.random).toFloat
    val beauty = de.format(math.random).toFloat
    val IT = de.format(math.random).toFloat
    val baby_Product = de.format(math.random).toFloat

    val food_service = de.format(math.random).toFloat
    val home_furnishing = de.format(math.random).toFloat
    val sports = de.format(math.random).toFloat
    val outdoor_activities = de.format(math.random).toFloat
    val medicine = de.format(math.random).toFloat

    s"""{"man":$man,"woman":$woman,"age10_19":$age10_19,"age20_29":$age20_29,""" +
      s""""age30_39":$age30_39,"age40_49":$age40_49,"age50_59":$age50_59,"game":$game,""" +
      s""""movie":$movie,"music":$music,"art":$art,"politics_news":$politics_news,""" +
      s""""financial":$financial,"education_training":$education_training,"health_care":$health_care,""" +
      s""""travel":$travel,"automobile":$automobile,"house_property":$house_property,"clothing_accessories":$clothing_accessories,""" +
      s""""beauty":$beauty,"IT":$IT,"baby_Product":$baby_Product,"food_service":$food_service,""" +
      s""""home_furnishing":$home_furnishing,"sports":$sports,"outdoor_activities":$outdoor_activities,"medicine":$medicine}"""
  }
 
  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    ctx.read.parquet(path)
      .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")
  }
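
  // A hedged sketch, not in the original post: the phoenix-spark connector can
  // also read the saved table back for verification, mirroring the write
  // options used in main above.
  def loadUserTags(ctx: SQLContext, zkUrl: String): DataFrame = {
    ctx.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> "USER_TAGS", "zkUrl" -> zkUrl))
      .load()
  }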
 
 
}
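One more note: the listing imports StructType and StructField but never uses them. They belong to the alternative, schema-based way of building a DataFrame, which is useful when the columns are only known at runtime and a case class cannot be written in advance. A hedged sketch under the same Spark 1.x API (sc and sqlContext as in main; the column names here are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType}

// Build a DataFrame from an RDD[Row] plus an explicit schema instead of a case class.
val schema = StructType(Seq(
  StructField("SUUID", StringType, nullable = false),
  StructField("MAN", FloatType, nullable = true),
  StructField("WOMAN", FloatType, nullable = true)
))
val rows = sc.parallelize(Seq(Row("suuid-001", 0.6f, 0.4f)))
val manualDF = sqlContext.createDataFrame(rows, schema)
manualDF.registerTempTable("userTagsManual")

The case-class route used in the article is more concise and type-safe; the schema route trades that for runtime flexibility.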

That is the whole of this example on converting a Scala class to a DataFrame via Spark SQL. I hope it gives you a useful reference.

Original article: https://blog.csdn.net/silentwolfyh/article/details/51966952
