欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java jdbc批量多线程读取CVS文件入库

程序员文章站 2022-06-08 23:04:25
需求是这样的:现在需要测试一个内存数据库的入库性能,要求测试每线程准备一个文件,10个线程入库总计100w记录数的单表入库性能。 知识点:jdbc + 多线程 + 批处理...

需求是这样的:现在需要测试一个内存数据库的入库性能,要求测试每线程准备一个文件,10个线程入库总计100w记录数的单表入库性能。

知识点:jdbc + 多线程 + 批处理 + 文件读取

先来看看我的代码结构

Java jdbc批量多线程读取CVS文件入库

说明:

files: 存放即将要读取的文件。
lib: 存放第三方的jar文件,例如数据库驱动包。
memsqltestmain: 这是工程的入口,就是主程序。
dbutil: 这个类是数据库帮助类,主要读取数据库配置信息获取连接关闭连接等操作。
insertutil: 主要做的是读取数据文件生成sql并批量入库的一个类。
tabledatainfo: 主要对要插入的数据表的对象的一个类。
xmlutil: 读取xml配置文件
config.xml: 配置要插入的表信息以及文件的路径等信息
dbconfig.properties: 主要对数据库的连接信息进行存储,包括url,用户名密码等等。

话不多说直接上代码:

import java.util.arraylist;
/**
 * @param
 * @author wu.lin
 * @description 程序入口,启用线程读取文件并入库
 * @create 2016年09月01日 15:12
 * @throws
 */
public class memsqltestmain {

 public static void main(string[] args) {

  //通过读取配置文件读取要插入数据的表名
  string tablename = xmlutil.gettablename();
  system.out.println(tablename);

  //通过配置文件读取数据存放的文件的路径
  arraylist<string> filenamelist = xmlutil.getfilenamelist();
  int len = filenamelist.size();

  //针对每一个文件开启一个进程去执行读取并入库的操作
  for (int i = 0; i < len; i++) {

   string filename = filenamelist.get(i);
   system.out.println(filename);

   new thread(new insertutil(filename, tablename)).start();
  }

 }
}
import java.io.bufferedreader;
import java.io.file;
import java.io.fileinputstream;
import java.io.inputstreamreader;
import java.sql.*;

/**
 * @param
 * @author wu.lin
 * @description insertutil是一个线程类,主要读取数据文件组装sql并执行入库操作
 * @create 2016年09月01日 14:10
 * @throws 
 */
public class insertutil implements runnable {

 //文件路径
 private string filepath;

 //表名
 private string tablename;

 //.cvs文件数据以","分隔
 private static string delimiters = ",";

 //获取数据库帮助类
 dbutil dbutil = dbutil.getinstance();

 public insertutil() {}

 public insertutil(string filepath, string tablename) {
  this.filepath = filepath;
  this.tablename = tablename;
 }

 public static string getdelimiters() {
  return delimiters;
 }

 public static void setdelimiters(string delimiters) {
  delimiters = delimiters;
 }

 public string getfilepath() {
  return filepath;
 }

 public void setfilepath(string filepath) {
  this.filepath = filepath;
 }


 //读取文件并且批处理入库的方法
 public boolean insertdb(string tablename, long rc, string filepath) {

  if(filepath == null || "".equals(filepath)) {
   system.out.println("文件路径为空");
   return false;
  }
  if (rc < 1) {
   rc = 100;
  }

  connection conn = null;
  boolean flag = false;
  statement pre = null;

  string sql = "";

  tabledatainfo tableinfo = new tabledatainfo();

  try {

   if(conn == null) {
    conn = dbutil.getconnection();
   }

   pre = conn.createstatement();

   conn.setautocommit(false);

   int colcount = tableinfo.gettablecolnums(tablename, conn);

   int rowcount = 0;

   file file = new file(filepath);

   bufferedreader buf = null;
   buf = new bufferedreader(new inputstreamreader(new fileinputstream(file)));

   string line_record = buf.readline();

   long starttime = system.currenttimemillis(); //开始计时

   while (line_record != null) {
    // 解析每一条记录
    sql = "insert into " + tablename + " values('";

    string[] fields = line_record.split(delimiters);

    //对insert语句的合法性进行判断

    if(fields.length != colcount){
     system.out.println("要插入的数据列数和表的数据列不相匹配,停止执行");
     break;
    }


    for (int i = 0; i < fields.length; i++) {

     sql += fields[i];
     if (i < fields.length - 1) {
      sql += "','";
     }
    }

    sql += "');";

    // 在控制台输出sql语句
    // system.out.println(sql);

    //执行sql语句
    pre.addbatch(sql);


    rowcount++;
    line_record = buf.readline();

    if (rowcount >= rc) {
     break;
    }

   }
   pre.executebatch();
   conn.setautocommit(true);

   pre.close();


   system.out.println("共写入行数:" + rowcount);

   long endtime = system.currenttimemillis(); //停止计时

   system.out.println("执行时间为:" + (endtime - starttime) + " ms");

  } catch (exception e) {

   flag = false;
   try {
    //回滚
    if(conn != null) {
     conn.rollback();
    }
   } catch (sqlexception e1) {
    e1.printstacktrace();
   }
   e.printstacktrace();
  } finally {
   dbutil.close(null, pre, conn);
  }
  return flag;
 }

