《ShardingSphere解读》17 执行引擎:分片环境下 SQL 执行的整体流程应该如何进行抽象?
从今天开始我们将开始一个全新的主题即 ShardingSphere 的执行引擎ExecuteEngine。一旦我们获取了从路由引擎和改写引擎中所生成的 SQL执行引擎就会完成这些SQL在具体数据库中的执行。执行引擎是 ShardingSphere 的核心模块接下来我们将来对其进行全面介绍。今天我们先讨论在分片环境下ShardingSphere 对 SQL 执行的整体流程的抽象过程后面会向你讲解“如何把握 ShardingSphere 中的 Executor 执行模型”。ShardingSphere 执行引擎总体结构在讲解具体的源代码之前我们从《路由引擎如何理解分片路由核心类 ShardingRouter 的运作机制》中的 PreparedQueryShardingEngine 和 SimpleQueryShardingEngine 这两个类出发看看在 ShardingSphere 中使用它们的入口。我们在ShardingStatement类中找到了如下所示的一个 shard 方法这里用到了 SimpleQueryShardingEngineprivate void shard(final String sql) { //从 Connection 中获取 ShardingRuntimeContext 上下文 ShardingRuntimeContext runtimeContext connection.getRuntimeContext(); //创建 SimpleQueryShardingEngine SimpleQueryShardingEngine shardingEngine new SimpleQueryShardingEngine(runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine()); //执行分片路由并获取路由结果 sqlRouteResult shardingEngine.shard(sql, Collections.emptyList()); }而在ShardingPreparedStatement中也存在一个类似的 shard 方法。从设计模式上讲ShardingStatement 和 ShardingPreparedStatement 实际上就是很典型的外观类它们把与 SQL 路由和执行的入口类都整合在一起。通过阅读源码我们不难发现在 ShardingStatement 中存在一个 StatementExecutor而在 ShardingPreparedStatement 中也存在 PreparedStatementExecutor 和 BatchPreparedStatementExecutor这些类都以 Executor 结尾显然这就是我们要找的 SQL 执行引擎的入口类。我们发现上述三个 Executor 都位于 sharding-jdbc-core 工程中。此外还有一个与 sharding-core-route 和 sharding-core-rewrite 并列的sharding-core-execute 工程从命名上看这个工程应该也与执行引擎相关。果然我们在这个工程中找到了ShardingExecuteEngine 类这是分片执行引擎的入口类。然后我们又分别找到了 SQLExecuteTemplate 和 SQLExecutePrepareTemplate 类这两个是典型的SQL 执行模板类。根据到目前为止对 ShardingSphere 组件设计和代码分层风格的了解可以想象在层次关系上ShardingExecuteEngine 是底层对象SQLExecuteTemplate 应该依赖于 ShardingExecuteEngine而 StatementExecutor、PreparedStatementExecutor 和 BatchPreparedStatementExecutor 属于上层对象应该依赖于 SQLExecuteTemplate。我们通过简单阅读这些核心类之前的引用关系印证了这种猜想。基于以上分析我们可以给出 SQL 执行引擎的整体结构图如下图其中横线以上部分位于 sharding-core-execute 工程属于底层组件而直线以下部分位于 sharding-jdbc-core 中属于上层组件。ShardingSphere 执行引擎核心类的分层结构图另一方面我们在上图中还看到 SQLExecuteCallback 和 SQLExecutePrepareCallback显然它们的作用是完成 SQL 执行过程中的回调处理这也是一种非常典型的扩展性处理方式。ShardingExecuteEngine按照惯例我们还是从位于底层的 ShardingExecuteEngine 开始切入。与路由和改写引擎不同ShardingExecuteEngine 是 ShardingSphere 中唯一的一个执行引擎所以直接设计为一个类而非接口这个类包含了如下的变量和构造函数private final ShardingExecutorService shardingExecutorService; private ListeningExecutorService executorService; public ShardingExecuteEngine(final int executorSize) { shardingExecutorService new ShardingExecutorService(executorSize); executorService shardingExecutorService.getExecutorService(); }1.ExecutorService如上所示我们可以看出这里有两个以 ExecutorService 结尾的变量显然从命名上不难看出它们都是执行器服务与 JDK 中的 java.util.concurrent.ExecutorService 类似。其中ListeningExecutorService来自 Google 的工具包 Guava而ShardingExecutorService是 ShardingSphere 中的自定义类包含了 ListeningExecutorService 的构建过程。接下来我们对两者分别展开讲述。ShardingExecutorService我们发现 ShardingExecutorService 包含了一个 JDK 的 ExecutorService它的创建过程如下这里用到的 newCachedThreadPool 和 newFixedThreadPool 都是 JDK 提供的常见方法private ExecutorService getExecutorService(final int executorSize, final String nameFormat) { ThreadFactory shardingThreadFactory ShardingThreadFactoryBuilder.build(nameFormat); return 0 executorSize ? Executors.newCachedThreadPool(shardingThreadFactory) : Executors.newFixedThreadPool(executorSize, shardingThreadFactory); }ListeningExecutorService由于 JDK 中普通线程池返回的 Future 功能比较单一所以 Guava 提供了 ListeningExecutorService 对其进行装饰。我们可以通过 ListeningExecutorService 对 ExecutorService 做一层包装返回一个 ListenableFuture 实例而 ListenableFuture 又是继承自 Future扩展了一个 addListener 监听方法这样当任务执行完成就会主动回调该方法。ListeningExecutorService 的构建过程如下所示executorService MoreExecutors.listeningDecorator(getExecutorService(executorSize, nameFormat)); oreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);明确了执行器 ExecutorService 之后我们回到 ShardingExecuteEngine 类该类以 groupExecute 方法为入口这个方法参数比较多也单独都列了一下/** * param inputGroups输入组 * param firstCallback第一次分片执行回调 * param callback分片执行回调 * param serial是否使用多线程进行执行 * param 输入值类型 * param 返回值类型 * return 执行结果 * throws SQLException抛出异常 */ public List groupExecute( final Collection inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback, final boolean serial) throws SQLException { if (inputGroups.isEmpty()) { return Collections.emptyList(); } return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback); }这里的分片执行组 ShardingExecuteGroup 对象实际上就是一个包含输入信息的列表而上述 groupExecute 方法的输入是一个 ShardingExecuteGroup 的集合。通过判断输入参数 serial 是否为 true上述代码流程分别转向了serialExecute 和 parallelExecute 这两个代码分支接下来我来分别讲解一下这两个代码分支。2.SerialExecute我们先来看 serialExecute 方法顾名思义该方法用于串行执行的场景private List serialExecute(final Collection inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback) throws SQLException { Iterator inputGroupsIterator inputGroups.iterator(); //获取第一个输入的ShardingExecuteGroup ShardingExecuteGroup firstInputs inputGroupsIterator.next(); //通过第一个回调 firstCallback 完成同步执行的 syncGroupExecute List result new LinkedList(syncGroupExecute(firstInputs, null firstCallback ? callback : firstCallback)); //对剩下的 ShardingExecuteGroup通过回调 callback 逐个同步执行 syncGroupExecute for (ShardingExecuteGroup each : Lists.newArrayList(inputGroupsIterator)) { result.addAll(syncGroupExecute(each, callback)); } return result; }上述代码的基本流程是获取第一个输入的 ShardingExecuteGroup通过第一个回调 firstCallback 完成同步执行的 syncGroupExecute 方法。然后对剩下的 ShardingExecuteGroup通过回调 callback 逐个执行 syncGroupExecute 方法。这里的 syncGroupExecute 方法如下所示private Collection syncGroupExecute(final ShardingExecuteGroup executeGroup, final ShardingGroupExecuteCallback callback) throws SQLException { return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap()); }我们看到同步执行的过程实际上是交给了 ShardingGroupExecuteCallback 回调接口public interface ShardingGroupExecuteCallback { Collection execute(Collection inputs, boolean isTrunkThread, Map shardingExecuteDataMap) throws SQLException; }这里的 ShardingExecuteDataMap 相当于一个用于 SQL 执行的数据字典这些数据字典保存在 ThreadLocal 中从而确保了线程安全。我们可以根据当前的执行线程获取对应的 DataMap 对象。3.ParallelExecute这样关于串行执行的流程就介绍完了接下来我们来看并行执行的 parallelExecute 方法private List parallelExecute(final Collection inputGroups, final ShardingGroupExecuteCallback firstCallback, final ShardingGroupExecuteCallback callback) throws SQLException { Iterator inputGroupsIterator inputGroups.iterator(); //获取第一个输入的 ShardingExecuteGroup ShardingExecuteGroup firstInputs inputGroupsIterator.next(); //通过 asyncGroupExecute 执行异步回调 Collection restResultFutures asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback); //获取执行结果并组装返回 return getGroupResults(syncGroupExecute(firstInputs, null firstCallback ? callback : firstCallback), restResultFutures); }注意到这里有一个异步执行方法 asyncGroupExecute传入参数是一个 ShardingExecuteGroup 列表private Collection asyncGroupExecute(final List inputGroups, final ShardingGroupExecuteCallback callback) { Collection result new LinkedList(); for (ShardingExecuteGroup each : inputGroups) { result.add(asyncGroupExecute(each, callback)); } return result; }这个方法中针对每个传入的 ShardingExecuteGroup再次调用一个重载的异步 asyncGroupExecute 方法private ListenableFuture asyncGroupExecute(final ShardingExecuteGroup inputGroup, final ShardingGroupExecuteCallback callback) { final Map dataMap ShardingExecuteDataMap.getDataMap(); return executorService.submit(new Callable() { Override public Collection call() throws SQLException { return callback.execute(inputGroup.getInputs(), false, dataMap); } }); }显然作为异步执行方法这里就会使用 Guava 的 ListeningExecutorService 来提交一个异步执行的任务并返回一个 ListenableFuture而这个异步执行的任务就是具体的回调。最后我们来看 parallelExecute 方法的最后一句即调用 getGroupResults 方法获取执行结果private List getGroupResults(final Collection firstResults, final Collection restFutures) throws SQLException { List result new LinkedList(firstResults); for (ListenableFuture each : restFutures) { try { result.addAll(each.get()); } catch (final InterruptedException | ExecutionException ex) { return throwException(ex); } } return result; }熟悉 Future 用法的同学对上述代码应该不会陌生我们遍历 ListenableFuture然后调动它的 get 方法同步等待返回结果最后当所有的结果都获取到之后组装成一个结果列表并返回这种写法在使用 Future 时非常常见。我们回过头来看无论是 serialExecute 方法还是 parallelExecute 方法都会从 ShardingExecuteGroup 中获取第一个 firstInputs 元素并进行执行然后剩下的再进行同步或异步执行。ShardingSphere 这样使用线程的背后有其独特的设计思路。考虑到当前线程同样也是一种可用资源让第一个任务由当前线程进行执行就可以充分利用当前线程从而最大化线程的利用率。至此关于 ShardingExecuteEngine 类的介绍就告一段落。作为执行引擎ShardingExecuteEngine 所做的事情就是提供一个多线程的执行环境。在系统设计上这也是在日常开发过程中可以参考的一个技巧。我们可以设计并实现一个多线程执行环境这个环境不需要完成具体的业务操作而只需要负责执行传入的回调函数。ShardingSphere 中的ShardingExecuteEngine 就是提供了这样一种环境同样的实现方式在其他诸如 Spring 等开源框架中也都可以看到。接下来就让我们来看一下 ShardingSphere 如何通过回调完成 SQL 的真正执行。回调接口 ShardingGroupExecuteCallback回调接口 ShardingGroupExecuteCallback 的定义非常简单public interface ShardingGroupExecuteCallback { Collection execute(Collection inputs, boolean isTrunkThread, Map shardingExecuteDataMap) throws SQLException; }该接口根据传入的泛型 inputs 集合和 shardingExecuteDataMap 完成真正的 SQL 执行操作。在 ShardingSphere 中使用匿名方法实现 ShardingGroupExecuteCallback 接口的地方有很多但显式实现这一接口的只有一个类即 SQLExecuteCallback 类这是一个抽象类它的 execute 方法如下所示Override public final Collection execute(final Collection statementExecuteUnits, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException { Collection result new LinkedList(); for (StatementExecuteUnit each : statementExecuteUnits) { result.add(execute0(each, isTrunkThread, shardingExecuteDataMap)); } return result; }对于每个输入的 StatementExecuteUnit 数据结构上述 execute 方法会进一步执行一个 execute0 方法如下所示private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map shardingExecuteDataMap) throws SQLException { //设置 ExecutorExceptionHandler ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); //获取 DataSourceMetaData这里用到了缓存机制 DataSourceMetaData dataSourceMetaData getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData()); //初始化 SQLExecutionHook SQLExecutionHook sqlExecutionHook new SPISQLExecutionHook(); try { RouteUnit routeUnit statementExecuteUnit.getRouteUnit(); //启动执行钩子 sqlExecutionHook.start(routeUnit.getDataSourceName(), routeUnit.getSqlUnit().getSql(), routeUnit.getSqlUnit().getParameters(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap); //执行 SQL T result executeSQL(routeUnit.getSqlUnit().getSql(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode()); //成功钩子 sqlExecutionHook.finishSuccess(); return result; } catch (final SQLException ex) { //失败钩子 sqlExecutionHook.finishFailure(ex); //异常处理 ExecutorExceptionHandler.handleException(ex); return null; } }这段代码每一句的含义都比较明确这里引入了一个 ExecutorExceptionHandler 用于异常处理同时也引入了一个 SPISQLExecutionHook 对执行过程嵌入钩子。关于基于 SPI 机制的 Hook 实现机制我们在前面的 SQL 解析和路由引擎中已经看到过很多次这里不再赘述。我们看到真正执行 SQL 的过程是交给 executeSQL 模板方法进行完成需要 SQLExecuteCallback 的各个子类实现这一模板方法。在 ShardingSphere 中没有提供任何的 SQLExecuteCallback 实现类但大量采用匿名方法来完成 executeSQL 模板方法的实现。例如 StatementExecutor 类中executeQuery 方法就创建了一个 SQLExecuteCallback 匿名实现方法用来完成查询操作public List executeQuery() throws SQLException { final boolean isExceptionThrown ExecutorExceptionHandler.isExceptionThrown(); //创建 SQLExecuteCallback 并执行查询 SQLExecuteCallback executeCallback new SQLExecuteCallback(getDatabaseType(), isExceptionThrown) { Override protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(sql, statement, connectionMode); } }; //执行 SQLExecuteCallback 并返回结果 return executeCallback(executeCallback); }模板类 SQLExecuteTemplate在 ShardingSphere 执行引擎的底层组件中还有一个类需要展开这就是模板类 SQLExecuteTemplate它是 ShardingExecuteEngine 的直接使用者。从命名上看这是一个典型的模板工具类定位上就像 Spring 中的 JdbcTemplate 一样。但凡这种模板工具类其实现一般都比较简单基本就是对底层对象的简单封装。SQLExecuteTemplate 也不例外它要做的就是对 ShardingExecuteEngine 中的入口方法进行封装和处理。ShardingExecuteEngine 的核心方法就只有一个即 executeGroup 方法public List executeGroup(final Collection sqlExecuteGroups, final SQLExecuteCallback firstCallback, final SQLExecuteCallback callback) throws SQLException { try { return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback, serial); } catch (final SQLException ex) { ExecutorExceptionHandler.handleException(ex); return Collections.emptyList(); } }可以看到这个方法所做的事情就是直接调用 ShardingExecuteEngine 的 groupExecute 方法完成具体的执行工作并添加了异常处理机制而已。从源码解析到日常开发我们可以从今天的内容中提炼出来许多技巧并应用于日常开发过程中。比较实用的一个技巧是我们可以使用 Guava 提供的 ListeningExecutorService 来强化 JDK 中基于普通 Future 的执行器服务 ExecutorService。同时我们也看到了基于 Callback 的系统扩展机制我们可以基于这种扩展机制构建一个独立的运行环境从而把与业务相关的所有操作通过回调得以实现。小结与预告本篇是介绍 ShardingSphere 执行引擎的第一部分内容介绍了分片环境下 SQL 执行流程的抽象过程。我们先引出了执行引擎这个核心类然后分别从执行器服务、执行回调以及执行模板类等维度对整个执行流程展开了详细讲述。最后这里给你留一道思考题在基于多线程技术实现 Executor 时ShardingSphere 应用了哪些技巧欢迎你在留言区与大家讨论我将 一 一 点评解答。下一篇我们继续介绍 ShardingSphere 的执行引擎我们将重点关注 SQL 的执行器 StatementExecutor。