数据库中间件 Sharding-JDBC 源码分析 —— 结果归并

芋道源码 2017-10-30

摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/result-merger/ 「芋道源码」欢迎转载,保留摘要,谢谢!

微信排版崩崩的,建议使用 PC 点击【阅读原文】。

本文主要基于 Sharding-JDBC 1.5.0 正式版

  • 1. 概述

  • 2. MergeEngine

    • 2.2.1 AbstractStreamResultSetMerger

    • 2.2.2 AbstractMemoryResultSetMerger

    • 2.2.3 AbstractDecoratorResultSetMerger

    • 2.1 SelectStatement#setIndexForItems()

    • 2.2 ResultSetMerger

  • 3. OrderByStreamResultSetMerger

    • 3.1 归并算法

    • 3.2 #next()

  • 4. GroupByStreamResultSetMerger

    • 4.1 AggregationUnit

    • 4.2 #next()

  • 5. GroupByMemoryResultSetMerger

    • 5.1 #next()

  • 6. IteratorStreamResultSetMerger

  • 7. LimitDecoratorResultSetMerger

  • 666. 彩蛋

🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  5. 认真的源码交流微信群。


1. 概述

本文分享查询结果归并的源码实现。

正如前文《SQL 执行》提到的“分表分库,需要执行的 SQL 数量从单条变成了多条”,多个SQL执行结果必然需要进行合并,例如:

  1. SELECT * FROM t_order ORDER BY create_time

在各分片排序完后,Sharding-JDBC 获取到结果后,仍然需要再进一步排序。目前有 分页分组排序聚合列迭代 五种场景需要做进一步处理。当然,如果单分片SQL执行结果是无需合并的。在《SQL 执行》不知不觉已经分享了插入、更新、删除操作的结果合并,所以下面我们一起看看查询结果归并的实现。


Sharding-JDBC 正在收集使用公司名单:传送门。
🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门
Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门
登记吧,骚年!传送门

2. MergeEngine

MergeEngine,分片结果集归并引擎。

  1. // MergeEngine.java

  2. /**

  3. * 数据库类型

  4. */

  5. private final DatabaseType databaseType;

  6. /**

  7. * 结果集集合

  8. */

  9. private final List<ResultSet> resultSets;

  10. /**

  11. * Select SQL语句对象

  12. */

  13. private final SelectStatement selectStatement;

  14. /**

  15. * 查询列名与位置映射

  16. */

  17. private final Map<String, Integer> columnLabelIndexMap;

  18. public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {

  19.   this.databaseType = databaseType;

  20.   this.resultSets = resultSets;

  21.   this.selectStatement = selectStatement;

  22.   // 获得 查询列名与位置映射

  23.   columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));

  24. }

  25. /**

  26. * 获得 查询列名与位置映射

  27. *

  28. * @param resultSet 结果集

  29. * @return 查询列名与位置映射

  30. * @throws SQLException 当结果集已经关闭

  31. */

  32. private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException {

  33.   ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 元数据(包含查询列信息)

  34.   Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

  35.   for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {

  36.       result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);

  37.   }

  38.   return result;

  39. }

  • 当 MergeEngine 被创建时,会传入 resultSets 结果集集合,并根据其获得 columnLabelIndexMap 查询列名与位置映射。通过 columnLabelIndexMap,可以很方便的使用查询列名获得在返回结果记录列( essay-header )的第几列。


MergeEngine 的 #merge() 方法作为入口提供查询结果归并功能。

  1. /**

  2. * 合并结果集.

  3. *

  4. * @return 归并完毕后的结果集

  5. * @throws SQLException SQL异常

  6. */

  7. public ResultSetMerger merge() throws SQLException {

  8.   selectStatement.setIndexForItems(columnLabelIndexMap);

  9.   return decorate(build());

  10. }

  • #merge() 主体逻辑就两行代码,设置查询列位置信息,并返回合适的归并结果集接口( ResultSetMerger ) 实现。

