SQL ON MongoDB实现原理

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识
  2. 查询操作
  3. 插入操作
  4. 彩蛋,😈彩蛋,🙂彩蛋

建议你看过这两篇文章(_非必须_):

  1. 《MyCAT源码分析 —— 【单库单表】插入》
  2. 《MyCAT源码分析 —— 【单库单表】查询》

2. 主流程

SQL ON MongoDB实现原理

  1. MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
  2. MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

SQL ON MongoDB实现原理

Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

SQL ON MongoDB实现原理

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

  1. SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;

SQL ON MongoDB实现原理

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1)、查询 MongoDB

  1. //MongoSQLParser.java
  2. publicMongoDataquery()throwsMongoSQLException{
  3. if(!(statementinstanceofSQLSelectStatement)){
  4. //returnnull;
  5. thrownewIllegalArgumentException("notaquerysqlstatement");
  6. }
  7. MongoDatamongo=newMongoData();
  8. DBCursorc=null;
  9. SQLSelectStatementselectStmt=(SQLSelectStatement)statement;
  10. SQLSelectQuerysqlSelectQuery=selectStmt.getSelect().getQuery();
  11. inticount=0;
  12. if(sqlSelectQueryinstanceofMySqlSelectQueryBlock){
  13. MySqlSelectQueryBlockmysqlSelectQuery=(MySqlSelectQueryBlock)selectStmt.getSelect().getQuery();
  14. BasicDBObjectfields=newBasicDBObject();
  15. //显示(返回)的字段
  16. for(SQLSelectItemitem:mysqlSelectQuery.getSelectList()){
  17. //System.out.println(item.toString());
  18. if(!(item.getExpr()instanceofSQLAllColumnExpr)){
  19. if(item.getExpr()instanceofSQLAggregateExpr){
  20. SQLAggregateExprexpr=(SQLAggregateExpr)item.getExpr();
  21. if(expr.getMethodName().equals("COUNT")){//TODO待读:count(*)
  22. icount=1;
  23. mongo.setField(getExprFieldName(expr),Types.BIGINT);
  24. }
  25. fields.put(getExprFieldName(expr),1);
  26. }else{
  27. fields.put(getFieldName(item),1);
  28. }
  29. }
  30. }
  31. //表名
  32. SQLTableSourcetable=mysqlSelectQuery.getFrom();
  33. DBCollectioncoll=this._db.getCollection(table.toString());
  34. mongo.setTable(table.toString());
  35. //WHERE
  36. SQLExprexpr=mysqlSelectQuery.getWhere();
  37. DBObjectquery=parserWhere(expr);
  38. //GROUPBY
  39. SQLSelectGroupByClausegroupby=mysqlSelectQuery.getGroupBy();
  40. BasicDBObjectgbkey=newBasicDBObject();
  41. if(groupby!=null){
  42. for(SQLExprgbexpr:groupby.getItems()){
  43. if(gbexprinstanceofSQLIdentifierExpr){
  44. Stringname=((SQLIdentifierExpr)gbexpr).getName();
  45. gbkey.put(name,Integer.valueOf(1));
  46. }
  47. }
  48. icount=2;
  49. }
  50. //SKIP/LIMIT
  51. intlimitoff=0;
  52. intlimitnum=0;
  53. if(mysqlSelectQuery.getLimit()!=null){
  54. limitoff=getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
  55. limitnum=getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
  56. }
  57. if(icount==1){//COUNT(*)
  58. mongo.setCount(coll.count(query));
  59. }elseif(icount==2){//MapReduce
  60. BasicDBObjectinitial=newBasicDBObject();
  61. initial.put("num",0);
  62. Stringreduce="function(obj,prev){"+"prev.num++}";
  63. mongo.setGrouyBy(coll.group(gbkey,query,initial,reduce));
  64. }else{
  65. if((limitoff>0)||(limitnum>0)){
  66. c=coll.find(query,fields).skip(limitoff).limit(limitnum);
  67. }else{
  68. c=coll.find(query,fields);
  69. }
  70. //orderby
  71. SQLOrderByorderby=mysqlSelectQuery.getOrderBy();
  72. if(orderby!=null){
  73. BasicDBObjectorder=newBasicDBObject();
  74. for(inti=0;i<orderby.getItems().size();i++){
  75. SQLSelectOrderByItemorderitem=orderby.getItems().get(i);
  76. order.put(orderitem.getExpr().toString(),getSQLExprToAsc(orderitem.getType()));
  77. }
  78. c.sort(order);
  79. //System.out.println(order);
  80. }
  81. }
  82. mongo.setCursor(c);
  83. }
  84. returnmongo;
  85. }

