我在前面提到过MongoDB不支持像SQL般字符式的操作指令,所以我们必须对所有的MongoDB操作指令建立protobuf类型才能支持MongoDB指令的序列化。在对上一篇博文里我们把MongoDB的消息指令序列化单独挑出来讨论了一番,在这篇我们准备在一个MongoDB scala开发环境里通过streaming运算来示范这些protobuf消息的应用。

 与前面我们介绍过的JDBC-streaming和Cassandra-streaming对应操作指令的处理相同,MGO-streaming也是是通过一个Context对象来描述操作方式和内容细节的,MGOContext定义如下:

 case class MGOContext(
                         dbName: String,
                         collName: String,
                         actionType: MGO_ACTION_TYPE = MGO_QUERY,
                         action: Option[MGOCommands] = None,
                         actionOptions: Option[Any] = None,
                         actionTargets: Seq[String] = Nil
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)

    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))

    def toSomeProto = MGOProtoConversion.ctxToProto(this)

  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)
    def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
      MGOProtoConversion.ctxFromProto(proto)
  }

上面的代码里包括了toSomeProto, fromProto两个函数来实现MGOContext的序列化转换处理。这两个函数的实现包含在文章后面提供的源代码中。MongoDB的.proto文件idl定义如下:

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";


option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

import "misc/sdp.proto";

message ProtoMGOBson {
  bytes bson = 1;
}

message ProtoMGODocument {
  bytes document = 1;
}

message ProtoMGOResultOption { //FindObservable
   int32 optType = 1;
   ProtoMGOBson bsonParam = 2;
   int32 valueParam = 3;
}

message ProtoMGOAdmin{
  string tarName = 1;
  repeated ProtoMGOBson bsonParam  = 2;
  ProtoAny options = 3;
  string objName = 4;
}

message ProtoMGOContext {  //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated ProtoMGOBson bsonParam = 4;
  repeated ProtoMGOResultOption resultOptions = 5;
  repeated string targets = 6;
  ProtoAny options = 7;
  repeated ProtoMGODocument documents = 8;
  google.protobuf.BoolValue only = 9;
  ProtoMGOAdmin adminOptions = 10;
}

下面是本次示范的.proto文件: 

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";


option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

import "misc/sdp.proto";
import "cql/cql.proto";
import "jdbc/jdbc.proto";
import "mgo/mgo.proto";

service MGOServices {
  rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
  rpc runQueries(stream ProtoMGOContext) returns (stream ProtoMGODocument) {}
}

以上通过import “mgo/mgo.proto”引用了ProtoMGOContext类型,并在服务定义rpc runQueries里用作了传入参数。

下面这段是本次示范的服务实现代码:

package sdp.grpc.mongo.server

import sdp.mongo.engine._
import MGOClasses._
import MGOEngine._
import akka.NotUsed
import akka.stream.scaladsl.Flow
import sdp.logging.LogSupport
import sdp.grpc.services._
import org.mongodb.scala._
import scala.concurrent._
import akka.stream.ActorMaterializer
import sdp.mongo.engine.MGOProtoConversion.MGODocument


class MGOStreamingServices(implicit ec: ExecutionContextExecutor,
                            mat: ActorMaterializer, client: MongoClient)
  extends MgostreamingGrpcAkkaStream.MGOServices with LogSupport {
  override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
    Flow[HelloMsg]
      .map {r => HelloMsg(r.hello+", mongo ...")}
  }

  override def runQueries: Flow[ProtoMGOContext, ProtoMGODocument, NotUsed] =
    Flow[ProtoMGOContext]
    .flatMapConcat {p =>
      val ctx = MGOContext.fromProto(p)
      mongoStream(ctx).map{doc => MGODocument.toProto(doc)}
    }


}