 public void run() {
  this.insertdb(tablename, 500000, filepath);
 }

}
import java.sql.connection;
import java.sql.databasemetadata;
import java.sql.resultset;
import java.sql.sqlexception;

/**
 * @param
 * @author wu.lin
 * @description 数据库表实体
 * @create 2016年09月01日 14:19
 * @throws
 */
public class tabledatainfo {

 dbutil dbutil = dbutil.getinstance();

 /**
  *
  * @param m_tablename
  * @param m_connection
  * @return 该表的列数
  */
 public int gettablecolnums(string m_tablename, connection m_connection) {

  int colcount = 0;

  try {
   if (m_connection == null) {

    m_connection = dbutil.getconnection();
   }

   databasemetadata m_dbmetadata = m_connection.getmetadata();

   resultset tableret = m_dbmetadata.gettables(null, "%", m_tablename,
     new string[] { "table" });

   while (tableret.next()) {
    system.out.println("table name is:"
      + tableret.getstring("table_name"));
   }

   string columnname;
   string columntype;
   resultset colret = m_dbmetadata.getcolumns(null, "%", m_tablename,"%");

   while (colret.next()) {

    columnname = colret.getstring("column_name");
    columntype = colret.getstring("type_name");
    int datasize = colret.getint("column_size");
    int digits = colret.getint("decimal_digits");
    int nullable = colret.getint("nullable");

    string nullflag;

    if (nullable == 1) {
     nullflag = "null";
    } else {
     nullflag = "not null";
    }

    system.out.println(columnname + " " + columntype + "("
      + datasize + "," + digits + ") " + nullflag);
    colcount++;
   }

  } catch (sqlexception e) {
   e.printstacktrace();
  }

  system.out.println("the number of column is: " + colcount);
  return colcount;
 }
}

接下来就剩下读取配置文件的代码了,先来看看配置文件内容(这里配置了数据库配置文件路径表名以及文件存放的相对路径):

<?xml version="1.0" encoding="utf-8" ?>
<config>
 <db_file>src/dbconfig.properties</db_file>
 <tablename>memtest</tablename>
 <files>
  <filepath>files/memtest.csv</filepath>
  <filepath>files/memtest_1.csv</filepath>
  <filepath>files/memtest_2.csv</filepath>
  <filepath>files/memtest_3.csv</filepath>
  <filepath>files/memtest_4.csv</filepath>
  <filepath>files/memtest_5.csv</filepath>
  <filepath>files/memtest_6.csv</filepath>
  <filepath>files/memtest_7.csv</filepath>
  <filepath>files/memtest_8.csv</filepath>
  <filepath>files/memtest_9.csv</filepath>
  <filepath>files/memtest_10.csv</filepath>
 </files>
</config>

接下来是读取这个配置文件的内容,比较简单,所以只贴部分代码:

import javax.xml.parsers.*;
import org.w3c.dom.*;
import java.io.*;
import java.util.arraylist;

/**
 * @param
 * @author wu.lin
 * @description 读取配置信息
 * @create 2016年09月01日 15:45
 * @throws
 */
public class xmlutil {

 //该方法用于从xml配置文件中提取要插入的表名称,并返回该表名称
 public static string gettablename() {

  return getxmlproperties("tablename");
 }

 public static string getdatabaseurl() {
  return getxmlproperties("databaseurl");
 }

 public static string getdbfilepath() {
  return getxmlproperties("db_file");
 }

