欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

DataFrame:通过SparkSql将scala类转为DataFrame的方法

程序员文章站 2022-10-29 09:01:14
如下所示: import java.text.decimalformat import com.alibaba.fastjson.json import co...

如下所示:

import java.text.decimalformat
import com.alibaba.fastjson.json
import com.donews.data.appconfig
import com.typesafe.config.configfactory
import org.apache.spark.sql.types.{structfield, structtype}
import org.apache.spark.sql.{row, savemode, dataframe, sqlcontext}
import org.apache.spark.{sparkconf, sparkcontext}
import org.slf4j.loggerfactory
 
/**
 * created by silentwolf on 2016/6/3.
 */
 
case class usertag(suuid: string,
     man: float,
     woman: float,
     age10_19: float,
     age20_29: float,
     age30_39: float,
     age40_49: float,
     age50_59: float,
     game: float,
     movie: float,
     music: float,
     art: float,
     politics_news: float,
     financial: float,
     education_training: float,
     health_care: float,
     travel: float,
     automobile: float,
     house_property: float,
     clothing_accessories: float,
     beauty: float,
     it: float,
     baby_product: float,
     food_service: float,
     home_furnishing: float,
     sports: float,
     outdoor_activities: float,
     medicine: float
     )
 
object usertagtable {
 
 val log = loggerfactory.getlogger(useroverviewfirst.getclass)
 
 val rep_home = s"${appconfig.hdfs_master}/${appconfig.hdfs_rep}"
 
 def main(args: array[string]) {
 
 var starttime = system.currenttimemillis()
 
 val conf: com.typesafe.config.config = configfactory.load()
 
 val sc = new sparkcontext()
 
 val sqlcontext = new sqlcontext(sc)
 
 var df1: dataframe = null
 
 if (args.length == 0) {
  println("请输入: appkey , starttime : 2016-04-10 ,startend :2016-04-11")
 }
 else {
 
  var appkey = args(0)
 
  var lastdate = args(1)
 
  df1 = loaddataframe(sqlcontext, appkey, "2016-04-10", lastdate)
 
  df1.registertemptable("suuidtable")
 
  sqlcontext.udf.register("taginfo", (a: string) => usertaginfo(a))
  sqlcontext.udf.register("inttostring", (b: long) => inttostring(b))
  import sqlcontext.implicits._
 
  //***重点***:将临时表中的suuid和自定函数中json数据,放入usertag中。
 sqlcontext.sql(" select distinct(suuid) as suuid,taginfo(suuid) from suuidtable group by suuid").map { case row(suuid: string, taginfo: string) =>
  val taginfoobj = json.parseobject(taginfo)
  usertag(suuid.tostring,
   taginfoobj.getfloat("man"),
   taginfoobj.getfloat("woman"),
   taginfoobj.getfloat("age10_19"),
   taginfoobj.getfloat("age20_29"),
   taginfoobj.getfloat("age30_39"),
   taginfoobj.getfloat("age40_49"),
   taginfoobj.getfloat("age50_59"),
   taginfoobj.getfloat("game"),
   taginfoobj.getfloat("movie"),
   taginfoobj.getfloat("music"),
   taginfoobj.getfloat("art"),
   taginfoobj.getfloat("politics_news"),
   taginfoobj.getfloat("financial"),
   taginfoobj.getfloat("education_training"),
   taginfoobj.getfloat("health_care"),
   taginfoobj.getfloat("travel"),
   taginfoobj.getfloat("automobile"),
   taginfoobj.getfloat("house_property"),
   taginfoobj.getfloat("clothing_accessories"),
   taginfoobj.getfloat("beauty"),
   taginfoobj.getfloat("it"),
   taginfoobj.getfloat("baby_product"),
   taginfoobj.getfloat("food_service"),
   taginfoobj.getfloat("home_furnishing"),
   taginfoobj.getfloat("sports"),
   taginfoobj.getfloat("outdoor_activities"),
   taginfoobj.getfloat("medicine")
  )}.todf().registertemptable("resulttable")
 
  val resultdf = sqlcontext.sql(s"select '$appkey' as appkey, '$lastdate' as date,suuid ,man,woman,age10_19,age20_29,age30_39 ," +
  "age40_49 ,age50_59,game,movie,music,art,politics_news,financial,education_training,health_care,travel,automobile," +
  "house_property,clothing_accessories,beauty,it,baby_product ,food_service ,home_furnishing ,sports ,outdoor_activities ," +
  "medicine from resulttable where suuid is not null")
  resultdf.write.mode(savemode.overwrite).options(
  map("table" -> "user_tags", "zkurl" -> conf.getstring("hbase.url"))
  ).format("org.apache.phoenix.spark").save()
 
 }
 }
 
