
A Brief Look at the Source of DataFrame registerTempTable in SparkSQL

程序员文章站 2022-07-13 17:07:48

dataFrame.registerTempTable(tableName);
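  While using SparkSQL recently, I wondered whether registering 10,000 rows as a temporary table would be much more efficient than registering 100 million rows, and I was curious what registering a DataFrame as a temporary table actually does. So I read the relevant part of the source code and am writing down what I found.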

 

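The lifetime of a temporary table is tied to the SQLContext that created the DataFrame: when that SQLContext's lifetime ends, the temporary table's lifetime ends as well.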
 

Relevant source in DataFrame.scala
 /**
   * Registers this [[DataFrame]] as a temporary table using the given name.  The lifetime of this
   * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.
   *
   * @group basic
   * @since 1.3.0
   */
  def registerTempTable(tableName: String): Unit = {
    sqlContext.registerDataFrameAsTable(this, tableName)
  }

  
 DataFrame.registerTempTable calls registerDataFrameAsTable on its SQLContext,
 and the SQLContext in turn relies on SimpleCatalog, which implements the registerTable method of the Catalog trait.
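To make this delegation chain concrete, here is a minimal pure-Scala model of it. The classes MiniPlan, MiniCatalog, MiniSimpleCatalog, MiniSQLContext and MiniDataFrame are hypothetical stand-ins written for illustration, not Spark's classes; they only mimic the three calls in the order the source shows, and the sketch assumes it is run as a Scala script without Spark on the classpath.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a logical plan: a description of a query, not its rows.
final case class MiniPlan(description: String)

// Plays the role of the Catalog trait: a registry from names to plans.
trait MiniCatalog {
  def registerTable(tableIdentifier: Seq[String], plan: MiniPlan): Unit
  def lookup(name: String): Option[MiniPlan]
}

// Plays the role of SimpleCatalog: backed by a mutable HashMap.
final class MiniSimpleCatalog extends MiniCatalog {
  private val tables = mutable.HashMap.empty[String, MiniPlan]
  override def registerTable(tableIdentifier: Seq[String], plan: MiniPlan): Unit =
    tables += (tableIdentifier.mkString(".") -> plan)
  override def lookup(name: String): Option[MiniPlan] = tables.get(name)
}

// Plays the role of SQLContext: owns the catalog and forwards registrations to it.
final class MiniSQLContext {
  lazy val catalog: MiniCatalog = new MiniSimpleCatalog
  def registerDataFrameAsTable(df: MiniDataFrame, tableName: String): Unit =
    catalog.registerTable(Seq(tableName), df.logicalPlan)
}

// Plays the role of DataFrame: knows its context and its plan.
final class MiniDataFrame(val sqlContext: MiniSQLContext, val logicalPlan: MiniPlan) {
  def registerTempTable(tableName: String): Unit =
    sqlContext.registerDataFrameAsTable(this, tableName)
}

val ctx = new MiniSQLContext
val plan = MiniPlan("Project [name] <- Relation people")
val df = new MiniDataFrame(ctx, plan)

// DataFrame -> SQLContext -> Catalog: only the plan reference ends up in the map.
df.registerTempTable("people")
println(ctx.catalog.lookup("people")) // Some(MiniPlan(Project [name] <- Relation people))
```

The point of the model is that each layer adds nothing but a forwarding call; the DataFrame itself is never copied, only its plan object is handed down.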
 

Relevant source in SQLContext.scala
  @transient
  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
  /**
   * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
   * only during the lifetime of this instance of SQLContext.
   */
  private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
    catalog.registerTable(Seq(tableName), df.logicalPlan)
  }
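Because catalog is declared as a lazy val on the SQLContext instance, every context gets its own registry, which is why a temporary table lives and dies with the context that registered it. The following sketch models only that ownership relationship; MiniSQLContext, MiniPlan and tableNames are hypothetical names for illustration, not Spark's API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a logical plan.
final case class MiniPlan(description: String)

// Hypothetical model of SQLContext: the catalog is an instance member,
// so two contexts never share a registry.
final class MiniSQLContext {
  lazy val catalog: mutable.HashMap[String, MiniPlan] = mutable.HashMap.empty
  def registerDataFrameAsTable(plan: MiniPlan, tableName: String): Unit =
    catalog += (tableName -> plan)
  def tableNames: Set[String] = catalog.keySet.toSet
}

val ctxA = new MiniSQLContext
val ctxB = new MiniSQLContext
ctxA.registerDataFrameAsTable(MiniPlan("scan people"), "people")

println(ctxA.tableNames) // Set(people)
println(ctxB.tableNames) // Set()
```

Once nothing references ctxA any more, its map (and the registered plans) simply become garbage along with it; there is no separate cleanup step in this model.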
    
    SimpleCatalog defines a map named tables; registerTable uses the (normalized) table identifier as the key and the logical plan as the value, and puts that pair into tables.
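Judging from the `tables += ((getDbTableName(tableIdent), plan))` line in the Catalog.scala source, registration is a plain insert into a scala.collection.mutable.HashMap. One consequence worth knowing is that registering the same name twice silently replaces the earlier entry. This toy map, with String values standing in for plans, is an illustration of HashMap semantics, not Spark code.

```scala
import scala.collection.mutable

// Same shape as the tables map: key = table name, value = stand-in for a plan.
val tables = mutable.HashMap.empty[String, String]

tables += (("people", "plan v1"))
tables += (("people", "plan v2")) // same key: the old entry is replaced, no error

println(tables("people")) // plan v2
println(tables.size)      // 1
```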


  Relevant source in Catalog.scala
  val tables = new mutable.HashMap[String, LogicalPlan]()
  override def registerTable(
      tableIdentifier: Seq[String],
      plan: LogicalPlan): Unit = {
    val tableIdent = processTableIdentifier(tableIdentifier)
    tables += ((getDbTableName(tableIdent), plan))
  }
 
  protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
    if (conf.caseSensitiveAnalysis) {
      tableIdentifier
    } else {
      tableIdentifier.map(_.toLowerCase)
    }
  }
 
  protected def getDbTableName(tableIdent: Seq[String]): String = {
    val size = tableIdent.size
    if (size <= 2) {
      tableIdent.mkString(".")
    } else {
      tableIdent.slice(size - 2, size).mkString(".")
    }
  }
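The map key is produced in two steps: processTableIdentifier lowercases every part unless case-sensitive analysis is enabled, and getDbTableName keeps at most the last two parts and joins them with a dot. The sketch below copies both helpers as standalone functions; the caseSensitive parameter stands in for conf.caseSensitiveAnalysis, and registryKey is a hypothetical helper that composes them the way registerTable does.

```scala
// Standalone copies of the two helpers; `caseSensitive` replaces conf.caseSensitiveAnalysis.
def processTableIdentifier(tableIdentifier: Seq[String], caseSensitive: Boolean): Seq[String] =
  if (caseSensitive) tableIdentifier else tableIdentifier.map(_.toLowerCase)

def getDbTableName(tableIdent: Seq[String]): String = {
  val size = tableIdent.size
  // Keep at most the last two parts (database and table name).
  if (size <= 2) tableIdent.mkString(".")
  else tableIdent.slice(size - 2, size).mkString(".")
}

// Hypothetical helper: the key registerTable would store for a given identifier.
def registryKey(tableIdentifier: Seq[String], caseSensitive: Boolean): String =
  getDbTableName(processTableIdentifier(tableIdentifier, caseSensitive))

println(registryKey(Seq("People"), caseSensitive = false))        // people
println(registryKey(Seq("Sales", "Orders"), caseSensitive = true)) // Sales.Orders
println(registryKey(Seq("cat", "db", "T"), caseSensitive = false)) // db.t
```

Because registerDataFrameAsTable always passes Seq(tableName), a temp table registered this way ends up keyed by the single (possibly lowercased) name.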

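  From the code above, registerTempTable ultimately just puts the table name (or table identifier) and its corresponding logical plan into a map, and that entry disappears together with the SQLContext.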
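This also suggests an answer to the opening question: since registration stores only a reference to the plan, registering 10,000 rows or 100 million rows should cost about the same, and the data is read only when a query against the table runs. The sketch below illustrates that idea with a hypothetical LazyPlan class that counts how many rows it has produced; it is a simplification in the spirit of a Spark logical plan, not Spark's execution model.

```scala
import scala.collection.mutable

// Hypothetical lazy plan: it can produce rows but produces none until execute() is consumed.
final class LazyPlan(rowCount: Int) {
  var rowsProduced: Long = 0L
  def execute(): Iterator[Int] =
    Iterator.range(0, rowCount).map { i => rowsProduced += 1; i }
}

val tables = mutable.HashMap.empty[String, LazyPlan]

val small = new LazyPlan(10000)
val huge  = new LazyPlan(100000000)

// Registration only stores references; no rows are generated, whatever the size.
tables += ("small" -> small)
tables += ("huge" -> huge)
println(small.rowsProduced + huge.rowsProduced) // 0

// Rows are produced only when a "query" actually consumes the plan.
val firstThree = tables("huge").execute().take(3).toList
println(firstThree)        // List(0, 1, 2)
println(huge.rowsProduced) // 3
```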