2.1 SelectStatement#setIndexForItems()

  1. // SelectStatement.java

  2. /**

  3. * 为选择项设置索引.

  4. *

  5. * @param columnLabelIndexMap 列标签索引字典

  6. */

  7. public void setIndexForItems(final Map<String, Integer> columnLabelIndexMap) {

  8.   setIndexForAggregationItem(columnLabelIndexMap);

  9.   setIndexForOrderItem(columnLabelIndexMap, orderByItems);

  10.   setIndexForOrderItem(columnLabelIndexMap, groupByItems);

  11. }

  • 部分查询列是经过推到出来,在 SQL解析 过程中,未获得到查询列位置,需要通过该方法进行初始化。对这块不了解的同学,回头可以看下《SQL 解析(三)之查询SQL》。🙂 现在不用回头,皇冠会掉。

  • #setIndexForAggregationItem() 处理 AVG聚合计算列 推导出其对应的 SUM/COUNT 聚合计算列的位置:

  1. private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) {

  2.   for (AggregationSelectItem each : getAggregationSelectItems()) {

  3.       Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each));

  4.       each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));

  5.       for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {

  6.           Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()), String.format("Can't find index: %s", derived));

  7.           derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel()));

  8.       }

  9.   }

  10. }

  • #setIndexForOrderItem() 处理 ORDER BY / GROUP BY 列不在查询列 推导出的查询列的位置:

    1. private void setIndexForOrderItem(final Map<String, Integer> columnLabelIndexMap, final List<OrderItem> orderItems) {

    2.    for (OrderItem each : orderItems) {

    3.      if (-1 != each.getIndex()) {

    4.          continue;

    5.      }

    6.      Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s", each));

    7.      if (columnLabelIndexMap.containsKey(each.getColumnLabel())) {

    8.          each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));

    9.      }

    10.    }

    11. }

    2.2 ResultSetMerger

    ResultSetMerger,归并结果集接口。

    我们先来看看整体的类结构关系:

    功能 上分成四种:

    • 分组:GroupByMemoryResultSetMerger、GroupByStreamResultSetMerger;包含聚合列

    • 排序:OrderByStreamResultSetMerger

    • 迭代:IteratorStreamResultSetMerger

    • 分页:LimitDecoratorResultSetMerger

    实现方式 上分成三种:

    • Stream 流式:AbstractStreamResultSetMerger

    • Memory 内存:AbstractMemoryResultSetMerger

    • Decorator 装饰者:AbstractDecoratorResultSetMerger

    什么时候该用什么实现方式?

    • Stream 流式:将数据游标与结果集的游标保持一致,顺序的从结果集中一条条的获取正确的数据。看完下文第三节OrderByStreamResultSetMerger 可以形象的理解。

    • Memory 内存:需要将结果集的所有数据都遍历并存储在内存中,再通过内存归并后,将内存中的数据伪装成结果集返回。看完下文第五节 GroupByMemoryResultSetMerger 可以形象的理解。

    • Decorator 装饰者:可以和前二者任意组合

    1. // MergeEngine.java

    2. /**

    3. * 合并结果集.

    4. *

    5. * @return 归并完毕后的结果集

    6. * @throws SQLException SQL异常

    7. */

    8. public ResultSetMerger merge() throws SQLException {

    9.   selectStatement.setIndexForItems(columnLabelIndexMap);

    10.   return decorate(build());

    11. }

    12. private ResultSetMerger build() throws SQLException {

    13.   if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) { // 分组 或 聚合列

    14.       if (selectStatement.isSameGroupByAndOrderByItems()) {

    15.           return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());

    16.       } else {

    17.           return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());

    18.       }

    19.   }

    20.   if (!selectStatement.getOrderByItems().isEmpty()) {

    21.       return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());

    22.   }

    23.   return new IteratorStreamResultSetMerger(resultSets);

    24. }

    25. private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {

    26.   ResultSetMerger result = resultSetMerger;

    27.   if (null != selectStatement.getLimit()) {

    28.       result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());

    29.   }

    30.   return result;

    31. }

    2.2.1 AbstractStreamResultSetMerger

    AbstractStreamResultSetMerger,流式归并结果集抽象类,提供从当前结果集获得行数据。

    1. public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {

    2.    /**

    3.     * 当前结果集

    4.     */

    5.    private ResultSet currentResultSet;

    6.    protected ResultSet getCurrentResultSet() throws SQLException {

    7.        if (null == currentResultSet) {

    8.            throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next.");

    9.        }

    10.        return currentResultSet;

    11.    }

    12.    @Override

    13.    public Object getValue(final int columnIndex, final Class type) throws SQLException {

    14.        if (Object.class == type) {

    15.            return getCurrentResultSet().getObject(columnIndex);

    16.        }

    17.        if (int.class == type) {

    18.            return getCurrentResultSet().getInt(columnIndex);

    19.        }

    20.        if (String.class == type) {

    21.            return getCurrentResultSet().getString(columnIndex);

    22.        }

    23.        // .... 省略其他数据类型读取类似代码

    24.        return getCurrentResultSet().getObject(columnIndex);

    25.    }

    26. }

    2.2.2 AbstractMemoryResultSetMerger

    AbstractMemoryResultSetMerger,内存归并结果集抽象类,提供从内存数据行对象( MemoryResultSetRow ) 获得行数据。

    1. public abstract class AbstractMemoryResultSetMerger implements ResultSetMerger {

    2.    private final Map<String, Integer> labelAndIndexMap;

    3.    /**

    4.     * 内存数据行对象

    5.     */

    6.    @Setter

    7.    private MemoryResultSetRow currentResultSetRow;

    8.    @Override

    9.    public Object getValue(final int columnIndex, final Class type) throws SQLException {

    10.        if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) {

    11.            throw new SQLFeatureNotSupportedException();

    12.        }

    13.        return currentResultSetRow.getCell(columnIndex);

    14.    }

    15. }

    • 和 AbstractStreamResultSetMerger 对比,貌似区别不大?!确实,从抽象父类上看,两种实现方式差不多。抽象父类提供给实现子类的是数据读取的功能,真正的流式归并、内存归并是在子类实现上体现。

    1. public class MemoryResultSetRow {

    2.    /**

    3.     * 行数据

    4.     */

    5.    private final Object[] data;

    6.    public MemoryResultSetRow(final ResultSet resultSet) throws SQLException {

    7.        data = load(resultSet);

    8.    }

    9.    /**

    10.     * 加载 ResultSet 当前行数据到内存

    11.     * @param resultSet 结果集

    12.     * @return 行数据

    13.     * @throws SQLException 当结果集关闭

    14.     */

    15.    private Object[] load(final ResultSet resultSet) throws SQLException {

    16.        int columnCount = resultSet.getMetaData().getColumnCount();

    17.        Object[] result = new Object[columnCount];

    18.        for (int i = 0; i < columnCount; i++) {

    19.            result[i] = resultSet.getObject(i + 1);

    20.        }

    21.        return result;

    22.    }

    23.    /**

    24.     * 获取数据.

    25.     *

    26.     * @param columnIndex 列索引

    27.     * @return 数据

    28.     */

    29.    public Object getCell(final int columnIndex) {

    30.        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);

    31.        return data[columnIndex - 1];

    32.    }

    33.    /**

    34.     * 设置数据.

    35.     *

    36.     * @param columnIndex 列索引

    37.     * @param value 值

    38.     */

    39.    public void setCell(final int columnIndex, final Object value) {

    40.        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);

    41.        data[columnIndex - 1] = value;

    42.    }

    43. }

    • 调用 #load() 方法,将当前结果集的一条行数据加载到内存。

    2.2.3 AbstractDecoratorResultSetMerger

    AbstractDecoratorResultSetMerger,装饰结果集归并抽象类,通过调用其装饰的归并对象 #getValue() 方法获得行数据。

    1. public abstract class AbstractDecoratorResultSetMerger implements ResultSetMerger {

    2.    /**

    3.     * 装饰的归并对象

    4.     */

    5.    private final ResultSetMerger resultSetMerger;

    6.    @Override

    7.    public Object getValue(final int columnIndex, final Class type) throws SQLException {

    8.        return resultSetMerger.getValue(columnIndex, type);

    9.    }

    10. }

    3. OrderByStreamResultSetMerger

    OrderByStreamResultSetMerger,基于 Stream 方式排序归并结果集实现。

    3.1 归并算法

    因为各个分片结果集已经排序完成,使用《归并算法》能够充分利用这个优势。

    归并操作(merge),也叫归并算法,指的是将两个已经排序的序列合并成一个序列的操作。归并排序算法依赖归并操作。

    【迭代法】

    1. 申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列

    2. 设定两个指针,最初位置分别为两个已经排序序列的起始位置

    3. 比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置

    4. 重复步骤3直到某一指针到达序列尾

    5. 将另一序列剩下的所有元素直接复制到合并序列尾

    从定义上看,是不是超级符合我们这个场景。😈 此时此刻,你是不是捂着胸口,感叹:“大学怎么没好好学数据结构与算法呢”?反正我是捂着了,都是眼泪。

    1. public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger {

    2.    /**

    3.     * 排序列

    4.     */

    5.    @Getter(AccessLevel.NONE)

    6.    private final List<OrderItem> orderByItems;

    7.    /**

    8.     * 排序值对象队列

    9.     */

    10.    private final Queue<OrderByValue> orderByValuesQueue;

    11.    /**

    12.     * 默认排序类型

    13.     */

    14.    private final OrderType nullOrderType;

    15.    /**

    16.     * 是否第一个 ResultSet 已经调用 #next()

    17.     */

    18.    private boolean isFirstNext;

    19.    public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems, final OrderType nullOrderType) throws SQLException {

    20.        this.orderByItems = orderByItems;

    21.        this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());

    22.        this.nullOrderType = nullOrderType;

    23.        orderResultSetsToQueue(resultSets);

    24.        isFirstNext = true;

    25.    }

    26.    private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {

    27.        for (ResultSet each : resultSets) {

    28.            OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType);

    29.            if (orderByValue.next()) {

    30.                orderByValuesQueue.offer(orderByValue);

    31.            }

    32.        }

    33.        // 设置当前 ResultSet,这样 #getValue() 能拿到记录

    34.        setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());

    35.    }

    • 属性 orderByValuesQueue 使用的队列实现是优先级队列( PriorityQueue )。有兴趣的同学可以看看《JDK源码研究PriorityQueue》,本文不展开讲,不是主角戏份不多。我们记住几个方法的用途:



      • #offer():增加元素。增加时,会将该元素和已有元素们按照优先级进行排序

      • #peek():获得优先级第一的元素

      • #pool():获得优先级第一的元素并移除

    • 一个 ResultSet 构建一个 OrderByValue 用于排序,即上文归并算法提到的“空间”


      • 调用 OrderByValue#next() 方法时,获得其对应结果集排在第一条的记录,通过 #getOrderValues() 计算该记录的排序字段值。这样两个OrderByValue 通过 #compareTo() 方法可以比较两个结果集的第一条记录。

    1. public final class OrderByValue implements Comparable<OrderByValue> {

      /**

    2.  * 已排序结果集

    3.  */

    4. @Getter

    5. private final ResultSet resultSet;

    6. /**

    7.  * 排序列

    8.  */

    9. private final List&lt;OrderItem&gt; orderByItems;

    10. /**

    11.  * 默认排序类型

    12.  */

    13. private final OrderType nullOrderType;

    14. /**

    15.  * 排序列对应的值数组

    16.  * 因为一条记录可能有多个排序列,所以是数组

    17.  */

    18. private List&lt;Comparable&lt;?&gt;&gt; orderValues;

    19. /**

    20.  * 遍历下一个结果集游标.

    21.  * 

    22.  * @return 是否有下一个结果集

    23.  * @throws SQLException SQL异常

    24.  */

    25. public boolean next() throws SQLException {

    26.     boolean result = resultSet.next();

    27.     orderValues = result ? getOrderValues() : Collections.&lt;Comparable&lt;?&gt;&gt;emptyList();

    28.     return result;

    29. }

    30. /**

    31.  * 获得 排序列对应的值数组

    32.  *

    33.  * @return 排序列对应的值数组

    34.  * @throws SQLException 当结果集关闭时

    35.  */

    36. private List&lt;Comparable&lt;?&gt;&gt; getOrderValues() throws SQLException {

    37.     List&lt;Comparable&lt;?&gt;&gt; result = new ArrayList&lt;&gt;(orderByItems.size());

    38.     for (OrderItem each : orderByItems) {

    39.         Object value = resultSet.getObject(each.getIndex());

    40.         Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable");

    41.         result.add((Comparable&lt;?&gt;) value);

    42.     }

    43.     return result;

    44. }

    45. /**

    46.  * 对比 {@link #orderValues},即两者的第一条记录

    47.  *

    48.  * @param o 对比 OrderByValue

    49.  * @return -1 0 1

    50.  */

    51. @Override

    52. public int compareTo(final OrderByValue o) {

    53.     for (int i = 0; i &lt; orderByItems.size(); i++) {

    54.         OrderItem thisOrderBy = orderByItems.get(i);

    55.         int result = ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), nullOrderType);

    56.         if (0 != result) {

    57.             return result;

    58.         }

    59.     }

    60.     return 0;

    61. }

    62. }

  • if(orderByValue.next()){ 处,调用 OrderByValue#next() 后,添加到 PriorityQueue。因此, orderByValuesQueue.peek().getResultSet() 能够获得多个 ResultSet 中排在第一的。

  • 3.2 #next()

    通过调用 OrderByStreamResultSetMerger#next() 不断获得当前排在第一的记录。 #next() 每次调用后,实际做的是当前 ResultSet 的替换,以及当前的 ResultSet 的记录指向下一条。这样说起来可能比较绕,我们来看一张图:

    • 白色向下箭头:OrderByStreamResultSetMerger 对 ResultSet 的指向。

    • 黑色箭头:ResultSet 对当前记录的指向。

    • ps:这块如果分享的不清晰让您费劲,十分抱歉。欢迎加我微信(wangwenbin-server)交流下,这样我也可以优化表述。

    1. // OrderByStreamResultSetMerger.java

    2. @Override

    3. public boolean next() throws SQLException {

    4.   if (orderByValuesQueue.isEmpty()) {

    5.       return false;

    6.   }

    7.   if (isFirstNext) {

    8.       isFirstNext = false;

    9.       return true;

    10.   }

    11.   // 移除上一次获得的 ResultSet

    12.   OrderByValue firstOrderByValue = orderByValuesQueue.poll();

    13.   // 如果上一次获得的 ResultSet还有下一条记录,继续添加到 排序值对象队列

    14.   if (firstOrderByValue.next()) {

    15.       orderByValuesQueue.offer(firstOrderByValue);

    16.   }

    17.   if (orderByValuesQueue.isEmpty()) {

    18.       return false;

    19.   }

    20.   // 设置当前 ResultSet

    21.   setCurrentResultSet(orderByValuesQueue.peek().getResultSet());

    22.   return true;

    23. }

    • orderByValuesQueue.poll() 移除上一次获得的 ResultSet。为什么不能 #setCurrentResultSet() 就移除呢?如果该 ResultSet 里面还存在下一条记录,需要继续参加排序。而判断是否有下一条,需要调用 ResultSet#next() 方法,这会导致 ResultSet 指向了下一条记录。因而 orderByValuesQueue.poll() 调用是后置的。

    • isFirstNext 变量那的判断看着是不是很“灵异”?因为 #orderResultSetsToQueue() 处设置了第一次的 ResultSet。如果不加这个标记,会导致第一条记录“不见”了。

    • 通过不断的 Queue#poll()Queue#offset() 实现排序。巧妙!仿佛 Get 新技能了:

    1. // 移除上一次获得的 ResultSet

    2. OrderByValue firstOrderByValue = orderByValuesQueue.poll();

    3. // 如果上一次获得的 ResultSet还有下一条记录,继续添加到 排序值对象队列

    4. if (firstOrderByValue.next()) {

    5.  orderByValuesQueue.offer(firstOrderByValue);

    6. }


    在看下,我们上文 Stream 方式归并的定义:将数据游标与结果集的游标保持一致,顺序的从结果集中一条条的获取正确的数据。是不是能够清晰的对上了?!🙂

    4. GroupByStreamResultSetMerger

    GroupByStreamResultSetMerger,基于 Stream 方式分组归并结果集实现。 它继承自 OrderByStreamResultSetMerger,在排序的逻辑上,实现分组功能。实现原理也较为简单:

    1. public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger {

    2.    /**

    3.     * 查询列名与位置映射

    4.     */

    5.    private final Map<String, Integer> labelAndIndexMap;

    6.    /**

    7.     * Select SQL语句对象

    8.     */

    9.    private final SelectStatement selectStatement;

    10.    /**

    11.     * 当前结果记录

    12.     */

    13.    private final List<Object> currentRow;

    14.    /**

    15.     * 下一条结果记录 GROUP BY 条件

    16.     */

    17.    private List currentGroupByValues;

    18.    public GroupByStreamResultSetMerger(

    19.            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException {

    20.        super(resultSets, selectStatement.getOrderByItems(), nullOrderType);

    21.        this.labelAndIndexMap = labelAndIndexMap;

    22.        this.selectStatement = selectStatement;

    23.        currentRow = new ArrayList<>(labelAndIndexMap.size());

    24.        // 初始化下一条结果记录 GROUP BY 条件

    25.        currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();

    26.    }

    27.    @Override

    28.    public Object getValue(final int columnIndex, final Class type) throws SQLException {

    29.        return currentRow.get(columnIndex - 1);

    30.    }

    31.    @Override

    32.    public Object getValue(final String columnLabel, final Class type) throws SQLException {

    33.        Preconditions.checkState(labelAndIndexMap.containsKey(columnLabel), String.format("Can't find columnLabel: %s", columnLabel));

    34.        return currentRow.get(labelAndIndexMap.get(columnLabel) - 1);

    35.    }

    36. }

    • currentRow 为当前结果记录,使用 #getValue()#getCalendarValue() 方法获得当前结果记录的查询列值。

    • currentGroupByValues下一条结果记录 GROUP BY 条件,通过 GroupByValue 生成:

    1. public final class GroupByValue {

      /**

    2.  * 分组条件值数组

    3.  */

    4. private final List&lt;?&gt; groupValues;

    5. public GroupByValue(final ResultSet resultSet, final List&lt;OrderItem&gt; groupByItems) throws SQLException {

    6.     groupValues = getGroupByValues(resultSet, groupByItems);

    7. }

    8. /**

    9.  * 获得分组条件值数组

    10.  * 例如,`GROUP BY user_id, order_status` 返回的某条记录结果为 `userId = 1, order_status = 3`,对应的 `groupValues = [1, 3]`

    11.  * @param resultSet 结果集(单分片)

    12.  * @param groupByItems 分组列

    13.  * @return 分组条件值数组

    14.  * @throws SQLException 当结果集关闭

    15.  */

    16. private List&lt;?&gt; getGroupByValues(final ResultSet resultSet, final List&lt;OrderItem&gt; groupByItems) throws SQLException {

    17.     List&lt;Object&gt; result = new ArrayList&lt;&gt;(groupByItems.size());

    18.     for (OrderItem each : groupByItems) {

    19.         result.add(resultSet.getObject(each.getIndex())); // 从结果集获得每个分组条件的值

    20.     }

    21.     return result;

    22. }

    23. }

  • GroupByStreamResultSetMerger 在创建时,当前结果记录实际未合并,需要先调用 #next(),在使用 #getValue() 等方法获取值,这个和 OrderByStreamResultSetMerger 不同,可能是个 BUG。

  • 4.1 AggregationUnit

    AggregationUnit,归并计算单元接口,有两个接口方法:

    • #merge():归并聚合值

    • #getResult():获取计算结果

    一共有三个实现类:

    • AccumulationAggregationUnit:累加聚合单元,解决 COUNT、SUM 聚合列

    • ComparableAggregationUnit:比较聚合单元,解决 MAX、MIN 聚合列

    • AverageAggregationUnit:平均值聚合单元,解决 AVG 聚合列

    实现都比较易懂,直接点击链接查看源码,我们就不浪费篇幅贴代码啦。

    4.2 #next()

    我们先看看大体的调用流程:

    😈 看起来代码比较多,逻辑其实比较清晰,对照着顺序图顺序往下读即可。

    1. // GroupByStreamResultSetMerger.java

    2. @Override

    3. public boolean next() throws SQLException {

    4.   // 清除当前结果记录

    5.   currentRow.clear();

    6.   if (getOrderByValuesQueue().isEmpty()) {

    7.       return false;

    8.   }

    9.   //

    10.   if (isFirstNext()) {

    11.       super.next();

    12.   }

    13.   // 顺序合并下面相同分组条件的记录

    14.   if (aggregateCurrentGroupByRowAndNext()) {

    15.       // 生成下一条结果记录 GROUP BY 条件

    16.       currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();

    17.   }

    18.   return true;

    19. }

    20. private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {

    21.   boolean result = false;

    22.   // 生成计算单元

    23.   Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {

    24.       @Override

    25.       public AggregationUnit apply(final AggregationSelectItem input) {

    26.           return AggregationUnitFactory.create(input.getType());

    27.       }

    28.   });

    29.   // 循环顺序合并下面相同分组条件的记录

    30.   while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {

    31.       // 归并聚合值

    32.       aggregate(aggregationUnitMap);

    33.       // 缓存当前记录到结果记录

    34.       cacheCurrentRow();

    35.       // 获取下一条记录

    36.       result = super.next();

    37.       if (!result) {

    38.           break;

    39.       }

    40.   }

    41.   // 设置当前记录的聚合字段结果

    42.   setAggregationValueToCurrentRow(aggregationUnitMap);

    43.   return result;

    44. }

    45. private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException {

    46.   for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {

    47.       List<Comparable> values = new ArrayList<>(2);

    48.       if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) { // SUM/COUNT/MAX/MIN 聚合列

    49.           values.add(getAggregationValue(entry.getKey()));

    50.       } else {

    51.           for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) { // AVG 聚合列

    52.               values.add(getAggregationValue(each));

    53.           }

    54.       }

    55.       entry.getValue().merge(values);

    56.   }

    57. }

    58. private void cacheCurrentRow() throws SQLException {

    59.   for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) {

    60.       currentRow.add(getCurrentResultSet().getObject(i + 1));

    61.   }

    62. }

    63. private Comparable getAggregationValue(final AggregationSelectItem aggregationSelectItem) throws SQLException {

    64.   Object result = getCurrentResultSet().getObject(aggregationSelectItem.getIndex());

    65.   Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable");

    66.   return (Comparable) result;

    67. }

    68. private void setAggregationValueToCurrentRow(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) {

    69.   for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {

    70.       currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult()); // 获取计算结果

    71.   }

    72. }

    5. GroupByMemoryResultSetMerger

    GroupByMemoryResultSetMerger,基于 内存 分组归并结果集实现。

    区别于 GroupByStreamResultSetMerger,其无法使用每个分片结果集的有序的特点,只能在内存中合并后,进行整个重新排序。因而,性能和内存都较 GroupByStreamResultSetMerger 会差。

    主流程如下:

    1. public final class GroupByMemoryResultSetMerger extends AbstractMemoryResultSetMerger {

    2.    /**

    3.     * Select SQL语句对象

    4.     */

    5.    private final SelectStatement selectStatement;

    6.    /**

    7.     * 默认排序类型

    8.     */

    9.    private final OrderType nullOrderType;

    10.    /**

    11.     * 内存结果集

    12.     */

    13.    private final Iterator<MemoryResultSetRow> memoryResultSetRows;

    14.    public GroupByMemoryResultSetMerger(

    15.            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException {

    16.        super(labelAndIndexMap);

    17.        this.selectStatement = selectStatement;

    18.        this.nullOrderType = nullOrderType;

    19.        memoryResultSetRows = init(resultSets);

    20.    }

    21.    private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException {

    22.        Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024); // 分组条件值与内存记录映射

    23.        Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024); // 分组条件值与聚合列映射

    24.        // 遍历结果集

    25.        for (ResultSet each : resultSets) {

    26.            while (each.next()) {

    27.                // 生成分组条件

    28.                GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems());

    29.                // 初始化分组条件到 dataMap、aggregationMap 映射

    30.                initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);

    31.                // 归并聚合值

    32.                aggregate(each, groupByValue, aggregationMap);

    33.            }

    34.        }

    35.        // 设置聚合列结果到内存记录

    36.        setAggregationValueToMemoryRow(dataMap, aggregationMap);

    37.        // 内存排序

    38.        List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);

    39.        // 设置当前 ResultSet,这样 #getValue() 能拿到记录

    40.        if (!result.isEmpty()) {

    41.            setCurrentResultSetRow(result.get(0));

    42.        }

    43.        return result.iterator();

    44.    }

    45. }

    • #initForFirstGroupByValue() 初始化分组条件dataMapaggregationMap 映射中,这样可以调用 #aggregate() 将聚合值归并到 aggregationMap 里的该分组条件。


    1. private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap,

    2.                                      final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {

    3.    // 初始化分组条件到 dataMap

    4.    if (!dataMap.containsKey(groupByValue)) {

    5.        dataMap.put(groupByValue, new MemoryResultSetRow(resultSet));

    6.    }

    7.    // 初始化分组条件到 aggregationMap

    8.    if (!aggregationMap.containsKey(groupByValue)) {

    9.        Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {

              @Override

    10.         public AggregationUnit apply(final AggregationSelectItem input) {

    11.             return AggregationUnitFactory.create(input.getType());

    12.         }

    13.     });

    14.     aggregationMap.put(groupByValue, map);

    15. }

    16. }

  • 聚合完每个分组条件后,将聚合列结果 aggregationMap 合并到 dataMap

    1. private void setAggregationValueToMemoryRow(final Map<GroupByValue, MemoryResultSetRow> dataMap, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) {

    2.   for (Entry<GroupByValue, MemoryResultSetRow> entry : dataMap.entrySet()) { // 遍 历内存记录

    3.       for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { // 遍历 每个聚合列

    4.           entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult());

    5.       }

    6.   }

    7. }

  • 调用 #getMemoryResultSetRows() 方法对内存记录进行内存排序

    1. // GroupByMemoryResultSetMerger.java

    2. private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) {

    3.   List<