欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark学习(12)之流式计算SparkStreaming中的sparkSql

程序员文章站 2022-07-06 14:15:10
...

我们可以使用sparkSql来计算实时流中离散的RDD,通过把DStream转化成DF, 然后在创建视图,再使用SparkSession执行标准的sql语句就可以。通过下面代码,我们可以发现这里创建DataFrame和以前的稍微有点差别,通过直接给DF指定字段来创建DataFrame的结构,这个指定的字段要和RDD中的结构对应

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession

object MyNetWorkWordCountWithSql {
  def main(args: Array[String]): Unit = {
    //不打印多余的日志配置
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //获取StreamingContext对象
    val conf=new SparkConf().setAppName("StreamTest2").setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(2))
    //得到DStream,是一个scoket输入流                  ip               端口                     
    val dstream = ssc.socketTextStream("192.168.112.111", 1234, StorageLevel.MEMORY_AND_DISK_SER)
    val pairRdd=dstream.flatMap(_.split(" "))
    pairRdd.foreachRDD(rdd=>{
      //创建sparkSession对象,用来导入隐式转换的,也用它执行sparkSql的标准sql语句
      val spark=SparkSession.builder().appName("SparSqlDemo1").master("local").getOrCreate()
      import spark.implicits._
      //直接通通过word字段创建df
      val df=rdd.toDF("word")
      df.printSchema()
      df.createOrReplaceTempView("mywords")
      spark.sql("select word,count(*) from mywords group by word").show()
    })
    
    //启动流式计算
    ssc.start()
    ssc.awaitTermination()
  }
}

第二种写法,我们稍微通过Reduce计算一下再保存在表中

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession

object MyNetWorkWordCountWithSql {
  def main(args: Array[String]): Unit = {
    //不打印多余的日志配置
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //获取StreamingContext对象
    val conf=new SparkConf().setAppName("StreamTest2").setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(2))
    //得到DStream,是一个scoket输入流                  ip               端口                     
    val dstream = ssc.socketTextStream("192.168.112.111", 1234, StorageLevel.MEMORY_AND_DISK_SER)
    val pairRdd=dstream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    pairRdd.foreachRDD(rdd=>{
      //创建sparkSession对象,用来导入隐式转换的,也用它执行sparkSql的标准sql语句
      val spark=SparkSession.builder().appName("SparSqlDemo1").master("local").getOrCreate()
      import spark.implicits._
      //直接通通过word字段创建df
      val df=rdd.toDF("word","num")
      df.printSchema()
      df.createOrReplaceTempView("mywords")
      //spark.sql("select word,count(*) from mywords group by word").show()
      spark.sql("select word,num from mywords").show()
    })
    
    //启动流式计算
    ssc.start()
    ssc.awaitTermination()
  }
}