2)、查询条件

  1. //MongoSQLParser.java
  2. privatevoidparserWhere(SQLExpraexpr,BasicDBObjecto){
  3. if(aexprinstanceofSQLBinaryOpExpr){
  4. SQLBinaryOpExprexpr=(SQLBinaryOpExpr)aexpr;
  5. SQLExprexprL=expr.getLeft();
  6. if(!(exprLinstanceofSQLBinaryOpExpr)){
  7. if(expr.getOperator().getName().equals("=")){
  8. o.put(exprL.toString(),getExpValue(expr.getRight()));
  9. }else{
  10. Stringop="";
  11. if(expr.getOperator().getName().equals("<")){
  12. op="$lt";
  13. }elseif(expr.getOperator().getName().equals("<=")){
  14. op="$lte";
  15. }elseif(expr.getOperator().getName().equals(">")){
  16. op="$gt";
  17. }elseif(expr.getOperator().getName().equals(">=")){
  18. op="$gte";
  19. }elseif(expr.getOperator().getName().equals("!=")){
  20. op="$ne";
  21. }elseif(expr.getOperator().getName().equals("<>")){
  22. op="$ne";
  23. }
  24. parserDBObject(o,exprL.toString(),op,getExpValue(expr.getRight()));
  25. }
  26. }else{
  27. if(expr.getOperator().getName().equals("AND")){
  28. parserWhere(exprL,o);
  29. parserWhere(expr.getRight(),o);
  30. }elseif(expr.getOperator().getName().equals("OR")){
  31. orWhere(exprL,expr.getRight(),o);
  32. }else{
  33. thrownewRuntimeException("Can'tidentifytheoperationofofwhere");
  34. }
  35. }
  36. }
  37. }
  38. privatevoidorWhere(SQLExprexprL,SQLExprexprR,BasicDBObjectob){
  39. BasicDBObjectxo=newBasicDBObject();
  40. BasicDBObjectyo=newBasicDBObject();
  41. parserWhere(exprL,xo);
  42. parserWhere(exprR,yo);
  43. ob.put("$or",newObject[]{xo,yo});
  44. }

3)、解析 MongoDB 数据

  1. //MongoResultSet.java
  2. publicMongoResultSet(MongoDatamongo,Stringschema)throwsSQLException{
  3. this._cursor=mongo.getCursor();
  4. this._schema=schema;
  5. this._table=mongo.getTable();
  6. this.isSum=mongo.getCount()>0;
  7. this._sum=mongo.getCount();
  8. this.isGroupBy=mongo.getType();
  9. if(this.isGroupBy){
  10. dblist=mongo.getGrouyBys();
  11. this.isSum=true;
  12. }
  13. if(this._cursor!=null){
  14. select=_cursor.getKeysWanted().keySet().toArray(newString[0]);
  15. //解析fields
  16. if(this._cursor.hasNext()){
  17. _cur=_cursor.next();
  18. if(_cur!=null){
  19. if(select.length==0){
  20. SetFields(_cur.keySet());
  21. }
  22. _row=1;
  23. }
  24. }
  25. //设置fields类型
  26. if(select.length==0){
  27. select=newString[]{"_id"};
  28. SetFieldType(true);
  29. }else{
  30. SetFieldType(false);
  31. }
  32. }else{
  33. SetFields(mongo.getFields().keySet());//newString[]{"COUNT(*)"};
  34. SetFieldType(mongo.getFields());
  35. }
  36. }

