数据库中间件 MyCAT源码分析 —— PreparedStatement 重新入门

芋道源码 2017-07-17

原文地址:http://www.yunai.me/MyCAT/what-is-PreparedStatement/
MyCat-Server 带注释代码地址 :https://github.com/YunaiV/Mycat-Server
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号
QQ :7685413


  • 1. 概述

  • 2. JDBC Client 实现

  • 3. MyCAT Server 实现

    • 3.1 创建 PreparedStatement

    • 3.2 执行 SQL

  • 4. 彩蛋


1. 概述

相信很多同学在学习 JDBC 时,都碰到 PreparedStatementStatement。究竟该使用哪个呢?最终很可能是懵里懵懂的看了各种总结,使用 PreparedStatement。那么本文,通过 MyCAT 对 PreparedStatement 的实现对大家能够重新理解下。

本文主要分成两部分:

  1. JDBC Client 如何实现 PreparedStatement

  2. MyCAT Server 如何处理 PreparedStatement

😈 Let's Go,

2. JDBC Client 实现

首先,我们来看一段大家最喜欢复制粘贴之一的代码,JDBC PreparedStatement 查询 MySQL 数据库:

  1. public class PreparedStatementDemo {

  2.    public static void main(String[] args) throws ClassNotFoundException, SQLException {

  3.        // 1. 获得数据库连接

  4.        Class.forName("com.mysql.jdbc.Driver");

  5.        Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest?useServerPrepStmts=true", "root", "123456");

  6.        // PreparedStatement

  7.        PreparedStatement ps = conn.prepareStatement("SELECT id, username, password FROM t_user WHERE id = ?");

  8.        ps.setLong(1, Math.abs(new Random().nextLong()));

  9.        // execute

  10.        ps.executeQuery();

  11.    }

  12. }

获取 MySQL 连接时, useServerPrepStmts=true非常非常非常重要的参数。如果不配置, PreparedStatement 实际是个PreparedStatement(新版本默认为 FALSE,据说部分老版本默认为 TRUE),未开启服务端级别的 SQL 预编译。

WHY ?来看下 JDBC 里面是怎么实现的。

  1. // com.mysql.jdbc.ConnectionImpl.java

  2. public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {

  3.   synchronized (getConnectionMutex()) {

  4.       checkClosed();

  5.       PreparedStatement pStmt = null;

  6.       boolean canServerPrepare = true;

  7.       String nativeSql = getProcessEscapeCodesForPrepStmts() ? nativeSQL(sql) : sql;

  8.       if (this.useServerPreparedStmts && getEmulateUnsupportedPstmts()) {

  9.           canServerPrepare = canHandleAsServerPreparedStatement(nativeSql);

  10.       }

  11.       if (this.useServerPreparedStmts && canServerPrepare) {

  12.           if (this.getCachePreparedStatements()) { // 从缓存中获取 pStmt

  13.               synchronized (this.serverSideStatementCache) {

  14.                   pStmt = (com.mysql.jdbc.ServerPreparedStatement) this.serverSideStatementCache

  15.                           .remove(makePreparedStatementCacheKey(this.database, sql));

  16.                   if (pStmt != null) {

  17.                       ((com.mysql.jdbc.ServerPreparedStatement) pStmt).setClosed(false);

  18.                       pStmt.clearParameters(); // 清理上次留下的参数

  19.                   }

  20.                   if (pStmt == null) {

  21.                        // .... 省略代码 :向 Server 提交 SQL 预编译。

  22.                   }

  23.               }

  24.           } else {

  25.               try {

  26.                   // 向 Server 提交 SQL 预编译。

  27.                   pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);

  28.                   pStmt.setResultSetType(resultSetType);

  29.                   pStmt.setResultSetConcurrency(resultSetConcurrency);

  30.               } catch (SQLException sqlEx) {

  31.                   // Punt, if necessary

  32.                   if (getEmulateUnsupportedPstmts()) {

  33.                       pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);

  34.                   } else {

  35.                       throw sqlEx;

  36.                   }

  37.               }

  38.           }

  39.       } else {

  40.           pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);

  41.       }

  42.       return pStmt;

  43.   }

  44. }

  • 【前者】当 Client 开启 useServerPreparedStmts 并且 Server 支持 ServerPrepareClient 会向 Server 提交 SQL 预编译请求

  1. if (this.useServerPreparedStmts && canServerPrepare) {

  2.    pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);

  3. }

  • 【后者】当 Client 未开启 useServerPreparedStmts 或者 Server 不支持 ServerPrepare,Client 创建 PreparedStatement不会向 Server 提交 SQL 预编译请求

  1. pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);