 def inttostring(suuid: long): string = {
 suuid.tostring()
 }
 
 def usertaginfo(num1: string): string = {
 
 var de = new decimalformat("0.00")
 var mannum = de.format(math.random).tofloat
 var man = mannum
 var woman = de.format(1 - mannum).tofloat
 
 var age10_19num = de.format(math.random * 0.2).tofloat
 var age20_29num = de.format(math.random * 0.2).tofloat
 var age30_39num = de.format(math.random * 0.2).tofloat
 var age40_49num = de.format(math.random * 0.2).tofloat
 
 var age10_19 = age10_19num
 var age20_29 = age20_29num
 var age30_39 = age30_39num
 var age40_49 = age40_49num
 var age50_59 = de.format(1 - age10_19num - age20_29num - age30_39num - age40_49num).tofloat
 
 var game = de.format(math.random * 1).tofloat
 var movie = de.format(math.random * 1).tofloat
 var music = de.format(math.random * 1).tofloat
 var art = de.format(math.random * 1).tofloat
 var politics_news = de.format(math.random * 1).tofloat
 
 var financial = de.format(math.random * 1).tofloat
 var education_training = de.format(math.random * 1).tofloat
 var health_care = de.format(math.random * 1).tofloat
 var travel = de.format(math.random * 1).tofloat
 var automobile = de.format(math.random * 1).tofloat
 
 var house_property = de.format(math.random * 1).tofloat
 var clothing_accessories = de.format(math.random * 1).tofloat
 var beauty = de.format(math.random * 1).tofloat
 var it = de.format(math.random * 1).tofloat
 var baby_product = de.format(math.random * 1).tofloat
 
 var food_service = de.format(math.random * 1).tofloat
 var home_furnishing = de.format(math.random * 1).tofloat
 var sports = de.format(math.random * 1).tofloat
 var outdoor_activities = de.format(math.random * 1).tofloat
 var medicine = de.format(math.random * 1).tofloat
 
 "{" + "\"man\"" + ":" + man + "," + "\"woman\"" + ":" + woman + "," + "\"age10_19\"" + ":" + age10_19 + "," + "\"age20_29\"" + ":" + age20_29 + "," +
  "\"age30_39\"" + ":" + age30_39 + "," + "\"age40_49\"" + ":" + age40_49 + "," + "\"age50_59\"" + ":" + age50_59 + "," + "\"game\"" + ":" + game + "," +
  "\"movie\"" + ":" + movie + "," + "\"music\"" + ":" + music + "," + "\"art\"" + ":" + art + "," + "\"politics_news\"" + ":" + politics_news + "," +
  "\"financial\"" + ":" + financial + "," + "\"education_training\"" + ":" + education_training + "," + "\"health_care\"" + ":" + health_care + "," +
  "\"travel\"" + ":" + travel + "," + "\"automobile\"" + ":" + automobile + "," + "\"house_property\"" + ":" + house_property + "," + "\"clothing_accessories\"" + ":" + clothing_accessories + "," +
  "\"beauty\"" + ":" + beauty + "," + "\"it\"" + ":" + it + "," + "\"baby_product\"" + ":" + baby_product + "," + "\"food_service\"" + ":" + food_service + "," +
  "\"home_furnishing\"" + ":" + home_furnishing + "," + "\"sports\"" + ":" + sports + "," + "\"outdoor_activities\"" + ":" + outdoor_activities + "," + "\"medicine\"" + ":" + medicine +
  "}";
 
 }
 
 def loaddataframe(ctx: sqlcontext, appkey: string, startday: string, endday: string): dataframe = {
 val path = s"$rep_home/appstatistic"
 ctx.read.parquet(path)
  .filter(s"timestamp is not null and appkey='$appkey' and day>='$startday' and day<='$endday'")
 }
 
 
}

以上这篇dataframe:通过sparksql将scala类转为dataframe的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。