 private static string getxmlproperties(string proname) {
  try {

   document doc = getdoc();

   //获取包含品牌名称的文本节点
   nodelist nl = doc.getelementsbytagname(proname);
   node classnode=nl.item(0).getfirstchild();
   string tablename=classnode.getnodevalue().trim();

   return tablename;
  } catch(exception e)
  {
   e.printstacktrace();
   return null;
  }
 }

 private static document getdoc() throws exception {
  //创建文档对象
  documentbuilderfactory dfactory = documentbuilderfactory.newinstance();
  documentbuilder builder = dfactory.newdocumentbuilder();
  document doc;
  doc = builder.parse(new file("src/config.xml"));

  return doc;
 }
}

数据库配置信息文档:

db.used=mysql

# driver class
oracle.jdbc.driver_class=oracle.jdbc.driver.oracledriver
# url
oracle.jdbc.url=jdbc:oracle:thin:@localhost:1521:orcl
# username
oracle.jdbc.username=scott
# pwd
oracle.jdbc.pwd=tiger


#mysql connect config
mysql.jdbc.driver_class=com.mysql.jdbc.driver
mysql.jdbc.url=jdbc:mysql://localhost:3306/mysqldb
mysql.jdbc.username=root
mysql.jdbc.pwd=

最后是数据库帮助类,比较常见:

import java.io.fileinputstream;
import java.sql.connection;
import java.sql.drivermanager;
import java.sql.resultset;
import java.sql.sqlexception;
import java.sql.statement;
import java.util.properties;

/**
 * @param
 * @author wu.lin
 * @description 数据库帮助类
 * @create 2016年09月01日 18:56
 * @throws
 */

public class dbutil {

 private static properties env = new properties();

 private static dbutil dbutil;

 private static string dbname;

 private static string driverclass_key;

 private static string url_key;

 private static string username_key;

 private static string pwd_key;


 private dbutil(){}

 // 单例模式
 public static synchronized dbutil getinstance() {
  if (null == dbutil) {
   dbutil = new dbutil();
  }
  return dbutil;
 }

 /**
  * 得到数据库连接
  * @return
  */
 public connection getconnection() {
  connection conn = null;

  try {
   env.load(new fileinputstream(xmlutil.getdbfilepath()));

   dbname = env.getproperty("db.used").tolowercase();

   driverclass_key = dbname + ".jdbc.driver_class";
   url_key = dbname + ".jdbc.url";
   username_key = dbname + ".jdbc.username";
   pwd_key = dbname + ".jdbc.pwd";

   //加载连接数据库的驱动程序类文件
   class.forname(env.getproperty(driverclass_key));
   conn = createconnection();

  } catch (exception e) {

   e.printstacktrace();
  }

  return conn;
 }

 private connection createconnection() throws sqlexception {

  connection conn = null;
  if ("oracle".equals(dbname)) {

   conn = drivermanager.getconnection(env.getproperty(url_key), env.getproperty(username_key),
     env.getproperty(pwd_key));
  }

  if ("sqlserver".equals(dbname)) {

   conn = drivermanager.getconnection(env.getproperty(url_key), env.getproperty(username_key),
     env.getproperty(pwd_key));

  }

  if ("mysql".equals(dbname)) {
   // 其他数据库的连接语法
   string url = env.getproperty(url_key);
   string username = env.getproperty(username_key);
   string pwd = env.getproperty(pwd_key);

   if(username != null && !"".equals(username)) {
    url += ("?user=" + username);
    if(pwd != null && !"".equals(pwd)) {
     url += ("&password=" + pwd);
    }
   }

   conn = drivermanager.getconnection(url);
  }

  return conn;

 }

 //提供jdbc关闭连接的方法
 public void close(resultset rs,statement st,connection conn){

  try {
   if(rs!=null)
    rs.close();
   if(st!=null)
    st.close();
   if(conn!=null)
    conn.close();
  } catch (sqlexception e) {

   e.printstacktrace();
  }
 }
}

最后的工作便是在文件目录存放相应的数据文件,然后通过配置文件配置好文件名、表名以及数据库连接的基本信息后,运行程序入口,便可以将程序跑起来啦。但是在这个过程中也遇到一些小问题,比如,我这边只有一个100w条数据的.csv格式的文件,但是要求读取十个文件,在这个时候我用到了一个小工具:

Java jdbc批量多线程读取CVS文件入库

大家知道.csv格式的文件也可以用excel软件打开,所以在这里转换一下用excel分割器把文件分成十份,就完美的解决问题啦。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。