这个runQueries服务函数的处理流程是:接收ProtoMGOContext、转换成MGOContext、传给mongoStream、运算mongoStream返回ProtoMGODocument结果。mongoStream函数的代码如下:

  def mongoStream(ctx: MGOContext)(
    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
    log.info(s"mongoStream>  MGOContext: ${ctx}")

    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))

    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    if ( ctx.action == None) {
      log.error(s"mongoStream> uery action cannot be null!")
      throw new IllegalArgumentException("query action cannot be null!")
    }
    try {
      ctx.action.get match {
        case Find(None, Nil, false) => //FindObservable
          MongoSource(coll.find())
        case Find(None, Nil, true) => //FindObservable
          MongoSource(coll.find().first())
        case Find(Some(filter), Nil, false) => //FindObservable
          MongoSource(coll.find(filter))
        case Find(Some(filter), Nil, true) => //FindObservable
          MongoSource(coll.find(filter).first())
        case Find(None, sro, _) => //FindObservable
          val next = toResultOption(sro)
          MongoSource(next(coll.find[Document]()))
        case Find(Some(filter), sro, _) => //FindObservable
          val next = toResultOption(sro)
          MongoSource(next(coll.find[Document](filter)))
        case _ =>
          log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
          throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")

      }
    }
    catch { case e: Exception =>
      log.error(s"mongoStream> runtime error: ${e.getMessage}")
      throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
    }

  }

调用服务的gRPC客户端实现代码如下:

class MGOStreamClient(host: String, port: Int)(
  implicit ec: ExecutionContextExecutor) extends LogSupport {

  val channel = ManagedChannelBuilder
    .forAddress(host, port)
    .usePlaintext()
    .build()

  val stub = MgostreamingGrpcAkkaStream.stub(channel)

  def echoHello: Source[HelloMsg, NotUsed] = {
    val row = HelloMsg("hello ")
    val rows = List.fill[HelloMsg](100)(row)
    Source
      .fromIterator(() => rows.iterator)
      .via(stub.clientStreaming)
  }

  val filter = and(equal("state","California"),
    equal("county","Alameda"),
    equal("value",10))

  val proj = exclude("rowid","_id")
  val proj1 = include("county","value")
  val rtxfmr = Seq(
    ResultOptions(
      optType = FOD_LIMIT,
      value = 3),
    ResultOptions(
      optType = FOD_PROJECTION,
      bson = Some(proj)) /*,
    ResultOptions(
      optType = FOD_PROJECTION,
      bson = Some(proj1)) */
  )

  val cmd = Find(filter = Some(filter), andThen = rtxfmr)

  val ctx = MGOContext("testdb","aqmrpt").setCommand(cmd)

  def mgoQueries: Source[ProtoMGODocument,NotUsed] = {
       Source
         .single[ProtoMGOContext](ctx.toSomeProto.get)
      .via(stub.runQueries)
  }
}

object MGOStreamingClient extends App {
  implicit val system = ActorSystem("EchoNumsClient")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher
  val mgoClient = new MGOStreamClient("localhost", 50051)

