欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Mybaits 源码解析 (七)----- Select 语句的执行过程分析(下篇)全网最详细,没有之一

程序员文章站 2022-04-10 14:35:03
我们上篇文章讲到了查询方法里面的doQuery方法,这里面就是调用JDBC的API了,其中的逻辑比较复杂,我们这边文章来讲,先看看我们上篇文章分析的地方 SimpleExecutor 1 public List doQuery(MappedStatement ms, Object pa ......

我们上篇文章讲到了查询方法里面的doquery方法,这里面就是调用jdbc的api了,其中的逻辑比较复杂,我们这边文章来讲,先看看我们上篇文章分析的地方

simpleexecutor

 1 public <e> list<e> doquery(mappedstatement ms, object parameter, rowbounds rowbounds, resulthandler resulthandler, boundsql boundsql) throws sqlexception {
 2     statement stmt = null;
 3     try {
 4         configuration configuration = ms.getconfiguration();
 5         // 创建 statementhandler
 6         statementhandler handler = configuration.newstatementhandler(wrapper, ms, parameter, rowbounds, resulthandler, boundsql);
 7         // 创建 statement
 8         stmt = preparestatement(handler, ms.getstatementlog());
 9         // 执行查询操作
10         return handler.<e>query(stmt, resulthandler);
11     } finally {
12         // 关闭 statement
13         closestatement(stmt);
14     }
15 }

上篇文章我们分析完了第6行代码,在第6行处我们创建了一个preparedstatementhandler,我们要接着第8行代码开始分析,也就是创建 statement,先不忙着分析,我们先来回顾一下 ,我们以前是怎么使用jdbc的

jdbc

public class login {
    /**
     *    第一步,加载驱动,创建数据库的连接
     *    第二步,编写sql
     *    第三步,需要对sql进行预编译
     *    第四步,向sql里面设置参数
     *    第五步,执行sql
     *    第六步,释放资源 
     * @throws exception 
     */
     
    public static final string url = "jdbc:mysql://localhost:3306/chenhao";
    public static final string user = "liulx";
    public static final string password = "123456";
    public static void main(string[] args) throws exception {
        login("lucy","123");
    }
    
    public static void login(string username , string password) throws exception{
        connection conn = null; 
        preparedstatement psmt = null;
        resultset rs = null;
        try {
            //加载驱动程序
            class.forname("com.mysql.jdbc.driver");
            //获得数据库连接
            conn = drivermanager.getconnection(url, user, password);
            //编写sql
            string sql = "select * from user where name =? and password = ?";//问号相当于一个占位符
            //对sql进行预编译
            psmt = conn.preparestatement(sql);
            //设置参数
            psmt.setstring(1, username);
            psmt.setstring(2, password);
            //执行sql ,返回一个结果集
            rs = psmt.executequery();
            //输出结果
            while(rs.next()){
                system.out.println(rs.getstring("user_name")+" 年龄:"+rs.getint("age"));
            }
        } catch (exception e) {
            e.printstacktrace();
        }finally{
            //释放资源
            conn.close();
            psmt.close();
            rs.close();
        }
    }
}

上面代码中注释已经很清楚了,我们来看看mybatis中是怎么和数据库打交道的。

simpleexecutor

private statement preparestatement(statementhandler handler, log statementlog) throws sqlexception {
    statement stmt;
    // 获取数据库连接
    connection connection = getconnection(statementlog);
   // 创建 statement,
    stmt = handler.prepare(connection, transaction.gettimeout());
   // 为 statement 设置参数
    handler.parameterize(stmt);
    return stmt;
}

在上面的代码中我们终于看到了和jdbc相关的内容了,大概分为下面三个步骤:

  1. 获取数据库连接
  2. 创建preparedstatement
  3. 为preparedstatement设置运行时参数

我们先来看看获取数据库连接,跟进代码看看

baseexecutor

protected connection getconnection(log statementlog) throws sqlexception {
    //通过transaction来获取connection
    connection connection = this.transaction.getconnection();
    return statementlog.isdebugenabled() ? connectionlogger.newinstance(connection, statementlog, this.querystack) : connection;
}

我们看到是通过executor中的transaction属性来获取connection,那我们就先来看看transaction,根据前面的文章中的配置 <transactionmanager type="jdbc"/>,则mybatis会创建一个jdbctransactionfactory.class 实例,executor中的transaction是一个jdbctransaction.class 实例,其实现transaction接口,那我们先来看看transaction

jdbctransaction

我们先来看看其接口transaction

transaction

public interface transaction {
    //获取数据库连接
    connection getconnection() throws sqlexception;
    //提交事务
    void commit() throws sqlexception;
    //回滚事务
    void rollback() throws sqlexception;
    //关闭事务
    void close() throws sqlexception;
    //获取超时时间
    integer gettimeout() throws sqlexception;
}

接着我们看看其实现类jdbctransaction

jdbctransaction

public class jdbctransaction implements transaction {
  
  private static final log log = logfactory.getlog(jdbctransaction.class);
  
  //数据库连接
  protected connection connection;
  //数据源信息
  protected datasource datasource;
  //隔离级别
  protected transactionisolationlevel level;
  //是否为自动提交
  protected boolean autocommmit;
  
  public jdbctransaction(datasource ds, transactionisolationlevel desiredlevel, boolean desiredautocommit) {
    datasource = ds;
    level = desiredlevel;
    autocommmit = desiredautocommit;
  }
  
  public jdbctransaction(connection connection) {
    this.connection = connection;
  }
  
  public connection getconnection() throws sqlexception {
    //如果事务中不存在connection,则获取一个connection并放入connection属性中
    //第一次肯定为空
    if (connection == null) {
      openconnection();
    }
    //如果事务中已经存在connection,则直接返回这个connection
    return connection;
  }
  
    /**
     * commit()功能 
     * @throws sqlexception
     */
  public void commit() throws sqlexception {
    if (connection != null && !connection.getautocommit()) {
      if (log.isdebugenabled()) {
        log.debug("committing jdbc connection [" + connection + "]");
      }
      //使用connection的commit()
      connection.commit();
    }
  }
  
    /**
     * rollback()功能 
     * @throws sqlexception
     */
  public void rollback() throws sqlexception {
    if (connection != null && !connection.getautocommit()) {
      if (log.isdebugenabled()) {
        log.debug("rolling back jdbc connection [" + connection + "]");
      }
      //使用connection的rollback()
      connection.rollback();
    }
  }
  
    /**
     * close()功能 
     * @throws sqlexception
     */
  public void close() throws sqlexception {
    if (connection != null) {
      resetautocommit();
      if (log.isdebugenabled()) {
        log.debug("closing jdbc connection [" + connection + "]");
      }
      //使用connection的close()
      connection.close();
    }
  }
  
  protected void openconnection() throws sqlexception {
    if (log.isdebugenabled()) {
      log.debug("opening jdbc connection");
    }
    //通过datasource来获取connection,并设置到transaction的connection属性中
    connection = datasource.getconnection();
   if (level != null) {
      //通过connection设置事务的隔离级别
      connection.settransactionisolation(level.getlevel());
    }
    //设置事务是否自动提交
    setdesiredautocommit(autocommmit);
  }
  
  protected void setdesiredautocommit(boolean desiredautocommit) {
    try {
        if (this.connection.getautocommit() != desiredautocommit) {
            if (log.isdebugenabled()) {
                log.debug("setting autocommit to " + desiredautocommit + " on jdbc connection [" + this.connection + "]");
            }
            //通过connection设置事务是否自动提交
            this.connection.setautocommit(desiredautocommit);
        }

    } catch (sqlexception var3) {
        throw new transactionexception("error configuring autocommit.  your driver may not support getautocommit() or setautocommit(). requested setting: " + desiredautocommit + ".  cause: " + var3, var3);
    }
  }
  
}

我们看到jdbctransaction中有一个connection属性和datasource属性,使用connection来进行提交、回滚、关闭等操作,也就是说jdbctransaction其实只是在jdbc的connection上面封装了一下,实际使用的其实还是jdbc的事务。我们看看getconnection()方法

//数据库连接
protected connection connection;
//数据源信息
protected datasource datasource;

public connection getconnection() throws sqlexception {
//如果事务中不存在connection,则获取一个connection并放入connection属性中
//第一次肯定为空
if (connection == null) {
  openconnection();
}
//如果事务中已经存在connection,则直接返回这个connection
return connection;
}

protected void openconnection() throws sqlexception {
if (log.isdebugenabled()) {
  log.debug("opening jdbc connection");
}
//通过datasource来获取connection,并设置到transaction的connection属性中
connection = datasource.getconnection();
if (level != null) {
  //通过connection设置事务的隔离级别
  connection.settransactionisolation(level.getlevel());
}
//设置事务是否自动提交
setdesiredautocommit(autocommmit);
}

先是判断当前事务中是否存在connection,如果存在,则直接返回connection,如果不存在则通过datasource来获取connection,这里我们明白了一点,如果当前事务没有关闭,也就是没有释放connection,那么在同一个transaction中使用的是同一个connection,我们再来想想,transaction是simpleexecutor中的属性,simpleexecutor又是sqlsession中的属性,那我们可以这样说,同一个sqlsession中只有一个simpleexecutor,simpleexecutor中有一个transaction,transaction有一个connection。我们来看看如下例子

public static void main(string[] args) throws ioexception {
    string resource = "mybatis-config.xml";
    inputstream inputstream = resources.getresourceasstream(resource);
    sqlsessionfactory sqlsessionfactory = new sqlsessionfactorybuilder().build(inputstream);
    //创建一个sqlsession
    sqlsession sqlsession = sqlsessionfactory.opensession();
    try {
         employeemapper employeemapper = sqlsession.getmapper(employee.class);
         usermapper usermapper = sqlsession.getmapper(user.class);
         list<employee> allemployee = employeemapper.getall();
         list<user> alluser = usermapper.getall();
         employee employee = employeemapper.getone();
    } finally {
        sqlsession.close();
    }
}

我们看到同一个sqlsession可以获取多个mapper代理对象,则多个mapper代理对象中的sqlsession引用应该是同一个,那么多个mapper代理对象调用方法应该是同一个connection,直到调用close(),所以说我们的sqlsession是线程不安全的,如果所有的业务都使用一个sqlsession,那connection也是同一个,一个业务执行完了就将其关闭,那其他的业务还没执行完呢。大家明白了吗?我们回归到源码,connection = datasource.getconnection();,最终还是调用datasource来获取连接,那我们是不是要来看看datasource呢?

我们还是从前面的配置文件来看<datasource type="unpooled|pooled">,这里有unpooled和pooled两种datasource,一种是使用连接池,一种是普通的datasource,unpooled将会创将new unpooleddatasource()实例,pooled将会new pooleddatasource()实例,都实现datasource接口,那我们先来看看datasource接口

datasource

public interface datasource  extends commondatasource,wrapper {
  //获取数据库连接
  connection getconnection() throws sqlexception;

  connection getconnection(string username, string password)
    throws sqlexception;

}

很简单,只有一个获取数据库连接的接口,那我们来看看其实现类

unpooleddatasource

unpooleddatasource,从名称上即可知道,该种数据源不具有池化特性。该种数据源每次会返回一个新的数据库连接,而非复用旧的连接。其核心的方法有三个,分别如下:

  1. initializedriver - 初始化数据库驱动
  2. dogetconnection - 获取数据连接
  3. configureconnection - 配置数据库连接

初始化数据库驱动

看下我们上面使用jdbc的例子,在执行 sql 之前,通常都是先获取数据库连接。一般步骤都是加载数据库驱动,然后通过 drivermanager 获取数据库连接。unpooleddatasource 也是使用 jdbc 访问数据库的,因此它获取数据库连接的过程一样

unpooleddatasource

public class unpooleddatasource implements datasource {
    private classloader driverclassloader;
    private properties driverproperties;
    private static map<string, driver> registereddrivers = new concurrenthashmap();
    private string driver;
    private string url;
    private string username;
    private string password;
    private boolean autocommit;
    private integer defaulttransactionisolationlevel;

    public unpooleddatasource() {
    }

    public unpooleddatasource(string driver, string url, string username, string password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
    }

    private synchronized void initializedriver() throws sqlexception {
        // 检测当前 driver 对应的驱动实例是否已经注册
        if (!registereddrivers.containskey(driver)) {
            class<?> drivertype;
            try {
                // 加载驱动类型
                if (driverclassloader != null) {
                    // 使用 driverclassloader 加载驱动
                    drivertype = class.forname(driver, true, driverclassloader);
                } else {
                    // 通过其他 classloader 加载驱动
                    drivertype = resources.classforname(driver);
                }

                // 通过反射创建驱动实例
                driver driverinstance = (driver) drivertype.newinstance();
                /*
                 * 注册驱动,注意这里是将 driver 代理类 driverproxy 对象注册到 drivermanager 中的,而非 driver 对象本身。
                 */
                drivermanager.registerdriver(new driverproxy(driverinstance));
                // 缓存驱动类名和实例,防止多次注册
                registereddrivers.put(driver, driverinstance);
            } catch (exception e) {
                throw new sqlexception("error setting driver on unpooleddatasource. cause: " + e);
            }
        }
    }
    //略...
}

//drivermanager
private final static copyonwritearraylist<driverinfo> registereddrivers = new copyonwritearraylist<driverinfo>();
public static synchronized void registerdriver(java.sql.driver driver)
    throws sqlexception {

    if(driver != null) {
        registereddrivers.addifabsent(new driverinfo(driver));
    } else {
        // this is for compatibility with the original drivermanager
        throw new nullpointerexception();
    }
}
通过反射机制加载驱动driver,并将其注册到drivermanager中的一个常量集合中,供后面获取连接时使用,为什么这里是一个list呢?我们实际开发中有可能使用到了多种数据库类型,如mysql、oracle等,其驱动都是不同的,不同的数据源获取连接时使用的是不同的驱动。
在我们使用jdbc的时候,也没有通过drivermanager.registerdriver(new driverproxy(driverinstance));去注册driver啊,如果我们使用的是mysql数据源,那我们来看class.forname("com.mysql.jdbc.driver");这句代码发生了什么
class.forname主要是做了什么呢?它主要是要求jvm查找并装载指定的类。这样我们的类com.mysql.jdbc.driver就被装载进来了。而且在类被装载进jvm的时候,它的静态方法就会被执行。我们来看com.mysql.jdbc.driver的实现代码。在它的实现里有这么一段代码:
static {  
    try {  
        java.sql.drivermanager.registerdriver(new driver());  
    } catch (sqlexception e) {  
        throw new runtimeexception("can't register driver!");  
    }  
}

 很明显,这里使用了drivermanager并将该类给注册上去了。所以,对于任何实现前面driver接口的类,只要在他们被装载进jvm的时候注册drivermanager就可以实现被后续程序使用。

作为那些被加载的driver实现,他们本身在被装载时会在执行的static代码段里通过调用drivermanager.registerdriver()来把自身注册到drivermanager的registereddrivers列表中。这样后面就可以通过得到的driver来取得连接了。

获取数据库连接

在上面例子中使用 jdbc 时,我们都是通过 drivermanager 的接口方法获取数据库连接。我们来看看unpooleddatasource是如何获取的。

unpooleddatasource

public connection getconnection() throws sqlexception {
    return dogetconnection(username, password);
}
    
private connection dogetconnection(string username, string password) throws sqlexception {
    properties props = new properties();
    if (driverproperties != null) {
        props.putall(driverproperties);
    }
    if (username != null) {
        // 存储 user 配置
        props.setproperty("user", username);
    }
    if (password != null) {
        // 存储 password 配置
        props.setproperty("password", password);
    }
    // 调用重载方法
    return dogetconnection(props);
}

private connection dogetconnection(properties properties) throws sqlexception {
    // 初始化驱动,我们上一节已经讲过了,只用初始化一次
    initializedriver();
    // 获取连接
    connection connection = drivermanager.getconnection(url, properties);
    // 配置连接,包括自动提交以及事务等级
    configureconnection(connection);
    return connection;
}

private void configureconnection(connection conn) throws sqlexception {
    if (autocommit != null && autocommit != conn.getautocommit()) {
        // 设置自动提交
        conn.setautocommit(autocommit);
    }
    if (defaulttransactionisolationlevel != null) {
        // 设置事务隔离级别
        conn.settransactionisolation(defaulttransactionisolationlevel);
    }
}

上面方法将一些配置信息放入到 properties 对象中,然后将数据库连接和 properties 对象传给 drivermanager 的 getconnection 方法即可获取到数据库连接。我们来看看是怎么获取数据库连接的

private static connection getconnection(string url, java.util.properties info, class<?> caller) throws sqlexception {
    // 获取类加载器
    classloader callercl = caller != null ? caller.getclassloader() : null;
    synchronized(drivermanager.class) {
      if (callercl == null) {
        callercl = thread.currentthread().getcontextclassloader();
      }
    }
    // 此处省略部分代码 
    // 这里遍历的是在registerdriver(driver driver)方法中注册的驱动对象
    // 每个driverinfo包含了驱动对象和其信息
    for(driverinfo adriver : registereddrivers) {

      // 判断是否为当前线程类加载器加载的驱动类
      if(isdriverallowed(adriver.driver, callercl)) {
        try {
          println("trying " + adriver.driver.getclass().getname());

          // 获取连接对象,这里调用了driver的父类的方法
          // 如果这里有多个driverinfo,比喻mysql和oracle的driver都注册registereddrivers了
          // 这里所有的driver都会尝试使用url和info去连接,哪个连接上了就返回
          // 会不会所有的都会连接上呢?不会,因为url的写法不同,不同的driver会判断url是否适合当前驱动
          connection con = adriver.driver.connect(url, info);
          if (con != null) {
            // 打印连接成功信息
            println("getconnection returning " + adriver.driver.getclass().getname());
            // 返回连接对像
            return (con);
          }
        } catch (sqlexception ex) {
          if (reason == null) {
            reason = ex;
          }
        }
      } else {
        println("    skipping: " + adriver.getclass().getname());
      }
    }  
}

代码中循环所有注册的驱动,然后通过驱动进行连接,所有的驱动都会尝试连接,但是不同的驱动,连接的url是不同的,如mysql的url是jdbc:mysql://localhost:3306/chenhao,以jdbc:mysql://开头,则其mysql的驱动肯定会判断获取连接的url符合,oracle的也类似,我们来看看mysql的驱动获取连接

Mybaits 源码解析 (七)----- Select 语句的执行过程分析(下篇)全网最详细,没有之一

由于篇幅原因,我这里就不分析了,大家有兴趣的可以看看,最后由url对应的驱动获取到connection返回,好了我们再来看看下一种datasource

pooleddatasource

pooleddatasource 内部实现了连接池功能,用于复用数据库连接。因此,从效率上来说,pooleddatasource 要高于 unpooleddatasource。但是最终获取connection还是通过unpooleddatasource,只不过pooleddatasource 提供一个存储connection的功能。

辅助类介绍

pooleddatasource 需要借助两个辅助类帮其完成功能,这两个辅助类分别是 poolstate 和 pooledconnection。poolstate 用于记录连接池运行时的状态,比如连接获取次数,无效连接数量等。同时 poolstate 内部定义了两个 pooledconnection 集合,用于存储空闲连接和活跃连接。pooledconnection 内部定义了一个 connection 类型的变量,用于指向真实的数据库连接。以及一个 connection 的代理类,用于对部分方法调用进行拦截。至于为什么要拦截,随后将进行分析。除此之外,pooledconnection 内部也定义了一些字段,用于记录数据库连接的一些运行时状态。接下来,我们来看一下 pooledconnection 的定义。

pooledconnection

class pooledconnection implements invocationhandler {

    private static final string close = "close";
    private static final class<?>[] ifaces = new class<?>[]{connection.class};

    private final int hashcode;
    private final pooleddatasource datasource;
    // 真实的数据库连接
    private final connection realconnection;
    // 数据库连接代理
    private final connection proxyconnection;
    
    // 从连接池中取出连接时的时间戳
    private long checkouttimestamp;
    // 数据库连接创建时间
    private long createdtimestamp;
    // 数据库连接最后使用时间
    private long lastusedtimestamp;
    // connectiontypecode = (url + username + password).hashcode()
    private int connectiontypecode;
    // 表示连接是否有效
    private boolean valid;

    public pooledconnection(connection connection, pooleddatasource datasource) {
        this.hashcode = connection.hashcode();
        this.realconnection = connection;
        this.datasource = datasource;
        this.createdtimestamp = system.currenttimemillis();
        this.lastusedtimestamp = system.currenttimemillis();
        this.valid = true;
        // 创建 connection 的代理类对象
        this.proxyconnection = (connection) proxy.newproxyinstance(connection.class.getclassloader(), ifaces, this);
    }
    
    @override
    public object invoke(object proxy, method method, object[] args) throws throwable {...}
    
    // 省略部分代码
}

下面再来看看 poolstate 的定义。

poolstate 

public class poolstate {

    protected pooleddatasource datasource;

    // 空闲连接列表
    protected final list<pooledconnection> idleconnections = new arraylist<pooledconnection>();
    // 活跃连接列表
    protected final list<pooledconnection> activeconnections = new arraylist<pooledconnection>();
    // 从连接池中获取连接的次数
    protected long requestcount = 0;
    // 请求连接总耗时(单位:毫秒)
    protected long accumulatedrequesttime = 0;
    // 连接执行时间总耗时
    protected long accumulatedcheckouttime = 0;
    // 执行时间超时的连接数
    protected long claimedoverdueconnectioncount = 0;
    // 超时时间累加值
    protected long accumulatedcheckouttimeofoverdueconnections = 0;
    // 等待时间累加值
    protected long accumulatedwaittime = 0;
    // 等待次数
    protected long hadtowaitcount = 0;
    // 无效连接数
    protected long badconnectioncount = 0;
}

大家记住上面的空闲连接列表和活跃连接列表

获取连接

前面已经说过,pooleddatasource 会将用过的连接进行回收,以便可以复用连接。因此从 pooleddatasource 获取连接时,如果空闲链接列表里有连接时,可直接取用。那如果没有空闲连接怎么办呢?此时有两种解决办法,要么创建新连接,要么等待其他连接完成任务。

pooleddatasource

public class pooleddatasource implements datasource {
    private static final log log = logfactory.getlog(pooleddatasource.class);
    //这里有辅助类poolstate
    private final poolstate state = new poolstate(this);
    //还有一个unpooleddatasource属性,其实真正获取connection是由unpooleddatasource来完成的
    private final unpooleddatasource datasource;
    protected int poolmaximumactiveconnections = 10;
    protected int poolmaximumidleconnections = 5;
    protected int poolmaximumcheckouttime = 20000;
    protected int pooltimetowait = 20000;
    protected string poolpingquery = "no ping query set";
    protected boolean poolpingenabled = false;
    protected int poolpingconnectionsnotusedfor = 0;
    private int expectedconnectiontypecode;
    
    public pooleddatasource() {
        this.datasource = new unpooleddatasource();
    }
    
    public pooleddatasource(string driver, string url, string username, string password) {
        //构造器中创建unpooleddatasource对象
        this.datasource = new unpooleddatasource(driver, url, username, password);
    }
    
    public connection getconnection() throws sqlexception {
        return this.popconnection(this.datasource.getusername(), this.datasource.getpassword()).getproxyconnection();
    }
    
    private pooledconnection popconnection(string username, string password) throws sqlexception {
        boolean countedwait = false;
        pooledconnection conn = null;
        long t = system.currenttimemillis();
        int localbadconnectioncount = 0;

        while (conn == null) {
            synchronized (state) {
                // 检测空闲连接集合(idleconnections)是否为空
                if (!state.idleconnections.isempty()) {
                    // idleconnections 不为空,表示有空闲连接可以使用,直接从空闲连接集合中取出一个连接
                    conn = state.idleconnections.remove(0);
                } else {
                    /*
                     * 暂无空闲连接可用,但如果活跃连接数还未超出限制
                     *(poolmaximumactiveconnections),则可创建新的连接
                     */
                    if (state.activeconnections.size() < poolmaximumactiveconnections) {
                        // 创建新连接,看到没,还是通过datasource获取连接,也就是unpooleddatasource获取连接
                        conn = new pooledconnection(datasource.getconnection(), this);
                    } else {    // 连接池已满,不能创建新连接
                        // 取出运行时间最长的连接
                        pooledconnection oldestactiveconnection = state.activeconnections.get(0);
                        // 获取运行时长
                        long longestcheckouttime = oldestactiveconnection.getcheckouttime();
                        // 检测运行时长是否超出限制,即超时
                        if (longestcheckouttime > poolmaximumcheckouttime) {
                            // 累加超时相关的统计字段
                            state.claimedoverdueconnectioncount++;
                            state.accumulatedcheckouttimeofoverdueconnections += longestcheckouttime;
                            state.accumulatedcheckouttime += longestcheckouttime;

                            // 从活跃连接集合中移除超时连接
                            state.activeconnections.remove(oldestactiveconnection);
                            // 若连接未设置自动提交,此处进行回滚操作
                            if (!oldestactiveconnection.getrealconnection().getautocommit()) {
                                try {
                                    oldestactiveconnection.getrealconnection().rollback();
                                } catch (sqlexception e) {...}
                            }
                            /*
                             * 创建一个新的 pooledconnection,注意,
                             * 此处复用 oldestactiveconnection 的 realconnection 变量
                             */
                            conn = new pooledconnection(oldestactiveconnection.getrealconnection(), this);
                            /*
                             * 复用 oldestactiveconnection 的一些信息,注意 pooledconnection 中的 
                             * createdtimestamp 用于记录 connection 的创建时间,而非 pooledconnection 
                             * 的创建时间。所以这里要复用原连接的时间信息。
                             */
                            conn.setcreatedtimestamp(oldestactiveconnection.getcreatedtimestamp());
                            conn.setlastusedtimestamp(oldestactiveconnection.getlastusedtimestamp());

                            // 设置连接为无效状态
                            oldestactiveconnection.invalidate();
                            
                        } else {// 运行时间最长的连接并未超时
                            try {
                                if (!countedwait) {
                                    state.hadtowaitcount++;
                                    countedwait = true;
                                }
                                long wt = system.currenttimemillis();
                                // 当前线程进入等待状态
                                state.wait(pooltimetowait);
                                state.accumulatedwaittime += system.currenttimemillis() - wt;
                            } catch (interruptedexception e) {
                                break;
                            }
                        }
                    }
                }
                if (conn != null) {
                    if (conn.isvalid()) {
                        if (!conn.getrealconnection().getautocommit()) {
                            // 进行回滚操作
                            conn.getrealconnection().rollback();
                        }
                        conn.setconnectiontypecode(assembleconnectiontypecode(datasource.geturl(), username, password));
                        // 设置统计字段
                        conn.setcheckouttimestamp(system.currenttimemillis());
                        conn.setlastusedtimestamp(system.currenttimemillis());
                        state.activeconnections.add(conn);
                        state.requestcount++;
                        state.accumulatedrequesttime += system.currenttimemillis() - t;
                    } else {
                        // 连接无效,此时累加无效连接相关的统计字段
                        state.badconnectioncount++;
                        localbadconnectioncount++;
                        conn = null;
                        if (localbadconnectioncount > (poolmaximumidleconnections
                            + poolmaximumlocalbadconnectiontolerance)) {
                            throw new sqlexception(...);
                        }
                    }
                }
            }

        }
        if (conn == null) {
            throw new sqlexception(...);
        }

        return conn;
    }
}

从连接池中获取连接首先会遇到两种情况:

  1. 连接池中有空闲连接
  2. 连接池中无空闲连接

对于第一种情况,把连接取出返回即可。对于第二种情况,则要进行细分,会有如下的情况。

  1. 活跃连接数没有超出最大活跃连接数
  2. 活跃连接数超出最大活跃连接数

对于上面两种情况,第一种情况比较好处理,直接创建新的连接即可。至于第二种情况,需要再次进行细分。

  1. 活跃连接的运行时间超出限制,即超时了
  2. 活跃连接未超时

对于第一种情况,我们直接将超时连接强行中断,并进行回滚,然后复用部分字段重新创建 pooledconnection 即可。对于第二种情况,目前没有更好的处理方式了,只能等待了。

回收连接

相比于获取连接,回收连接的逻辑要简单的多。回收连接成功与否只取决于空闲连接集合的状态,所需处理情况很少,因此比较简单。

我们还是来看看

public connection getconnection() throws sqlexception {
    return this.popconnection(this.datasource.getusername(), this.datasource.getpassword()).getproxyconnection();
}

返回的是pooledconnection的一个代理类,为什么不直接使用pooledconnection的realconnection呢?我们可以看下pooledconnection这个类

class pooledconnection implements invocationhandler {

 很熟悉是吧,标准的代理类用法,看下其invoke方法

pooledconnection

@override
public object invoke(object proxy, method method, object[] args) throws throwable {
    string methodname = method.getname();
    // 重点在这里,如果调用了其close方法,则实际执行的是将连接放回连接池的操作
    if (close.hashcode() == methodname.hashcode() && close.equals(methodname)) {
        datasource.pushconnection(this);
        return null;
    } else {
        try {
            if (!object.class.equals(method.getdeclaringclass())) {
                // issue #579 tostring() should never fail
                // throw an sqlexception instead of a runtime
                checkconnection();
            }
            // 其他的操作都交给realconnection执行
            return method.invoke(realconnection, args);
        } catch (throwable t) {
            throw exceptionutil.unwrapthrowable(t);
        }
    }
}

那我们来看看pushconnection做了什么

protected void pushconnection(pooledconnection conn) throws sqlexception {
    synchronized (state) {
        // 从活跃连接池中移除连接
        state.activeconnections.remove(conn);
        if (conn.isvalid()) {
            // 空闲连接集合未满
            if (state.idleconnections.size() < poolmaximumidleconnections
                && conn.getconnectiontypecode() == expectedconnectiontypecode) {
                state.accumulatedcheckouttime += conn.getcheckouttime();

                // 回滚未提交的事务
                if (!conn.getrealconnection().getautocommit()) {
                    conn.getrealconnection().rollback();
                }

                // 创建新的 pooledconnection
                pooledconnection newconn = new pooledconnection(conn.getrealconnection(), this);
                state.idleconnections.add(newconn);
                // 复用时间信息
                newconn.setcreatedtimestamp(conn.getcreatedtimestamp());
                newconn.setlastusedtimestamp(conn.getlastusedtimestamp());

                // 将原连接置为无效状态
                conn.invalidate();

                // 通知等待的线程
                state.notifyall();
                
            } else {// 空闲连接集合已满
                state.accumulatedcheckouttime += conn.getcheckouttime();
                // 回滚未提交的事务
                if (!conn.getrealconnection().getautocommit()) {
                    conn.getrealconnection().rollback();
                }

                // 关闭数据库连接
                conn.getrealconnection().close();
                conn.invalidate();
            }
        } else {
            state.badconnectioncount++;
        }
    }
}

先将连接从活跃连接集合中移除,如果空闲集合未满,此时复用原连接的字段信息创建新的连接,并将其放入空闲集合中即可;若空闲集合已满,此时无需回收连接,直接关闭即可。

连接池总觉得很神秘,但仔细分析完其代码之后,也就没那么神秘了,就是将连接使用完之后放到一个集合中,下面再获取连接的时候首先从这个集合中获取。  还有pooledconnection的代理模式的使用,值得我们学习

好了,我们已经获取到了数据库连接,接下来要创建preparestatement了,我们上面jdbc的例子是怎么获取的? psmt = conn.preparestatement(sql);,直接通过connection来获取,并且把sql传进去了,我们看看mybaits中是怎么创建preparestatement的

创建preparedstatement 

preparedstatementhandler

stmt = handler.prepare(connection, transaction.gettimeout());

public statement prepare(connection connection, integer transactiontimeout) throws sqlexception {
    statement statement = null;
    try {
        // 创建 statement
        statement = instantiatestatement(connection);
        // 设置超时和 fetchsize
        setstatementtimeout(statement, transactiontimeout);
        setfetchsize(statement);
        return statement;
    } catch (sqlexception e) {
        closestatement(statement);
        throw e;
    } catch (exception e) {
        closestatement(statement);
        throw new executorexception("error preparing statement.  cause: " + e, e);
    }
}

protected statement instantiatestatement(connection connection) throws sqlexception {
    //获取sql字符串,比如"select * from user where id= ?"
    string sql = boundsql.getsql();
    // 根据条件调用不同的 preparestatement 方法创建 preparedstatement
    if (mappedstatement.getkeygenerator() instanceof jdbc3keygenerator) {
        string[] keycolumnnames = mappedstatement.getkeycolumns();
        if (keycolumnnames == null) {
            //通过connection获取statement,将sql语句传进去
            return connection.preparestatement(sql, preparedstatement.return_generated_keys);
        } else {
            return connection.preparestatement(sql, keycolumnnames);
        }
    } else if (mappedstatement.getresultsettype() != null) {
        return connection.preparestatement(sql, mappedstatement.getresultsettype().getvalue(), resultset.concur_read_only);
    } else {
        return connection.preparestatement(sql);
    }
}

看到没和jdbc的形式一模一样,我们具体来看看connection.preparestatement做了什么

 1 public preparedstatement preparestatement(string sql, int resultsettype, int resultsetconcurrency) throws sqlexception {
 2        
 3     boolean canserverprepare = true;
 4 
 5     string nativesql = getprocessescapecodesforprepstmts() ? nativesql(sql) : sql;
 6 
 7     if (this.useserverpreparedstmts && getemulateunsupportedpstmts()) {
 8         canserverprepare = canhandleasserverpreparedstatement(nativesql);
 9     }
10 
11     if (this.useserverpreparedstmts && getemulateunsupportedpstmts()) {
12         canserverprepare = canhandleasserverpreparedstatement(nativesql);
13     }
14 
15     if (this.useserverpreparedstmts && canserverprepare) {
16         if (this.getcachepreparedstatements()) {
17             ......
18         } else {
19             try {
20                 //这里使用的是serverpreparedstatement创建preparedstatement
21                 pstmt = serverpreparedstatement.getinstance(getmultihostsafeproxy(), nativesql, this.database, resultsettype, resultsetconcurrency);
22 
23                 pstmt.setresultsettype(resultsettype);
24                 pstmt.setresultsetconcurrency(resultsetconcurrency);
25             } catch (sqlexception sqlex) {
26                 // punt, if necessary
27                 if (getemulateunsupportedpstmts()) {
28                     pstmt = (preparedstatement) clientpreparestatement(nativesql, resultsettype, resultsetconcurrency, false);
29                 } else {
30                     throw sqlex;
31                 }
32             }
33         }
34     } else {
35         pstmt = (preparedstatement) clientpreparestatement(nativesql, resultsettype, resultsetconcurrency, false);
36     }
37 }

我们只用看最关键的第21行代码,使用serverpreparedstatement的getinstance返回一个preparedstatement,其实本质上serverpreparedstatement继承了preparedstatement对象,我们看看其构造方法

protected serverpreparedstatement(connectionimpl conn, string sql, string catalog, int resultsettype, int resultsetconcurrency) throws sqlexception {
    //略...

    try {
        this.serverprepare(sql);
    } catch (sqlexception var10) {
        this.realclose(false, true);
        throw var10;
    } catch (exception var11) {
        this.realclose(false, true);
        sqlexception sqlex = sqlerror.createsqlexception(var11.tostring(), "s1000", this.getexceptioninterceptor());
        sqlex.initcause(var11);
        throw sqlex;
    }
    //略...

}

继续调用this.serverprepare(sql);

public class serverpreparedstatement extends preparedstatement {
    //存放运行时参数的数组
    private serverpreparedstatement.bindvalue[] parameterbindings;
    //服务器预编译好的sql语句返回的serverstatementid
    private long serverstatementid;
    private void serverprepare(string sql) throws sqlexception {
        synchronized(this.connection.getmutex()) {
            mysqlio mysql = this.connection.getio();
            try {
                //向sql服务器发送了一条prepare指令
                buffer prepareresultpacket = mysql.sendcommand(mysqldefs.com_prepare, sql, (buffer)null, false, characterencoding, 0);
                //记录下了预编译好的sql语句所对应的serverstatementid
                this.serverstatementid = prepareresultpacket.readlong();
                this.fieldcount = prepareresultpacket.readint();
                //获取参数个数,比喻 select * from user where id= ?and name = ?,其中有两个?,则这里返回的参数个数应该为2
                this.parametercount = prepareresultpacket.readint();
                this.parameterbindings = new serverpreparedstatement.bindvalue[this.parametercount];

                for(int i = 0; i < this.parametercount; ++i) {
                    //根据参数个数,初始化数组
                    this.parameterbindings[i] = new serverpreparedstatement.bindvalue();
                }

            } catch (sqlexception var16) {
                throw sqlex;
            } finally {
                this.connection.getio().clearinputstream();
            }

        }
    }
}
serverpreparedstatement继承preparedstatement,serverpreparedstatement初始化的时候就向sql服务器发送了一条prepare指令,把sql语句传到mysql服务器,如select * from user where id= ?and name = ?,mysql服务器会对sql进行编译,并保存在服务器,返回预编译语句对应的id,并保存在
serverpreparedstatement中,同时创建bindvalue[] parameterbindings数组,后面设置参数就直接添加到此数组中。好了,此时我们创建了一个serverpreparedstatement并返回,下面就是设置运行时参数了

设置运行时参数到 sql 中

我们已经获取到了preparedstatement,接下来就是将运行时参数设置到preparedstatement中,如下代码

handler.parameterize(stmt);

jdbc是怎么设置的呢?我们看看上面的例子,很简单吧

psmt = conn.preparestatement(sql);
//设置参数
psmt.setstring(1, username);
psmt.setstring(2, password);

我们来看看parameterize方法

public void parameterize(statement statement) throws sqlexception {
    // 通过参数处理器 parameterhandler 设置运行时参数到 preparedstatement 中
    parameterhandler.setparameters((preparedstatement) statement);
}

public class defaultparameterhandler implements parameterhandler {
    private final typehandlerregistry typehandlerregistry;
    private final mappedstatement mappedstatement;
    private final object parameterobject;
    private final boundsql boundsql;
    private final configuration configuration;

    public void setparameters(preparedstatement ps) {
        /*
         * 从 boundsql 中获取 parametermapping 列表,每个 parametermapping 与原始 sql 中的 #{xxx} 占位符一一对应
         */
        list<parametermapping> parametermappings = boundsql.getparametermappings();
        if (parametermappings != null) {
            for (int i = 0; i < parametermappings.size(); i++) {
                parametermapping parametermapping = parametermappings.get(i);
                if (parametermapping.getmode() != parametermode.out) {
                    object value;
                    // 获取属性名
                    string propertyname = parametermapping.getproperty();
                    if (boundsql.hasadditionalparameter(propertyname)) {
                        value = boundsql.getadditionalparameter(propertyname);
                    } else if (parameterobject == null) {
                        value = null;
                    } else if (typehandlerregistry.hastypehandler(parameterobject.getclass())) {
                        value = parameterobject;
                    } else {
                        // 为用户传入的参数 parameterobject 创建元信息对象
                        metaobject metaobject = configuration.newmetaobject(parameterobject);
                        // 从用户传入的参数中获取 propertyname 对应的值
                        value = metaobject.getvalue(propertyname);
                    }

                    typehandler typehandler = parametermapping.gettypehandler();
                    jdbctype jdbctype = parametermapping.getjdbctype();
                    if (value == null && jdbctype == null) {
                        jdbctype = configuration.getjdbctypefornull();
                    }
                    try {
                        // 由类型处理器 typehandler 向 parameterhandler 设置参数
                        typehandler.setparameter(ps, i + 1, value, jdbctype);
                    } catch (typeexception e) {
                        throw new typeexception(...);
                    } catch (sqlexception e) {
                        throw new typeexception(...);
                    }
                }
            }
        }
    }
}

首先从boundsql中获取parametermappings 集合,这块大家可以看看我前面的文章,然后遍历获取 parametermapping中的propertyname ,如#{name} 中的name,然后从运行时参数parameterobject中获取name对应的参数值,最后设置到preparedstatement 中,我们主要来看是如何设置参数的。也就是

typehandler.setparameter(ps, i + 1, value, jdbctype);,这句代码最终会向我们例子中一样执行,如下

public void setnonnullparameter(preparedstatement ps, int i, string parameter, jdbctype jdbctype) throws sqlexception {
    ps.setstring(i, parameter);
}

还记得我们的preparedstatement是什么吗?是serverpreparedstatement,那我们就来看看serverpreparedstatement的setstring方法

public void setstring(int parameterindex, string x) throws sqlexception {
    this.checkclosed();
    if (x == null) {
        this.setnull(parameterindex, 1);
    } else {
        //根据参数下标从parameterbindings数组总获取bindvalue
        serverpreparedstatement.bindvalue binding = this.getbinding(parameterindex, false);
        this.settype(binding, this.stringtypecode);
        //设置参数值
        binding.value = x;
        binding.isnull = false;
        binding.islongdata = false;
    }

}

protected serverpreparedstatement.bindvalue getbinding(int parameterindex, boolean forlongdata) throws sqlexception {
    this.checkclosed();
    if (this.parameterbindings.length == 0) {
        throw sqlerror.createsqlexception(messages.getstring("serverpreparedstatement.8"), "s1009", this.getexceptioninterceptor());
    } else {
        --parameterindex;
        if (parameterindex >= 0 && parameterindex < this.parameterbindings.length) {
            if (this.parameterbindings[parameterindex] == null) {
                this.parameterbindings[parameterindex] = new serverpreparedstatement.bindvalue();
            } else if (this.parameterbindings[parameterindex].islongdata && !forlongdata) {
                this.detectedlongparameterswitch = true;
            }

            this.parameterbindings[parameterindex].isset = true;
            this.parameterbindings[parameterindex].boundbeforeexecutionnum = (long)this.numberofexecutions;
            //根据参数下标从parameterbindings数组总获取bindvalue
            return this.parameterbindings[parameterindex];
        } else {
            throw sqlerror.createsqlexception(messages.getstring("serverpreparedstatement.9") + (parameterindex + 1) + messages.getstring("serverpreparedstatement.10") + this.parameterbindings.length, "s1009", this.getexceptioninterceptor());
        }
    }
}
就是根据参数下标从serverpreparedstatement的参数数组parameterbindings中获取bindvalue对象,然后设置值,好了现在serverpreparedstatement包含了预编译sql语句的id和参数数组,最后一步便是执行sql了。

执行查询

执行查询操作就是我们文章开头的最后一行代码,如下

return handler.<e>query(stmt, resulthandler);

我们来看看query是怎么做的

public <e> list<e> query(statement statement, resulthandler resulthandler) throws sqlexception {
    preparedstatement ps = (preparedstatement)statement;
    //直接执行serverpreparedstatement的execute方法
    ps.execute();
    return this.resultsethandler.handleresultsets(ps);
}

public boolean execute() throws sqlexception {
    this.checkclosed();
    connectionimpl locallyscopedconn = this.connection;
    if (!this.checkreadonlysafestatement()) {
        throw sqlerror.createsqlexception(messages.getstring("preparedstatement.20") + messages.getstring("preparedstatement.21"), "s1009", this.getexceptioninterceptor());
    } else {
        resultsetinternalmethods rs = null;
        cachedresultsetmetadata cachedmetadata = null;
        synchronized(locallyscopedconn.getmutex()) {
            //略....
            rs = this.executeinternal(rowlimit, sendpacket, dostreaming, this.firstcharofstmt == 's', metadatafromcache, false);
            //略....
        }

        return rs != null && rs.reallyresult();
    }
}

省略了很多代码,只看最关键的executeinternal

serverpreparedstatement

protected resultsetinternalmethods executeinternal(int maxrowstoretrieve, buffer sendpacket, boolean createstreamingresultset, boolean queryisselectonly, field[] metadatafromcache, boolean isbatch) throws sqlexception {
    try {
        return this.serverexecute(maxrowstoretrieve, createstreamingresultset, metadatafromcache);
    } catch (sqlexception var11) {
        throw sqlex;
    } 
}

private resultsetinternalmethods serverexecute(int maxrowstoretrieve, boolean createstreamingresultset, field[] metadatafromcache) throws sqlexception {
    synchronized(this.connection.getmutex()) {
        //略....
        mysqlio mysql = this.connection.getio();
        buffer packet = mysql.getsharedsendpacket();
        packet.clear();
        packet.writebyte((byte)mysqldefs.com_execute);
        //将该语句对应的id写入数据包
        packet.writelong(this.serverstatementid);

        int i;
        //将对应的参数写入数据包
        for(i = 0; i < this.parametercount; ++i) {
            if (!this.parameterbindings[i].islongdata) {
                if (!this.parameterbindings[i].isnull) {
                    this.storebinding(packet, this.parameterbindings[i], mysql);
                } else {
                    nullbitsbuffer[i / 8] = (byte)(nullbitsbuffer[i / 8] | 1 << (i & 7));
                }
            }
        }
        //发送数据包,表示执行id对应的预编译sql
        buffer resultpacket = mysql.sendcommand(mysqldefs.com_execute, (string)null, packet, false, (string)null, 0);
        //略....
        resultsetimpl rs = mysql.readallresults(this,  this.resultsettype,  resultpacket, true, (long)this.fieldcount, metadatafromcache);
        //返回结果
        return rs;
    }
}

serverpreparedstatement在记录下serverstatementid后,对于相同sql模板的操作,每次只是发送serverstatementid和对应的参数,省去了编译sql的过程。 至此我们的已经从数据库拿到了查询结果,但是结果是resultsetimpl类型,我们还需要将返回结果转化成我们的java对象呢,留在下一篇来讲吧