即使这样,究竟为什么性能会更好呢?

  • 【前者】返回的 PreparedStatement 对象类是 JDBC42ServerPreparedStatement.java,后续每次执行 SQL 只需将对应占位符?对应的值提交给 Server即可,减少网络传输和 SQL 解析开销。

  • 【后者】返回的 PreparedStatement 对象类是 JDBC42PreparedStatement.java,后续每次执行 SQL 需要将完整的 SQL 提交给 Server,增加了网络传输和 SQL 解析开销。

🌚:【前者】性能一定比【后者】好吗?相信你已经有了正确的答案。

3. MyCAT Server 实现

3.1 创建 PreparedStatement

该操作对应 Client conn.prepareStatement(....)

MyCAT 接收到请求后,创建 PreparedStatement,并返回 statementId 等信息。Client 发起 SQL 执行时,需要将 statementId 带给 MyCAT。核心代码如下:

  1. // ServerPrepareHandler.java

  2. @Override

  3. public void prepare(String sql) {

  4. LOGGER.debug("use server prepare, sql: " + sql);

  5.   PreparedStatement pstmt = pstmtForSql.get(sql);

  6.   if (pstmt == null) { // 缓存中获取

  7.       // 解析获取字段个数和参数个数

  8.       int columnCount = getColumnCount(sql);

  9.       int paramCount = getParamCount(sql);

  10.       pstmt = new PreparedStatement(++pstmtId, sql, columnCount, paramCount);

  11.       pstmtForSql.put(pstmt.getStatement(), pstmt);

  12.       pstmtForId.put(pstmt.getId(), pstmt);

  13.   }

  14.   PreparedStmtResponse.response(pstmt, source);

  15. }

  16. // PreparedStmtResponse.java

  17. public static void response(PreparedStatement pstmt, FrontendConnection c) {

  18.   byte packetId = 0;

  19.   // write preparedOk packet

  20.   PreparedOkPacket preparedOk = new PreparedOkPacket();

  21.   preparedOk.packetId = ++packetId;

  22.   preparedOk.statementId = pstmt.getId();

  23.   preparedOk.columnsNumber = pstmt.getColumnsNumber();

  24.   preparedOk.parametersNumber = pstmt.getParametersNumber();

  25.   ByteBuffer buffer = preparedOk.write(c.allocate(), c,true);

  26.   // write parameter field packet

  27.   int parametersNumber = preparedOk.parametersNumber;

  28.   if (parametersNumber > 0) {

  29.       for (int i = 0; i < parametersNumber; i++) {

  30.           FieldPacket field = new FieldPacket();

  31.           field.packetId = ++packetId;

  32.           buffer = field.write(buffer, c,true);

  33.       }

  34.       EOFPacket eof = new EOFPacket();

  35.       eof.packetId = ++packetId;

  36.       buffer = eof.write(buffer, c,true);

  37.   }

  38.   // write column field packet

  39.   int columnsNumber = preparedOk.columnsNumber;

  40.   if (columnsNumber > 0) {

  41.       for (int i = 0; i < columnsNumber; i++) {

  42.           FieldPacket field = new FieldPacket();

  43.           field.packetId = ++packetId;

  44.           buffer = field.write(buffer, c,true);

  45.       }

  46.       EOFPacket eof = new EOFPacket();

  47.       eof.packetId = ++packetId;

  48.       buffer = eof.write(buffer, c,true);

  49.   }

  50.   // send buffer

  51.   c.write(buffer);

  52. }

每个连接之间,PreparedStatement 不共享,即不同连接,即使 SQL相同,对应的 PreparedStatement 不同。

3.2 执行 SQL

该操作对应 Client conn.execute(....)

MyCAT 接收到请求后,将 PreparedStatement 使用请求的参数格式化成可执行的 SQL 进行执行。伪代码如下:

  1. String sql = pstmt.sql.format(request.params);

  2. execute(sql);

