可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
1. 概述
可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。
本文主要分成四部分:
- 总体流程,让你有个整体的认识
- 查询操作
- 插入操作
- 彩蛋,😈彩蛋,🙂彩蛋
建议你看过这两篇文章(_非必须_):
2. 主流程
- MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
- MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。
这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。
Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。
MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。
是不是熟悉的味道。不得不说 JDBC 规范的精妙。
3. 查询操作
- SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;
看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。
1)、查询 MongoDB
- //MongoSQLParser.java
- publicMongoDataquery()throwsMongoSQLException{
- if(!(statementinstanceofSQLSelectStatement)){
- //returnnull;
- thrownewIllegalArgumentException("notaquerysqlstatement");
- }
- MongoDatamongo=newMongoData();
- DBCursorc=null;
- SQLSelectStatementselectStmt=(SQLSelectStatement)statement;
- SQLSelectQuerysqlSelectQuery=selectStmt.getSelect().getQuery();
- inticount=0;
- if(sqlSelectQueryinstanceofMySqlSelectQueryBlock){
- MySqlSelectQueryBlockmysqlSelectQuery=(MySqlSelectQueryBlock)selectStmt.getSelect().getQuery();
- BasicDBObjectfields=newBasicDBObject();
- //显示(返回)的字段
- for(SQLSelectItemitem:mysqlSelectQuery.getSelectList()){
- //System.out.println(item.toString());
- if(!(item.getExpr()instanceofSQLAllColumnExpr)){
- if(item.getExpr()instanceofSQLAggregateExpr){
- SQLAggregateExprexpr=(SQLAggregateExpr)item.getExpr();
- if(expr.getMethodName().equals("COUNT")){//TODO待读:count(*)
- icount=1;
- mongo.setField(getExprFieldName(expr),Types.BIGINT);
- }
- fields.put(getExprFieldName(expr),1);
- }else{
- fields.put(getFieldName(item),1);
- }
- }
- }
- //表名
- SQLTableSourcetable=mysqlSelectQuery.getFrom();
- DBCollectioncoll=this._db.getCollection(table.toString());
- mongo.setTable(table.toString());
- //WHERE
- SQLExprexpr=mysqlSelectQuery.getWhere();
- DBObjectquery=parserWhere(expr);
- //GROUPBY
- SQLSelectGroupByClausegroupby=mysqlSelectQuery.getGroupBy();
- BasicDBObjectgbkey=newBasicDBObject();
- if(groupby!=null){
- for(SQLExprgbexpr:groupby.getItems()){
- if(gbexprinstanceofSQLIdentifierExpr){
- Stringname=((SQLIdentifierExpr)gbexpr).getName();
- gbkey.put(name,Integer.valueOf(1));
- }
- }
- icount=2;
- }
- //SKIP/LIMIT
- intlimitoff=0;
- intlimitnum=0;
- if(mysqlSelectQuery.getLimit()!=null){
- limitoff=getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
- limitnum=getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
- }
- if(icount==1){//COUNT(*)
- mongo.setCount(coll.count(query));
- }elseif(icount==2){//MapReduce
- BasicDBObjectinitial=newBasicDBObject();
- initial.put("num",0);
- Stringreduce="function(obj,prev){"+"prev.num++}";
- mongo.setGrouyBy(coll.group(gbkey,query,initial,reduce));
- }else{
- if((limitoff>0)||(limitnum>0)){
- c=coll.find(query,fields).skip(limitoff).limit(limitnum);
- }else{
- c=coll.find(query,fields);
- }
- //orderby
- SQLOrderByorderby=mysqlSelectQuery.getOrderBy();
- if(orderby!=null){
- BasicDBObjectorder=newBasicDBObject();
- for(inti=0;i<orderby.getItems().size();i++){
- SQLSelectOrderByItemorderitem=orderby.getItems().get(i);
- order.put(orderitem.getExpr().toString(),getSQLExprToAsc(orderitem.getType()));
- }
- c.sort(order);
- //System.out.println(order);
- }
- }
- mongo.setCursor(c);
- }
- returnmongo;
- }
2)、查询条件
- //MongoSQLParser.java
- privatevoidparserWhere(SQLExpraexpr,BasicDBObjecto){
- if(aexprinstanceofSQLBinaryOpExpr){
- SQLBinaryOpExprexpr=(SQLBinaryOpExpr)aexpr;
- SQLExprexprL=expr.getLeft();
- if(!(exprLinstanceofSQLBinaryOpExpr)){
- if(expr.getOperator().getName().equals("=")){
- o.put(exprL.toString(),getExpValue(expr.getRight()));
- }else{
- Stringop="";
- if(expr.getOperator().getName().equals("<")){
- op="$lt";
- }elseif(expr.getOperator().getName().equals("<=")){
- op="$lte";
- }elseif(expr.getOperator().getName().equals(">")){
- op="$gt";
- }elseif(expr.getOperator().getName().equals(">=")){
- op="$gte";
- }elseif(expr.getOperator().getName().equals("!=")){
- op="$ne";
- }elseif(expr.getOperator().getName().equals("<>")){
- op="$ne";
- }
- parserDBObject(o,exprL.toString(),op,getExpValue(expr.getRight()));
- }
- }else{
- if(expr.getOperator().getName().equals("AND")){
- parserWhere(exprL,o);
- parserWhere(expr.getRight(),o);
- }elseif(expr.getOperator().getName().equals("OR")){
- orWhere(exprL,expr.getRight(),o);
- }else{
- thrownewRuntimeException("Can'tidentifytheoperationofofwhere");
- }
- }
- }
- }
- privatevoidorWhere(SQLExprexprL,SQLExprexprR,BasicDBObjectob){
- BasicDBObjectxo=newBasicDBObject();
- BasicDBObjectyo=newBasicDBObject();
- parserWhere(exprL,xo);
- parserWhere(exprR,yo);
- ob.put("$or",newObject[]{xo,yo});
- }
3)、解析 MongoDB 数据
- //MongoResultSet.java
- publicMongoResultSet(MongoDatamongo,Stringschema)throwsSQLException{
- this._cursor=mongo.getCursor();
- this._schema=schema;
- this._table=mongo.getTable();
- this.isSum=mongo.getCount()>0;
- this._sum=mongo.getCount();
- this.isGroupBy=mongo.getType();
- if(this.isGroupBy){
- dblist=mongo.getGrouyBys();
- this.isSum=true;
- }
- if(this._cursor!=null){
- select=_cursor.getKeysWanted().keySet().toArray(newString[0]);
- //解析fields
- if(this._cursor.hasNext()){
- _cur=_cursor.next();
- if(_cur!=null){
- if(select.length==0){
- SetFields(_cur.keySet());
- }
- _row=1;
- }
- }
- //设置fields类型
- if(select.length==0){
- select=newString[]{"_id"};
- SetFieldType(true);
- }else{
- SetFieldType(false);
- }
- }else{
- SetFields(mongo.getFields().keySet());//newString[]{"COUNT(*)"};
- SetFieldType(mongo.getFields());
- }
- }
当使用 SELECT * 查询字段时,fields 使用***条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。
4)、返回数据给 MySQL Client
- //JDBCConnection.java
- privatevoidouputResultSet(ServerConnectionsc,Stringsql)
- throwsSQLException{
- ResultSetrs=null;
- Statementstmt=null;
- try{
- stmt=con.createStatement();
- rs=stmt.executeQuery(sql);
- //header
- List<FieldPacket>fieldPks=newLinkedList<>();
- ResultSetUtil.resultSetToFieldPacket(sc.getCharset(),fieldPks,rs,this.isSpark);
- intcolunmCount=fieldPks.size();
- ByteBufferbyteBuf=sc.allocate();
- ResultSetHeaderPacketheaderPkg=newResultSetHeaderPacket();
- headerPkg.fieldCount=fieldPks.size();
- headerPkg.packetId=++packetId;
- byteBuf=headerPkg.write(byteBuf,sc,true);
- byteBuf.flip();
- byte[]header=newbyte[byteBuf.limit()];
- byteBuf.get(header);
- byteBuf.clear();
- List<byte[]>fields=newArrayList<byte[]>(fieldPks.size());
- for(FieldPacketcurField:fieldPks){
- curField.packetId=++packetId;
- byteBuf=curField.write(byteBuf,sc,false);
- byteBuf.flip();
- byte[]field=newbyte[byteBuf.limit()];
- byteBuf.get(field);
- byteBuf.clear();
- fields.add(field);
- }
- //headereof
- EOFPacketeofPckg=newEOFPacket();
- eofPckg.packetId=++packetId;
- byteBuf=eofPckg.write(byteBuf,sc,false);
- byteBuf.flip();
- byte[]eof=newbyte[byteBuf.limit()];
- byteBuf.get(eof);
- byteBuf.clear();
- this.respHandler.fieldEofResponse(header,fields,eof,this);
- //row
- while(rs.next()){
- RowDataPacketcurRow=newRowDataPacket(colunmCount);
- for(inti=0;i<colunmCount;i++){
- intj=i+1;
- if(MysqlDefs.isBianry((byte)fieldPks.get(i).type)){
- curRow.add(rs.getBytes(j));
- }elseif(fieldPks.get(i).type==MysqlDefs.FIELD_TYPE_DECIMAL||
- fieldPks.get(i).type==(MysqlDefs.FIELD_TYPE_NEW_DECIMAL-256)){//fieldtypeisunsignedbyte
- //ensurethatdonotusescientificnotationformat
- BigDecimalval=rs.getBigDecimal(j);
- curRow.add(StringUtil.encode(val!=null?val.toPlainString():null,sc.getCharset()));
- }else{
- curRow.add(StringUtil.encode(rs.getString(j),sc.getCharset()));
- }
- }
- curRow.packetId=++packetId;
- byteBuf=curRow.write(byteBuf,sc,false);
- byteBuf.flip();
- byte[]row=newbyte[byteBuf.limit()];
- byteBuf.get(row);
- byteBuf.clear();
- this.respHandler.rowResponse(row,this);
- }
- fieldPks.clear();
- //roweof
- eofPckg=newEOFPacket();
- eofPckg.packetId=++packetId;
- byteBuf=eofPckg.write(byteBuf,sc,false);
- byteBuf.flip();
- eof=newbyte[byteBuf.limit()];
- byteBuf.get(eof);
- sc.recycle(byteBuf);
- this.respHandler.rowEofResponse(eof,this);
- }finally{
- if(rs!=null){
- try{
- rs.close();
- }catch(SQLExceptione){
- }
- }
- if(stmt!=null){
- try{
- stmt.close();
- }catch(SQLExceptione){
- }
- }
- }
- }
- //MongoResultSet.java
- @Override
- publicStringgetString(StringcolumnLabel)throwsSQLException{
- Objectx=getObject(columnLabel);
- if(x==null){
- returnnull;
- }
- returnx.toString();
- }
当返回字段值是 Object 时,返回该对象.toString()。例如:
- mysql>select*fromuserorderby_idasc;
- +--------------------------+------+-------------------------------+
- |_id|name|profile|
- +--------------------------+------+-------------------------------+
- |1|123|{"age":1,"height":100}|
4. 插入操作
- //MongoSQLParser.java
- publicintexecuteUpdate()throwsMongoSQLException{
- if(statementinstanceofSQLInsertStatement){
- returnInsertData((SQLInsertStatement)statement);
- }
- if(statementinstanceofSQLUpdateStatement){
- returnUpData((SQLUpdateStatement)statement);
- }
- if(statementinstanceofSQLDropTableStatement){
- returndropTable((SQLDropTableStatement)statement);
- }
- if(statementinstanceofSQLDeleteStatement){
- returnDeleteDate((SQLDeleteStatement)statement);
- }
- if(statementinstanceofSQLCreateTableStatement){
- return1;
- }
- return1;
- }
- privateintInsertData(SQLInsertStatementstate){
- if(state.getValues().getValues().size()==0){
- thrownewRuntimeException("numberofcolumnserror");
- }
- if(state.getValues().getValues().size()!=state.getColumns().size()){
- thrownewRuntimeException("numberofvaluesandcolumnshavetomatch");
- }
- SQLTableSourcetable=state.getTableSource();
- BasicDBObjecto=newBasicDBObject();
- inti=0;
- for(SQLExprcol:state.getColumns()){
- o.put(getFieldName2(col),getExpValue(state.getValues().getValues().get(i)));
- i++;
- }
- DBCollectioncoll=this._db.getCollection(table.toString());
- coll.insert(o);
- return1;
- }
5. 彩蛋
1)、支持多 MongoDB ,并使用 MyCAT 进行分片。
MyCAT 配置:multi_mongodb
2)、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。
查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。
MyCAT 配置:single_mongodb_mysql
3)、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。
MyCAT 配置:single_mongodb
©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经