欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java+spark-sql查询excel

程序员文章站 2022-11-21 21:52:37
Spark官网下载Spark Spark下载,版本随意,下载后解压放入bigdata下(目录可以更改) 下载Windows下Hadoop所需文件winutils.exe 同学们自己网上找找吧,这里就不上传了,其实该文件可有可无,报错也不影响Spark运行,强迫症可以下载,本人就有强迫症~~,文件下载 ......

 spark官网下载spark

spark下载,版本随意,下载后解压放入bigdata下(目录可以更改)

下载windows下hadoop所需文件winutils.exe

  同学们自己网上找找吧,这里就不上传了,其实该文件可有可无,报错也不影响spark运行,强迫症可以下载,本人就有强迫症~~,文件下载后放入bigdata\hadoop\bin目录下。
不用创建环境变量,再java最开始处定义系统变量即可,如下:

system.setproperty("hadoop.home.dir", hadoop_home);

创建java maven项目java-spark-sql-excel

  建立相关目录层次如下:

  父级目录(项目所在目录)
    - java-spark-sql-excel
    - bigdata
      - spark
      - hadoop
        - bin
          - winutils.exe

编码

初始化sparksession

static{
    system.setproperty("hadoop.home.dir", hadoop_home);
    spark = sparksession.builder()
            .appname("test")
            .master("local[*]") 
            .config("spark.sql.warehouse.dir",spark_home)
            .config("spark.sql.parquet.binaryasstring", "true")
            .getorcreate();
     }

读取excel

public static void readexcel(string filepath,string tablename) throws ioexception{
        decimalformat format = new decimalformat(); 
        format.applypattern("#");
        //创建文件(可以接收上传的文件,springmvc使用commonsmultipartfile,jersey可以使用org.glassfish.jersey.media.multipart.formdataparam(参照本人文件上传博客))
        file file = new file(filepath);
        //创建文件流
        inputstream inputstream = new fileinputstream(file);
        //创建流的缓冲区
        bufferedinputstream bufferedinputstream = new bufferedinputstream(inputstream);
        //定义excel workbook引用
        workbook  workbook =null;
        //.xlsx格式的文件使用xssfworkbook子类,xls格式的文件使用hssfworkbook
        if(file.getname().contains("xlsx")) workbook = new xssfworkbook(bufferedinputstream);
        if(file.getname().contains("xls")&&!file.getname().contains("xlsx"))  workbook = new hssfworkbook(bufferedinputstream);
        system.out.println(file.getname());
        //获取sheets迭代器
        iterator<sheet> datatypesheets= workbook.sheetiterator();
        while(datatypesheets.hasnext()){
            //每一个sheet都是一个表,为每个sheet
            arraylist<string> schemalist = new arraylist<string>();
             // datalist数据集
            arraylist<org.apache.spark.sql.row> datalist = new arraylist<org.apache.spark.sql.row>();
            //字段
            list<structfield> fields = new arraylist<>();
            //获取当前sheet
            sheet   datatypesheet = datatypesheets.next();
            //获取第一行作为字段
            iterator<row> iterator = datatypesheet.iterator();
            //没有下一个sheet跳过
            if(!iterator.hasnext()) continue;
            //获取第一行用于建立表结构
            iterator<cell> firstrowcelliterator = iterator.next().iterator();
             while(firstrowcelliterator.hasnext()){
                 //获取第一行每一列作为字段
                 cell currentcell = firstrowcelliterator.next();
                 //字符串
                 if(currentcell.getcelltypeenum() == celltype.string) schemalist.add(currentcell.getstringcellvalue().trim());
                 //数值
                 if(currentcell.getcelltypeenum() == celltype.numeric)  schemalist.add((currentcell.getnumericcellvalue()+"").trim());
             }
             //创建structfield(spark中的字段对象,需要提供字段名,字段类型,第三个参数true表示列可以为空)并填充list<structfield>
             for (string fieldname : schemalist) {
               structfield field = datatypes.createstructfield(fieldname, datatypes.stringtype, true);
               fields.add(field);
             }
             //根据list<structfield>创建spark表结构org.apache.spark.sql.types.structtype
            structtype schema = datatypes.createstructtype(fields);
            //字段数len
            int len = schemalist.size();
            //获取当前sheet数据行数
            int rowend = datatypesheet.getlastrownum(); 
            //遍历当前sheet所有行
            for (int rownum = 1; rownum <= rowend; rownum++) {  
               //一行数据做成一个list
               arraylist<string> rowdatalist = new arraylist<string>();
               //获取一行数据
               row r = datatypesheet.getrow(rownum); 
               if(r!=null){
                   //根据字段数遍历当前行的单元格
                   for (int cn = 0; cn < len; cn++) {  
                      cell c = r.getcell(cn, row.missingcellpolicy.return_blank_as_null);  
                      if (c == null)  rowdatalist.add("0");//空值简单补零
                      if (c != null&&c.getcelltypeenum() == celltype.string)  rowdatalist.add(c.getstringcellvalue().trim());//字符串
                      if (c != null&&c.getcelltypeenum() == celltype.numeric){
                         double value = c.getnumericcellvalue(); 
                         if (p.matcher(value+"").matches())  rowdatalist.add(format.format(value));//不保留小数点
                         if (!p.matcher(value+"").matches()) rowdatalist.add(value+"");//保留小数点
                      }
                      }  
                   }  
                //datalist数据集添加一行
                datalist.add(rowfactory.create(rowdatalist.toarray()));
               }
            //根据数据和表结构创建临时表
            spark.createdataframe(datalist, schema).createorreplacetempview(tablename+datatypesheet.getsheetname());
            }            
    }

在项目目录下创建测试文件

java+spark-sql查询excel

第一个sheet:

java+spark-sql查询excel

第二个sheet:

java+spark-sql查询excel

第三个sheet:

java+spark-sql查询excel

 测试

public static void main(string[] args) throws exception {
        //需要查询的excel路径
        string xlsxpath = "test2.xlsx";
        string xlspath  = "test.xls";
        //定义表名
        string tablename1="test_table1";        
        string tablename2="test_table2";        
        //读取excel表名为tablenamen+sheet的名称
        readexcel(xlsxpath,tablename2);
        spark.sql("select * from "+tablename2+"sheet1").show();
        
        readexcel(xlspath,tablename1);
        spark.sql("select * from "+tablename1+"sheet1").show();
        spark.sql("select * from "+tablename1+"sheet2").show();
        spark.sql("select * from "+tablename1+"sheet3").show();
    }

运行结果

java+spark-sql查询excel

 相关依赖

<dependencies>
     <dependency>
        <groupid>org.spark-project.hive</groupid>
        <artifactid>hive-jdbc</artifactid>
        <version>1.2.1.spark2</version>
     </dependency>
     <dependency>
        <groupid>org.apache.spark</groupid>
        <artifactid>spark-core_2.11</artifactid>
        <version>2.3.1</version>
     </dependency>   
     <dependency>
        <groupid>org.apache.spark</groupid>
        <artifactid>spark-sql_2.11</artifactid>
        <version>2.3.1</version>
     </dependency>    
     <dependency>
        <groupid>org.apache.hadoop</groupid>
        <artifactid>hadoop-client</artifactid>
        <version>2.6.0</version>
     </dependency>
     <dependency>
        <groupid>org.apache.poi</groupid>
        <artifactid>poi</artifactid>
        <version>3.17</version>
     </dependency>
      <dependency>
            <groupid>org.apache.poi</groupid>
            <artifactid>poi-ooxml</artifactid>
            <version>3.17</version>
     </dependency> 
   </dependencies>

本人github