核心代码如下:

  1. // ServerPrepareHandler.java

  2. @Override

  3. public void execute(byte[] data) {

  4.   long pstmtId = ByteUtil.readUB4(data, 5);

  5.   PreparedStatement pstmt = null;

  6.   if ((pstmt = pstmtForId.get(pstmtId)) == null) {

  7.       source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pstmtId when executing.");

  8.   } else {

  9.       // 参数读取

  10.       ExecutePacket packet = new ExecutePacket(pstmt);

  11.       try {

  12.           packet.read(data, source.getCharset());

  13.       } catch (UnsupportedEncodingException e) {

  14.           source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, e.getMessage());

  15.           return;

  16.       }

  17.       BindValue[] bindValues = packet.values;

  18.       // 还原sql中的动态参数为实际参数值

  19.       String sql = prepareStmtBindValue(pstmt, bindValues);

  20.       // 执行sql

  21.       source.getSession2().setPrepared(true);

  22.       source.query(sql);

  23.   }

  24. }

  25. private String prepareStmtBindValue(PreparedStatement pstmt, BindValue[] bindValues) {

  26.   String sql = pstmt.getStatement();

  27.   int[] paramTypes = pstmt.getParametersType();

  28.   StringBuilder sb = new StringBuilder();

  29.   int idx = 0;

  30.   for (int i = 0, len = sql.length(); i < len; i++) {

  31.       char c = sql.charAt(i);

  32.       if (c != '?') {

  33.           sb.append(c);

  34.           continue;

  35.       }

  36.       // 处理占位符?

  37.       int paramType = paramTypes[idx];

  38.       BindValue bindValue = bindValues[idx];

  39.       idx++;

  40.       // 处理字段为空的情况

  41.       if (bindValue.isNull) {

  42.           sb.append("NULL");

  43.           continue;

  44.       }

  45.       // 非空情况, 根据字段类型获取值

  46.       switch (paramType & 0xff) {

  47.           case Fields.FIELD_TYPE_TINY:

  48.               sb.append(String.valueOf(bindValue.byteBinding));

  49.               break;

  50.           case Fields.FIELD_TYPE_SHORT:

  51.               sb.append(String.valueOf(bindValue.shortBinding));

  52.               break;

  53.           case Fields.FIELD_TYPE_LONG:

  54.               sb.append(String.valueOf(bindValue.intBinding));

  55.               break;

  56.           // .... 省略非核心代码

  57.        }

  58.   }

  59.   return sb.toString();

  60. }

4. 彩蛋

💯 看到此处是不是真爱?!反正我信了。
给老铁们额外加个🍗。

细心的同学们可能已经注意到 JDBC Client 是支持缓存 PreparedStatement,无需每次都让 Server 进行创建。

当配置 MySQL 数据连接 cachePrepStmts=true 时开启 Client 级别的缓存。But,此处的缓存又和一般的缓存不一样,是使用 remove 的方式获得的,并且创建好 PreparedStatement 时也不添加到缓存。那什么时候添加缓存呢?在 pstmt.close() 时,并且 pstmt 是通过缓存获取时,添加到缓存。核心代码如下:

  1. // ServerPreparedStatement.java

  2. public void close() throws SQLException {

  3.   MySQLConnection locallyScopedConn = this.connection;

  4.   if (locallyScopedConn == null) {

  5.       return; // already closed

  6.   }

  7.   synchronized (locallyScopedConn.getConnectionMutex()) {

  8.       if (this.isCached && isPoolable() && !this.isClosed) {

  9.           clearParameters();

  10.           this.isClosed = true;

  11.           this.connection.recachePreparedStatement(this);

  12.           return;

  13.       }

  14.       realClose(true, true);

  15.   }

  16. }

  17. // ConnectionImpl.java

  18. public void recachePreparedStatement(ServerPreparedStatement pstmt) throws SQLException {

  19.   synchronized (getConnectionMutex()) {

  20.       if (getCachePreparedStatements() && pstmt.isPoolable()) {

  21.           synchronized (this.serverSideStatementCache) {

  22.               this.serverSideStatementCache.put(makePreparedStatementCacheKey(pstmt.currentCatalog, pstmt.originalSql), pstmt);

  23.           }

  24.       }

  25.   }

  26. }

为什么要这么实现? PreparedStatement 是有状态的变量,我们会去 setXXX(pos,value),一旦多线程共享,会导致错乱。

🗿 这个“彩蛋”还满意么?请关注我的公众号:芋艿的后端小屋。下一篇更新:《MyCAT源码解析 —— MongoDB》,极大可能就在本周噢。

另外推荐一篇文章:《JDBC PreparedStatement》。


文章已于修改
    本站仅按申请收录文章,版权归原作者所有
    如若侵权,请联系本站删除
    觉得不错,分享给更多人看到