当使用 SELECT * 查询字段时,fields 使用***条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4)、返回数据给 MySQL Client

  1. //JDBCConnection.java
  2. privatevoidouputResultSet(ServerConnectionsc,Stringsql)
  3. throwsSQLException{
  4. ResultSetrs=null;
  5. Statementstmt=null;
  6. try{
  7. stmt=con.createStatement();
  8. rs=stmt.executeQuery(sql);
  9. //header
  10. List<FieldPacket>fieldPks=newLinkedList<>();
  11. ResultSetUtil.resultSetToFieldPacket(sc.getCharset(),fieldPks,rs,this.isSpark);
  12. intcolunmCount=fieldPks.size();
  13. ByteBufferbyteBuf=sc.allocate();
  14. ResultSetHeaderPacketheaderPkg=newResultSetHeaderPacket();
  15. headerPkg.fieldCount=fieldPks.size();
  16. headerPkg.packetId=++packetId;
  17. byteBuf=headerPkg.write(byteBuf,sc,true);
  18. byteBuf.flip();
  19. byte[]header=newbyte[byteBuf.limit()];
  20. byteBuf.get(header);
  21. byteBuf.clear();
  22. List<byte[]>fields=newArrayList<byte[]>(fieldPks.size());
  23. for(FieldPacketcurField:fieldPks){
  24. curField.packetId=++packetId;
  25. byteBuf=curField.write(byteBuf,sc,false);
  26. byteBuf.flip();
  27. byte[]field=newbyte[byteBuf.limit()];
  28. byteBuf.get(field);
  29. byteBuf.clear();
  30. fields.add(field);
  31. }
  32. //headereof
  33. EOFPacketeofPckg=newEOFPacket();
  34. eofPckg.packetId=++packetId;
  35. byteBuf=eofPckg.write(byteBuf,sc,false);
  36. byteBuf.flip();
  37. byte[]eof=newbyte[byteBuf.limit()];
  38. byteBuf.get(eof);
  39. byteBuf.clear();
  40. this.respHandler.fieldEofResponse(header,fields,eof,this);
  41. //row
  42. while(rs.next()){
  43. RowDataPacketcurRow=newRowDataPacket(colunmCount);
  44. for(inti=0;i<colunmCount;i++){
  45. intj=i+1;
  46. if(MysqlDefs.isBianry((byte)fieldPks.get(i).type)){
  47. curRow.add(rs.getBytes(j));
  48. }elseif(fieldPks.get(i).type==MysqlDefs.FIELD_TYPE_DECIMAL||
  49. fieldPks.get(i).type==(MysqlDefs.FIELD_TYPE_NEW_DECIMAL-256)){//fieldtypeisunsignedbyte
  50. //ensurethatdonotusescientificnotationformat
  51. BigDecimalval=rs.getBigDecimal(j);
  52. curRow.add(StringUtil.encode(val!=null?val.toPlainString():null,sc.getCharset()));
  53. }else{
  54. curRow.add(StringUtil.encode(rs.getString(j),sc.getCharset()));
  55. }
  56. }
  57. curRow.packetId=++packetId;
  58. byteBuf=curRow.write(byteBuf,sc,false);
  59. byteBuf.flip();
  60. byte[]row=newbyte[byteBuf.limit()];
  61. byteBuf.get(row);
  62. byteBuf.clear();
  63. this.respHandler.rowResponse(row,this);
  64. }
  65. fieldPks.clear();
  66. //roweof
  67. eofPckg=newEOFPacket();
  68. eofPckg.packetId=++packetId;
  69. byteBuf=eofPckg.write(byteBuf,sc,false);
  70. byteBuf.flip();
  71. eof=newbyte[byteBuf.limit()];
  72. byteBuf.get(eof);
  73. sc.recycle(byteBuf);
  74. this.respHandler.rowEofResponse(eof,this);
  75. }finally{
  76. if(rs!=null){
  77. try{
  78. rs.close();
  79. }catch(SQLExceptione){
  80. }
  81. }
  82. if(stmt!=null){
  83. try{
  84. stmt.close();
  85. }catch(SQLExceptione){
  86. }
  87. }
  88. }
  89. }
  90. //MongoResultSet.java
  91. @Override
  92. publicStringgetString(StringcolumnLabel)throwsSQLException{
  93. Objectx=getObject(columnLabel);
  94. if(x==null){
  95. returnnull;
  96. }
  97. returnx.toString();
  98. }

当返回字段值是 Object 时,返回该对象.toString()。例如:

  1. mysql>select*fromuserorderby_idasc;
  2. +--------------------------+------+-------------------------------+
  3. |_id|name|profile|
  4. +--------------------------+------+-------------------------------+
  5. |1|123|{"age":1,"height":100}|

4. 插入操作

SQL ON MongoDB实现原理

  1. //MongoSQLParser.java
  2. publicintexecuteUpdate()throwsMongoSQLException{
  3. if(statementinstanceofSQLInsertStatement){
  4. returnInsertData((SQLInsertStatement)statement);
  5. }
  6. if(statementinstanceofSQLUpdateStatement){
  7. returnUpData((SQLUpdateStatement)statement);
  8. }
  9. if(statementinstanceofSQLDropTableStatement){
  10. returndropTable((SQLDropTableStatement)statement);
  11. }
  12. if(statementinstanceofSQLDeleteStatement){
  13. returnDeleteDate((SQLDeleteStatement)statement);
  14. }
  15. if(statementinstanceofSQLCreateTableStatement){
  16. return1;
  17. }
  18. return1;
  19. }
  20. privateintInsertData(SQLInsertStatementstate){
  21. if(state.getValues().getValues().size()==0){
  22. thrownewRuntimeException("numberofcolumnserror");
  23. }
  24. if(state.getValues().getValues().size()!=state.getColumns().size()){
  25. thrownewRuntimeException("numberofvaluesandcolumnshavetomatch");
  26. }
  27. SQLTableSourcetable=state.getTableSource();
  28. BasicDBObjecto=newBasicDBObject();
  29. inti=0;
  30. for(SQLExprcol:state.getColumns()){
  31. o.put(getFieldName2(col),getExpValue(state.getValues().getValues().get(i)));
  32. i++;
  33. }
  34. DBCollectioncoll=this._db.getCollection(table.toString());
  35. coll.insert(o);
  36. return1;
  37. }

5. 彩蛋

1)、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:multi_mongodb

2)、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:single_mongodb_mysql

3)、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:single_mongodb

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2023年5月6日 19:36
下一篇 2023年5月6日 19:36

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信