
Introduction to MyBatis Executors (Part 1)

5       Introduction to MyBatis Executors (Part 1)

 

Contents

 

5       Introduction to MyBatis Executors (Part 1)

5.1        SimpleExecutor

5.2        ReuseExecutor

5.3        BatchExecutor

5.4        Choosing an Executor

5.4.1         The Default Executor

 

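       In MyBatis, every Mapper statement is executed through an Executor. Executor is one of MyBatis's core interfaces, and its definition is shown below. From its methods we can see that insert, update, and delete statements go through the update method of the Executor interface, while queries go through the query method. Although the implementations of the Executor interface include BaseExecutor and CachingExecutor, and BaseExecutor in turn has the subclasses SimpleExecutor, ReuseExecutor, and BatchExecutor, BaseExecutor is an abstract class: it implements only the shared plumbing and leaves the real core work to its subclasses through abstract methods such as doUpdate() and doQuery(). CachingExecutor merely adds caching on top of an Executor and still delegates to an underlying Executor, so the only executors that actually do the work are SimpleExecutor, ReuseExecutor, and BatchExecutor. Each of them implements the core Executor functionality itself without relying on any other Executor implementation, and because their implementations differ, so do their capabilities. An Executor is bound to a SqlSession: each SqlSession has its own new Executor object, created by the Configuration.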

public interface Executor {

  ResultHandler NO_RESULT_HANDLER = null;

  int update(MappedStatement ms, Object parameter) throws SQLException;

  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;

  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;

  List<BatchResult> flushStatements() throws SQLException;

  void commit(boolean required) throws SQLException;

  void rollback(boolean required) throws SQLException;

  CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);

  boolean isCached(MappedStatement ms, CacheKey key);

  void clearLocalCache();

  void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);

  Transaction getTransaction();

  void close(boolean forceRollback);

  boolean isClosed();

  void setExecutorWrapper(Executor executor);

}
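The remark that CachingExecutor only adds caching and still delegates to an underlying Executor describes a decorator. The following is a minimal sketch of that idea using only the JDK; MiniExecutor, CountingExecutor, and CachingDecorator are hypothetical stand-ins invented for illustration, not MyBatis classes, and the real CachingExecutor is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, stripped-down stand-in for the Executor interface: one query method only. */
interface MiniExecutor {
  String query(String sql);
}

/** Plays the role of a "real" executor that does the work itself. */
class CountingExecutor implements MiniExecutor {
  int databaseHits = 0;

  @Override
  public String query(String sql) {
    databaseHits++; // every call that reaches this class counts as a database round trip
    return "rows for: " + sql;
  }
}

/** Plays the role of a caching wrapper: it adds a cache and delegates on a miss. */
class CachingDecorator implements MiniExecutor {
  private final MiniExecutor delegate;
  private final Map<String, String> cache = new HashMap<>();

  CachingDecorator(MiniExecutor delegate) {
    this.delegate = delegate;
  }

  @Override
  public String query(String sql) {
    // computeIfAbsent calls the delegate only when the key is not cached yet.
    return cache.computeIfAbsent(sql, delegate::query);
  }
}

class DecoratorSketch {
  public static void main(String[] args) {
    CountingExecutor real = new CountingExecutor();
    MiniExecutor executor = new CachingDecorator(real);
    executor.query("select * from t");
    executor.query("select * from t"); // served from the decorator's cache
    System.out.println("database hits: " + real.databaseHits); // prints "database hits: 1"
  }
}
```

The wrapper never executes SQL itself, which is why the article treats only the three BaseExecutor subclasses as the executors that really do the work.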

 

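       Below is the source of BaseExecutor. It shows how BaseExecutor implements the Executor interface and which methods it leaves open for its subclasses to implement.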

public abstract class BaseExecutor implements Executor {

  private static final Log log = LogFactory.getLog(BaseExecutor.class);

  protected Transaction transaction;
  protected Executor wrapper;

  protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
  protected PerpetualCache localCache;
  protected PerpetualCache localOutputParameterCache;
  protected Configuration configuration;

  protected int queryStack = 0;
  private boolean closed;

  protected BaseExecutor(Configuration configuration, Transaction transaction) {
    this.transaction = transaction;
    this.deferredLoads = new ConcurrentLinkedQueue<DeferredLoad>();
    this.localCache = new PerpetualCache("LocalCache");
    this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
    this.closed = false;
    this.configuration = configuration;
    this.wrapper = this;
  }

  @Override
  public Transaction getTransaction() {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    return transaction;
  }

  @Override
  public void close(boolean forceRollback) {
    try {
      try {
        rollback(forceRollback);
      } finally {
        if (transaction != null) {
          transaction.close();
        }
      }
    } catch (SQLException e) {
      // Ignore.  There's nothing that can be done at this point.
      log.warn("Unexpected exception on closing transaction.  Cause: " + e);
    } finally {
      transaction = null;
      deferredLoads = null;
      localCache = null;
      localOutputParameterCache = null;
      closed = true;
    }
  }

  @Override
  public boolean isClosed() {
    return closed;
  }

  @Override
  public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    clearLocalCache();
    return doUpdate(ms, parameter);
  }

  @Override
  public List<BatchResult> flushStatements() throws SQLException {
    return flushStatements(false);
  }

  public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    return doFlushStatements(isRollBack);
  }

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameter);
    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
  }

  @SuppressWarnings("unchecked")
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
      clearLocalCache();
    }
    List<E> list;
    try {
      queryStack++;
      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    } finally {
      queryStack--;
    }
    if (queryStack == 0) {
      for (DeferredLoad deferredLoad : deferredLoads) {
        deferredLoad.load();
      }
      // issue #601
      deferredLoads.clear();
      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        // issue #482
        clearLocalCache();
      }
    }
    return list;
  }

  @Override
  public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType);
    if (deferredLoad.canLoad()) {
      deferredLoad.load();
    } else {
      deferredLoads.add(new DeferredLoad(resultObject, property, key, localCache, configuration, targetType));
    }
  }

  @Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    CacheKey cacheKey = new CacheKey();
    cacheKey.update(ms.getId());
    cacheKey.update(Integer.valueOf(rowBounds.getOffset()));
    cacheKey.update(Integer.valueOf(rowBounds.getLimit()));
    cacheKey.update(boundSql.getSql());
    List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
    TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
    // mimic DefaultParameterHandler logic
    for (int i = 0; i < parameterMappings.size(); i++) {
      ParameterMapping parameterMapping = parameterMappings.get(i);
      if (parameterMapping.getMode() != ParameterMode.OUT) {
        Object value;
        String propertyName = parameterMapping.getProperty();
        if (boundSql.hasAdditionalParameter(propertyName)) {
          value = boundSql.getAdditionalParameter(propertyName);
        } else if (parameterObject == null) {
          value = null;
        } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
          value = parameterObject;
        } else {
          MetaObject metaObject = configuration.newMetaObject(parameterObject);
          value = metaObject.getValue(propertyName);
        }
        cacheKey.update(value);
      }
    }
    if (configuration.getEnvironment() != null) {
      // issue #176
      cacheKey.update(configuration.getEnvironment().getId());
    }
    return cacheKey;
  }

  @Override
  public boolean isCached(MappedStatement ms, CacheKey key) {
    return localCache.getObject(key) != null;
  }

  @Override
  public void commit(boolean required) throws SQLException {
    if (closed) {
      throw new ExecutorException("Cannot commit, transaction is already closed");
    }
    clearLocalCache();
    flushStatements();
    if (required) {
      transaction.commit();
    }
  }

  @Override
  public void rollback(boolean required) throws SQLException {
    if (!closed) {
      try {
        clearLocalCache();
        flushStatements(true);
      } finally {
        if (required) {
          transaction.rollback();
        }
      }
    }
  }

  @Override
  public void clearLocalCache() {
    if (!closed) {
      localCache.clear();
      localOutputParameterCache.clear();
    }
  }

  protected abstract int doUpdate(MappedStatement ms, Object parameter)
      throws SQLException;

  protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
      throws SQLException;

  protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
      throws SQLException;

  protected void closeStatement(Statement statement) {
    if (statement != null) {
      try {
        statement.close();
      } catch (SQLException e) {
        // ignore
      }
    }
  }

  private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
    if (ms.getStatementType() == StatementType.CALLABLE) {
      final Object cachedParameter = localOutputParameterCache.getObject(key);
      if (cachedParameter != null && parameter != null) {
        final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);
        final MetaObject metaParameter = configuration.newMetaObject(parameter);
        for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
          if (parameterMapping.getMode() != ParameterMode.IN) {
            final String parameterName = parameterMapping.getProperty();
            final Object cachedValue = metaCachedParameter.getValue(parameterName);
            metaParameter.setValue(parameterName, cachedValue);
          }
        }
      }
    }
  }

  private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
      list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
      localCache.removeObject(key);
    }
    localCache.putObject(key, list);
    if (ms.getStatementType() == StatementType.CALLABLE) {
      localOutputParameterCache.putObject(key, parameter);
    }
    return list;
  }

  protected Connection getConnection(Log statementLog) throws SQLException {
    Connection connection = transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
      return ConnectionLogger.newInstance(connection, statementLog, queryStack);
    } else {
      return connection;
    }
  }

  @Override
  public void setExecutorWrapper(Executor wrapper) {
    this.wrapper = wrapper;
  }

  private static class DeferredLoad {

    private final MetaObject resultObject;
    private final String property;
    private final Class<?> targetType;
    private final CacheKey key;
    private final PerpetualCache localCache;
    private final ObjectFactory objectFactory;
    private final ResultExtractor resultExtractor;

    // issue #781
    public DeferredLoad(MetaObject resultObject,
                        String property,
                        CacheKey key,
                        PerpetualCache localCache,
                        Configuration configuration,
                        Class<?> targetType) {
      this.resultObject = resultObject;
      this.property = property;
      this.key = key;
      this.localCache = localCache;
      this.objectFactory = configuration.getObjectFactory();
      this.resultExtractor = new ResultExtractor(configuration, objectFactory);
      this.targetType = targetType;
    }

    public boolean canLoad() {
      return localCache.getObject(key) != null && localCache.getObject(key) != EXECUTION_PLACEHOLDER;
    }

    public void load() {
      @SuppressWarnings( "unchecked" )
      // we suppose we get back a List
      List<Object> list = (List<Object>) localCache.getObject(key);
      Object value = resultExtractor.extractObjectFromList(list, targetType);
      resultObject.setValue(property, value);
    }

  }

}
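The structure of BaseExecutor is a template method: the public update() and query() methods handle the shared concerns (the local cache in particular) and hand the real work to doUpdate() and doQuery(). The sketch below reduces that idea to a few lines of plain Java. TemplateExecutorSketch and RecordingExecutor are hypothetical names, the cache is keyed by the SQL string alone rather than by a CacheKey, and closed-state checks, deferred loading, and output parameters are all left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the BaseExecutor idea: shared logic here, core work in subclasses. */
abstract class TemplateExecutorSketch {
  // Stand-in for the session-level local cache, keyed by SQL text only (an assumption).
  private final Map<String, List<String>> localCache = new HashMap<>();

  public final int update(String sql) {
    localCache.clear(); // any write invalidates the local cache before delegating
    return doUpdate(sql);
  }

  public final List<String> query(String sql) {
    List<String> cached = localCache.get(sql);
    if (cached != null) {
      return cached; // local cache hit, the subclass is not called
    }
    List<String> rows = doQuery(sql);
    localCache.put(sql, rows);
    return rows;
  }

  protected abstract int doUpdate(String sql);

  protected abstract List<String> doQuery(String sql);
}

/** A subclass that only records which statements actually reached it. */
class RecordingExecutor extends TemplateExecutorSketch {
  final List<String> executed = new ArrayList<>();

  @Override
  protected int doUpdate(String sql) {
    executed.add(sql);
    return 1;
  }

  @Override
  protected List<String> doQuery(String sql) {
    executed.add(sql);
    List<String> rows = new ArrayList<>();
    rows.add("row for " + sql);
    return rows;
  }
}

class TemplateDemo {
  public static void main(String[] args) {
    RecordingExecutor executor = new RecordingExecutor();
    executor.query("select 1");
    executor.query("select 1");            // answered from the local cache
    executor.update("update t set x = 1"); // clears the local cache
    executor.query("select 1");            // reaches doQuery again
    System.out.println(executor.executed.size()); // prints 3
  }
}
```

Because the cache handling lives in the base class, every subclass gets the same behaviour: a repeated query within one session is served from memory until an update clears it.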

 

5.1     SimpleExecutor

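       SimpleExecutor is the Executor that MyBatis uses by default when executing Mapper statements. It provides the most basic statement execution without much extra wrapping. The source of SimpleExecutor is as follows.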

public class SimpleExecutor extends BaseExecutor {

  public SimpleExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.update(stmt);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    return Collections.emptyList();
  }

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    Connection connection = getConnection(statementLog);
    stmt = handler.prepare(connection);
    handler.parameterize(stmt);
    return stmt;
  }

}
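What stands out in the SimpleExecutor source is the lifecycle of each Statement: it is prepared for a single call and always closed in a finally block, so nothing is carried over to the next call. The following sketch models that lifecycle with a hypothetical TrackedStatement class instead of a JDBC Statement, so that it runs without a database; an empty SQL string is used, purely as an assumption of this sketch, to simulate a failing execution.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a JDBC Statement that remembers whether it was closed. */
class TrackedStatement {
  final String sql;
  boolean closed = false;

  TrackedStatement(String sql) {
    this.sql = sql;
  }

  int execute() {
    if (sql.isEmpty()) {
      throw new IllegalStateException("empty SQL"); // simulated execution failure
    }
    return 1; // pretend one row was affected
  }

  void close() {
    closed = true;
  }
}

/** Models the prepare, execute, close-in-finally flow seen in doUpdate(). */
class SimpleLifecycleSketch {
  final List<TrackedStatement> created = new ArrayList<>();

  int update(String sql) {
    TrackedStatement stmt = null;
    try {
      stmt = new TrackedStatement(sql); // a fresh statement for every call
      created.add(stmt);
      return stmt.execute();
    } finally {
      if (stmt != null) {
        stmt.close(); // closed whether execute() returned or threw
      }
    }
  }

  public static void main(String[] args) {
    SimpleLifecycleSketch executor = new SimpleLifecycleSketch();
    executor.update("update t set x = 1");
    executor.update("update t set x = 1"); // same SQL, but a second statement is created
    System.out.println("statements created: " + executor.created.size()); // prints "statements created: 2"
  }
}
```

Running the same SQL twice creates two statements here, which is exactly the cost that the next executor tries to avoid.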

 

5.2     ReuseExecutor

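       ReuseExecutor, as the name suggests, is an Executor that reuses things. What it reuses are Statement objects: internally it keeps a Map that caches every Statement it creates. Each time it executes a SQL statement, it checks whether a Statement cached for that SQL already exists. If one exists and the Connection of the cached Statement has not been closed, it keeps using that Statement; otherwise it creates a new Statement and caches it. Because every new SqlSession has its own new Executor object, the Statements cached in a ReuseExecutor are scoped to a single SqlSession. Note also that doFlushStatements() closes the cached Statements and clears the Map, and BaseExecutor calls it on commit and rollback. The source of ReuseExecutor is as follows.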

public class ReuseExecutor extends BaseExecutor {

  private final Map<String, Statement> statementMap = new HashMap<String, Statement>();

  public ReuseExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.update(stmt);
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.<E>query(stmt, resultHandler);
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    for (Statement stmt : statementMap.values()) {
      closeStatement(stmt);
    }
    statementMap.clear();
    return Collections.emptyList();
  }

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    BoundSql boundSql = handler.getBoundSql();
    String sql = boundSql.getSql();
    if (hasStatementFor(sql)) {
      stmt = getStatement(sql);
    } else {
      Connection connection = getConnection(statementLog);
      stmt = handler.prepare(connection);
      putStatement(sql, stmt);
    }
    handler.parameterize(stmt);
    return stmt;
  }

  private boolean hasStatementFor(String sql) {
    try {
      return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
    } catch (SQLException e) {
      return false;
    }
  }

  private Statement getStatement(String s) {
    return statementMap.get(s);
  }

  private void putStatement(String sql, Statement stmt) {
    statementMap.put(sql, stmt);
  }

}
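The reuse rule in prepareStatement() and hasStatementFor() can be tried out without a database. The sketch below keeps the same decision (reuse when a statement is cached for the SQL and its connection is still open, otherwise prepare and cache a new one) but replaces the JDBC types with the hypothetical classes FakeConnection and FakeStatement; the prepared counter is added only to make the effect visible.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a JDBC Connection. */
class FakeConnection {
  boolean closed = false;
}

/** Hypothetical stand-in for a JDBC Statement bound to a connection. */
class FakeStatement {
  final FakeConnection connection;
  final String sql;

  FakeStatement(FakeConnection connection, String sql) {
    this.connection = connection;
    this.sql = sql;
  }
}

/** Models the statement cache of ReuseExecutor, keyed by the SQL text. */
class ReuseSketch {
  private final Map<String, FakeStatement> statementMap = new HashMap<>();
  int prepared = 0; // how many statements were really created

  FakeStatement prepare(FakeConnection connection, String sql) {
    FakeStatement cached = statementMap.get(sql);
    if (cached != null && !cached.connection.closed) {
      return cached; // same SQL, connection still open: reuse
    }
    FakeStatement stmt = new FakeStatement(connection, sql);
    prepared++;
    statementMap.put(sql, stmt); // replaces a stale entry, if any
    return stmt;
  }

  /** Mirrors the effect of doFlushStatements(): the cache is emptied. */
  void flush() {
    statementMap.clear();
  }

  public static void main(String[] args) {
    ReuseSketch executor = new ReuseSketch();
    FakeConnection connection = new FakeConnection();
    executor.prepare(connection, "select * from t where id = ?");
    executor.prepare(connection, "select * from t where id = ?");
    System.out.println("prepared: " + executor.prepared); // prints "prepared: 1"
  }
}
```

The key is the SQL text with its placeholders, not the parameter values, so the same Mapper statement called with different arguments still hits the cached Statement.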

 

5.3     BatchExecutor

       BatchExecutor is designed mainly for batch update operations. Under the hood it calls Statement's executeBatch() method to perform the batch. The source of BatchExecutor is as follows.

public class BatchExecutor extends BaseExecutor {

  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;

  private final List<Statement> statementList = new ArrayList<Statement>();
  private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
  private String currentSql;
  private MappedStatement currentStatement;

  public BatchExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      handler.parameterize(stmt); //fix Issues 322
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      handler.parameterize(stmt);    //fix Issues 322
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
  // handler.parameterize(stmt);
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
      throws SQLException {
    Statement stmt = null;
    try {
      flushStatements();
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      handler.parameterize(stmt);
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
      List<BatchResult> results = new ArrayList<BatchResult>();
      if (isRollback) {
        return Collections.emptyList();
      }
      for (int i = 0, n = statementList.size(); i < n; i++) {
        Statement stmt = statementList.get(i);
        BatchResult batchResult = batchResultList.get(i);
        try {
          batchResult.setUpdateCounts(stmt.executeBatch());
          MappedStatement ms = batchResult.getMappedStatement();
          List<Object> parameterObjects = batchResult.getParameterObjects();
          KeyGenerator keyGenerator = ms.getKeyGenerator();
          if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
            Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
            jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
          } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
            for (Object parameter : parameterObjects) {
              keyGenerator.processAfter(this, ms, stmt, parameter);
            }
          }
        } catch (BatchUpdateException e) {
          StringBuilder message = new StringBuilder();
          message.append(batchResult.getMappedStatement().getId())
              .append(" (batch index #")
              .append(i + 1)
              .append(")")
              .append(" failed.");
          if (i > 0) {
            message.append(" ")
                .append(i)
                .append(" prior sub executor(s) completed successfully, but will be rolled back.");
          }
          throw new BatchExecutorException(message.toString(), e, results, batchResult);
        }
        results.add(batchResult);
      }
      return results;
    } finally {
      for (Statement stmt : statementList) {
        closeStatement(stmt);
      }
      currentSql = null;
      statementList.clear();
      batchResultList.clear();
    }
  }

}
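One detail of doUpdate() is easy to miss: a new update joins the previous batch only when its SQL matches currentSql, that is, the SQL of the immediately preceding update. The sketch below isolates that grouping rule. BatchGroupingSketch and its Batch class are hypothetical, the comparison uses the SQL text only (the real code also compares the MappedStatement), and flush() just returns the groups instead of calling executeBatch().

```java
import java.util.ArrayList;
import java.util.List;

/** Models how consecutive updates with the same SQL are collected into one batch. */
class BatchGroupingSketch {

  /** One group of parameter sets that would share a single Statement. */
  static class Batch {
    final String sql;
    final List<Object> parameters = new ArrayList<>();

    Batch(String sql) {
      this.sql = sql;
    }
  }

  private final List<Batch> batches = new ArrayList<>();
  private String currentSql;

  void update(String sql, Object parameter) {
    if (!sql.equals(currentSql)) {
      // SQL differs from the previous update: start a new batch
      batches.add(new Batch(sql));
      currentSql = sql;
    }
    batches.get(batches.size() - 1).parameters.add(parameter);
  }

  /** Returns the pending batches and resets the state, like a flush would. */
  List<Batch> flush() {
    List<Batch> result = new ArrayList<>(batches);
    batches.clear();
    currentSql = null;
    return result;
  }

  public static void main(String[] args) {
    BatchGroupingSketch executor = new BatchGroupingSketch();
    executor.update("insert into a values (?)", 1);
    executor.update("insert into a values (?)", 2);
    executor.update("update b set x = ?", 3);
    executor.update("insert into a values (?)", 4); // same SQL as the first, but not adjacent
    System.out.println("batches: " + executor.flush().size()); // prints "batches: 3"
  }
}
```

Under this rule, interleaving different statements produces many small batches, so grouping identical statements together before calling them tends to give the batch executor more to work with.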
 

 

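5.4     Choosing an Executor

       Given that BaseExecutor has the subclasses SimpleExecutor, ReuseExecutor, and BatchExecutor, and that Executor also has a CachingExecutor implementation, how do we choose which Executor to use? By default the MyBatis global setting cacheEnabled is "true", which means that by default a CachingExecutor wraps the Executor we actually use; this will be covered in a later article on MyBatis caching. So how is the BaseExecutor we actually use determined? It is determined when we create the SqlSession. Every SqlSession is created through SqlSessionFactory's openSession(), and SqlSessionFactory provides a family of openSession() methods.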

public interface SqlSessionFactory {

  SqlSession openSession();

  SqlSession openSession(boolean autoCommit);
  SqlSession openSession(Connection connection);
  SqlSession openSession(TransactionIsolationLevel level);

  SqlSession openSession(ExecutorType execType);
  SqlSession openSession(ExecutorType execType, boolean autoCommit);
  SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level);
  SqlSession openSession(ExecutorType execType, Connection connection);

  Configuration getConfiguration();

}

 

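       Judging from the methods above, SqlSessionFactory offers two kinds of methods for creating a SqlSession: those that do not specify an ExecutorType and those that do. Clearly, when an ExecutorType is specified, the Executor of the corresponding type is used. ExecutorType is an enum with three values: SIMPLE, REUSE, and BATCH.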
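The selection rules in this section can be summarised as a small decision function: use the requested type if there is one, otherwise the configured default, otherwise SIMPLE, and wrap the result in a caching executor when caching is enabled. The sketch below is a simplified model of those rules, not MyBatis code. ExecType and ExecutorChoiceSketch are hypothetical names, and the method returns a descriptive string instead of a real Executor object.

```java
/** Hypothetical mirror of the three executor types named in the article. */
enum ExecType { SIMPLE, REUSE, BATCH }

class ExecutorChoiceSketch {

  /**
   * Describes which executor would be used.
   *
   * @param requested         type passed when opening the session, or null if none was given
   * @param configuredDefault value of the default executor type setting, or null if unset
   * @param cacheEnabled      whether the global cache switch is on
   */
  static String newExecutor(ExecType requested, ExecType configuredDefault, boolean cacheEnabled) {
    ExecType type = requested != null ? requested : configuredDefault;
    if (type == null) {
      type = ExecType.SIMPLE; // nothing requested and nothing configured
    }
    String executor;
    switch (type) {
      case BATCH:
        executor = "BatchExecutor";
        break;
      case REUSE:
        executor = "ReuseExecutor";
        break;
      default:
        executor = "SimpleExecutor";
    }
    // With caching on, the chosen executor is wrapped rather than replaced.
    return cacheEnabled ? "CachingExecutor(" + executor + ")" : executor;
  }

  public static void main(String[] args) {
    System.out.println(newExecutor(null, null, true));           // prints "CachingExecutor(SimpleExecutor)"
    System.out.println(newExecutor(ExecType.BATCH, null, true)); // prints "CachingExecutor(BatchExecutor)"
    System.out.println(newExecutor(null, ExecType.REUSE, false)); // prints "ReuseExecutor"
  }
}
```

An explicitly requested type always wins over the configured default, and the caching wrapper is independent of which of the three types was chosen.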

 

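5.4.1  The Default Executor

       When no ExecutorType is specified, the default Executor is used. MyBatis's default Executor is SimpleExecutor, and it can be changed through the MyBatis global setting defaultExecutorType, whose possible values are likewise SIMPLE, REUSE, and BATCH.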

      <setting name="defaultExecutorType" value="SIMPLE"/>

 

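       Note that when MyBatis is integrated with Spring, the Mapper objects generated by Spring's scanning all use a SqlSession backed by the default Executor. If we need a non-default Executor in our program, we can declare a SqlSessionFactoryBean in Spring's bean container, inject the SqlSessionFactory into the class that needs a specific Executor, and use the SqlSessionFactory to create a SqlSession with the desired ExecutorType.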

 

 

References

       MyBatis source code

 

(Note: this article is based on MyBatis 3.3.1 and was written on Saturday, December 24, 2016.)

 

 