   mgoClient.mgoQueries.runForeach(pd =>
      println(MGODocument.fromProto(pd).toJson()))

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

调用服务函数运算结果:

{ "measureid" : { "$numberLong" : "83" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
{ "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
{ "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 2000, "value" : 10 }

下面是MongoDB序列化类型转换工具函数的源代码:

MGOProtoConversion.scala

package sdp.mongo.engine
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable

object MGOProtoConversion {

  type MGO_COMMAND_TYPE = Int
  val MGO_COMMAND_FIND            = 0
  val MGO_COMMAND_COUNT           = 20
  val MGO_COMMAND_DISTICT         = 21
  val MGO_COMMAND_DOCUMENTSTREAM  = 1
  val MGO_COMMAND_AGGREGATE       = 2
  val MGO_COMMAND_INSERT          = 3
  val MGO_COMMAND_DELETE          = 4
  val MGO_COMMAND_REPLACE         = 5
  val MGO_COMMAND_UPDATE          = 6


  val MGO_ADMIN_DROPCOLLECTION    = 8
  val MGO_ADMIN_CREATECOLLECTION  = 9
  val MGO_ADMIN_LISTCOLLECTION    = 10
  val MGO_ADMIN_CREATEVIEW        = 11
  val MGO_ADMIN_CREATEINDEX       = 12
  val MGO_ADMIN_DROPINDEXBYNAME   = 13
  val MGO_ADMIN_DROPINDEXBYKEY    = 14
  val MGO_ADMIN_DROPALLINDEXES    = 15


  case class AdminContext(
                           tarName: String = "",
                           bsonParam: Seq[Bson] = Nil,
                           options: Option[Any] = None,
                           objName: String = ""
                         ){
    def toProto = sdp.grpc.services.ProtoMGOAdmin(
      tarName = this.tarName,
      bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
      objName = this.objName,
      options = this.options.map(b => ProtoAny(marshal(b)))

    )
  }

  object AdminContext {
    def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
      tarName = msg.tarName,
      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      objName = msg.objName,
      options = msg.options.map(b => unmarshal[Any](b.value))
    )
  }

  case class Context(
                      dbName: String = "",
                      collName: String = "",
                      commandType: MGO_COMMAND_TYPE,
                      bsonParam: Seq[Bson] = Nil,
                      resultOptions: Seq[ResultOptions] = Nil,
                      options: Option[Any] = None,
                      documents: Seq[Document] = Nil,
                      targets: Seq[String] = Nil,
                      only: Boolean = false,
                      adminOptions: Option[AdminContext] = None
                    ){

    def toProto = new sdp.grpc.services.ProtoMGOContext(
      dbName = this.dbName,
      collName = this.collName,
      commandType = this.commandType,
      bsonParam = this.bsonParam.map(bsonToProto),
      resultOptions = this.resultOptions.map(_.toProto),
      options = { if(this.options == None)
        Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
      else
        Some(ProtoAny(marshal(this.options.get))) },
      documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
      targets = this.targets,
      only = Some(this.only),
      adminOptions = this.adminOptions.map(_.toProto)
    )

  }

  object MGODocument {
    def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
      unmarshal[Document](msg.document)
    def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
      new ProtoMGODocument(marshal(doc))
  }

  object MGOProtoMsg {
    def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
      dbName = msg.dbName,
      collName = msg.collName,
      commandType = msg.commandType,
      bsonParam = msg.bsonParam.map(protoToBson),
      resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
      options = msg.options.map(a => unmarshal[Any](a.value)),
      documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
      targets = msg.targets,
      adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
    )
  }

  def bsonToProto(bson: Bson) =
    ProtoMGOBson(marshal(bson.toBsonDocument(
      classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))

  def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
    val bsdoc = unmarshal[BsonDocument](proto.bson)
    override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  }

  def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
    case MGO_COMMAND_FIND => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Find())
      )
      def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
        rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))

      (proto.bsonParam, proto.resultOptions, proto.only) match {
        case (Nil, Nil, None) => ctx
        case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
        case (bp,Nil,None) => ctx.setCommand(
          Find(filter = Some(protoToBson(bp.head))))
        case (bp,Nil,Some(b)) => ctx.setCommand(
          Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
        case (bp,fo,None) => {
          ctx.setCommand(
            Find(filter = Some(protoToBson(bp.head)),
              andThen = fo.map(ResultOptions.fromProto)
            ))
        }
        case (bp,fo,Some(b)) => {
          ctx.setCommand(
            Find(filter = Some(protoToBson(bp.head)),
              andThen = fo.map(ResultOptions.fromProto),
              firstOnly = b))
        }
        case _ => ctx
      }
    }
    case MGO_COMMAND_COUNT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Count())
      )
      (proto.bsonParam, proto.options) match {
        case (Nil, None) => ctx
        case (bp, None) => ctx.setCommand(
          Count(filter = Some(protoToBson(bp.head)))
        )
        case (Nil,Some(o)) => ctx.setCommand(
          Count(options = Some(unmarshal[Any](o.value)))
        )
        case _ => ctx
      }
    }
    case MGO_COMMAND_DISTICT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Distict(fieldName = proto.targets.head))
      )
      (proto.bsonParam) match {
        case Nil => ctx
        case bp: Seq[ProtoMGOBson] => ctx.setCommand(
          Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
        )
        case _ => ctx
      }
    }
    case MGO_COMMAND_AGGREGATE => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
      )
    }
    case MGO_ADMIN_LISTCOLLECTION => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(ListCollection(proto.dbName)))
    }
    case MGO_COMMAND_INSERT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_COMMAND_DELETE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Delete(
          filter = protoToBson(proto.bsonParam.head)))
      )
      (proto.options, proto.only) match {
        case (None,None) => ctx
        case (None,Some(b)) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          onlyOne = b))
        case (Some(o),None) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
        case (Some(o),Some(b)) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)),
          onlyOne = b)
        )
      }
    }
    case MGO_COMMAND_REPLACE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Replace(
          filter = protoToBson(proto.bsonParam.head),
          replacement = unmarshal[Document](proto.documents.head.document)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(Replace(
          filter = protoToBson(proto.bsonParam.head),
          replacement = unmarshal[Document](proto.documents.head.document),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_COMMAND_UPDATE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head)))
      )
      (proto.options, proto.only) match {
        case (None,None) => ctx
        case (None,Some(b)) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          onlyOne = b))
        case (Some(o),None) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          options = Some(unmarshal[Any](o.value)))
        )
        case (Some(o),Some(b)) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          options = Some(unmarshal[Any](o.value)),
          onlyOne = b)
        )
      }
    }
    case MGO_ADMIN_DROPCOLLECTION =>
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropCollection(proto.collName))
      )
    case MGO_ADMIN_CREATECOLLECTION => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateCollection(proto.collName))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_CREATEVIEW => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateView(viewName = proto.targets.head,
          viewOn = proto.targets.tail.head,
          pipeline = proto.bsonParam.map(p => protoToBson(p))))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
          viewOn = proto.targets.tail.head,
          pipeline = proto.bsonParam.map(p => protoToBson(p)),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_CREATEINDEX=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPINDEXBYNAME=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropIndexByName(indexName = proto.targets.head))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPINDEXBYKEY=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPALLINDEXES=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropAllIndexes())
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropAllIndexes(
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }

  }

  def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
    case None => None
    case Some(act) => act match {
      case Count(filter, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_COUNT,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                        else Seq(bsonToProto(filter.get))},
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
                      else Some(ProtoAny(marshal(options.get))) }
      ))
      case Distict(fieldName, filter) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DISTICT,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                        else Seq(bsonToProto(filter.get))},
          targets = Seq(fieldName)

        ))

      case Find(filter, andThen, firstOnly) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_FIND,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
          else Seq(bsonToProto(filter.get))},
          resultOptions = andThen.map(_.toProto)
        ))

      case Aggregate(pipeLine) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_AGGREGATE,
          bsonParam = pipeLine.map(bsonToProto)
        ))

      case Insert(newdocs, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_INSERT,
          documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

      case Delete(filter, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DELETE,
          bsonParam = Seq(bsonToProto(filter)),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          only = Some(onlyOne)
        ))

      case Replace(filter, replacement, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_REPLACE,
          bsonParam = Seq(bsonToProto(filter)),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          documents = Seq(ProtoMGODocument(marshal(replacement)))
        ))

      case Update(filter, update, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_UPDATE,
          bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          only = Some(onlyOne)
        ))


      case DropCollection(coll) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_DROPCOLLECTION
        ))

      case CreateCollection(coll, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_CREATECOLLECTION,
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

      case ListCollection(dbName) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          commandType = MGO_ADMIN_LISTCOLLECTION
        ))

      case CreateView(viewName, viewOn, pipeline, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEVIEW,
          bsonParam = pipeline.map(bsonToProto),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) },
          targets = Seq(viewName,viewOn)
        ))

      case CreateIndex(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEINDEX,
          bsonParam = Seq(bsonToProto(key)),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))


      case DropIndexByName(indexName, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYNAME,
          targets = Seq(indexName),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

      case DropIndexByKey(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYKEY,
          bsonParam = Seq(bsonToProto(key)),
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))


      case DropAllIndexes(options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPALLINDEXES,
          options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
          else Some(ProtoAny(marshal(options.get))) }
        ))

    }
  }


}

 

版权声明:本文为tiger-xc原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/tiger-xc/p/9536859.html