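Distributed computing is a scale-out model of computation. Its core idea is to make full use of the computing resources of every node in a server cluster: CPU, memory, disk, IO bus and so on. A job is first split into smaller tasks, which are then dispatched to the nodes for execution. The subtasks may depend on one another or be fully independent. With akka-cluster, tasks can be distributed according to the current load on each node's computing resources, so that resources are used sensibly and fully and overall throughput is maximized. If a job can be split into multiple independent tasks, all we need to care about is how to distribute those tasks so that load is balanced across the cluster nodes. This is in effect a way of dispatching tasks that keep no internal state: fire and forget. Because the location of the target actor that performs a task is decided by an algorithm, we generally do not need to address a particular actor or read its internal state. If we do need to, it can still be done by embedding the necessary information in messages.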

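  Cluster load balancing is a centralized way of dispatching tasks. It is really the router/routees pattern running in a cluster, except that the router can now send tasks to actors on other servers. Dispatch is driven by a routing algorithm, and all the ordinary router algorithms such as round-robin and random are available. akka also provides an algorithm based on the load of each node's computing resources. It relies on the cluster metrics extension, which is enabled in the configuration file: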

akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]

The following deployment configuration illustrates the basic role of metrics:

akka.actor.deployment {
  /frontend/dispatcher = {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    routees.paths = ["/user/backend"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}

Here dispatcher is the router, and the actors at /user/backend (the configured routees.paths) on nodes with the backend role are the routees.
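To make the idea of load-based routing concrete, here is a minimal sketch in plain Scala of capacity-weighted random selection. It is only an illustration of the principle, not akka's implementation: the object WeightedRouting, the node names and the capacity numbers are all made up for this example, and a real metrics selector derives capacity from heap, CPU and system-load measurements.

```scala
import scala.util.Random

object WeightedRouting {
  // Hypothetical capacity table: node name -> remaining capacity in [0.0, 1.0].
  type Capacities = Map[String, Double]

  // Turn capacities into integer weights relative to the least capable node.
  def weights(capacities: Capacities): Map[String, Int] = {
    val positive = capacities.filter(_._2 > 0.0)
    if (positive.isEmpty) Map.empty
    else {
      val min = positive.values.min
      positive.map { case (node, c) => node -> math.max(1, math.round(c / min).toInt) }
    }
  }

  // Pick a node with probability proportional to its weight.
  def pick(ws: Map[String, Int], rnd: Random): Option[String] = {
    val total = ws.values.sum
    if (total <= 0) None
    else {
      val ticket = rnd.nextInt(total)
      val ordered = ws.toSeq.sortBy(_._1)
      // Running upper bounds of each node's slice of the ticket range.
      val upperBounds = ordered.scanLeft(0)((acc, kv) => acc + kv._2).tail
      ordered.zip(upperBounds).collectFirst {
        case ((node, _), upper) if ticket < upper => node
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val ws = weights(Map("node-a" -> 0.8, "node-b" -> 0.2, "node-c" -> 0.4))
    val rnd = new Random(42)
    val hits = Seq.fill(7000)(pick(ws, rnd)).flatten
      .groupBy(identity).map { case (node, picks) => node -> picks.size }
    println(s"weights = $ws")
    println(s"hits    = $hits")
  }
}
```

A node with more spare capacity gets a larger weight and therefore receives proportionally more messages, which is the effect the metrics-selector setting is meant to achieve.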

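Suppose we split a large data-processing program into many independent database operations. To make sure each operation can proceed safely under all circumstances, including when exceptions occur, we can place the actor that performs the operation under a BackoffSupervisor, as follows: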

val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(     // Backoff.OnStop
      childProps = workerProps(client),
      childName = "worker",
      minBackoff = 1 second,
      maxBackoff = 10 seconds,
      randomFactor = 0.20
    ).withAutoReset(resetBackoff = 5 seconds)
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
)
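The minBackoff, maxBackoff and randomFactor parameters above control how long the supervisor waits before re-creating the child. The sketch below models that schedule in plain Scala as I understand it from the documentation: exponential growth from minBackoff, capped at maxBackoff, then stretched by random noise. The object BackoffDelay and its delay function are hypothetical names for this illustration, and the random value is passed in explicitly so the result is reproducible.

```scala
import scala.concurrent.duration._

object BackoffDelay {
  // Delay before the n-th re-creation: minBackoff * 2^n, capped at maxBackoff,
  // then multiplied by a factor in [1, 1 + randomFactor].
  // `rnd` stands in for a random number in [0, 1].
  def delay(restartCount: Int,
            minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration,
            randomFactor: Double,
            rnd: Double): FiniteDuration = {
    require(rnd >= 0.0 && rnd <= 1.0, "rnd must be in [0, 1]")
    val base = math.min(
      maxBackoff.toMillis.toDouble,
      minBackoff.toMillis * math.pow(2, restartCount))
    (base * (1.0 + rnd * randomFactor)).toLong.millis
  }

  def main(args: Array[String]): Unit = {
    // With the noise switched off (rnd = 0.0) the pure schedule is visible.
    (0 to 5).foreach { n =>
      println(s"attempt $n -> ${delay(n, 1.second, 10.seconds, 0.20, rnd = 0.0).toMillis} ms")
    }
  }
}
```

With these settings and no noise the wait doubles from 1 second up to 8 seconds and then stays at the 10-second cap. withAutoReset sets how long the child must run without failing before the counter goes back to zero.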

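The usage scenarios and effects of Backoff.onFailure and Backoff.onStop deserve a special note here, because what I observed differs somewhat from the official documentation. First, when the backoff is triggered, neither method restarts the child actor; each creates and starts a new instance instead. For details, see the output of the following test program: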

package my.akka

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.pattern.{Backoff, BackoffSupervisor, ask}

import scala.concurrent.Await
import scala.concurrent.duration._


class Child extends Actor {
  println(s"[Child]: created.         (path = ${this.self.path}, instance = ${this})")

  override def preStart(): Unit = {
    println(s"[Child]: preStart called. (path = ${this.self.path}, instance = ${this})")
    super.preStart()
  }

  override def postStop(): Unit = {
    println(s"[Child]: postStop called. (path = ${this.self.path}, instance = ${this})")
    super.postStop()
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"[Child]: preRestart called with ($reason, $message). (path = ${this.self.path}, instance = ${this})")
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    println(s"[Child]: postRestart called with ($reason). (path = ${this.self.path}, instance = ${this})")
    super.postRestart(reason)
  }

  def receive = {
    case "boom" =>
      throw new Exception("kaboom")
    case "get ref" =>
      sender() ! self
    case a: Any =>
      println(s"[Child]: received ${a}")
  }
}

object Child {
  def props: Props
  = Props(new Child)

  def backOffOnFailureProps: Props
  = BackoffSupervisor.props(
    Backoff.onFailure(
      Child.props,
      childName = "myEcho",
      minBackoff = 1.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
    ))

  def backOffOnStopProps: Props
  = BackoffSupervisor.props(
    Backoff.onStop(
      Child.props,
      childName = "myEcho",
      minBackoff = 1.seconds,
      maxBackoff = 10.seconds,
      randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
    ))
}

object BackoffSuperVisorApp {
  def defaultSuperVisorCase(): Unit = {
    println(
      """
        |default ---------------------------
      """.stripMargin)

    val system = ActorSystem("app")
    try{
      /**
        * Let's see if "hello" message is received by the child
        */
      val child = system.actorOf(Child.props, "child")
      Thread.sleep(100)
      child ! "hello"
      //[Child]: received hello

      /**
        * Now restart the child with an exception within its receive method
        * and see if the `child` ActorRef is still valid (i.e. ActorRef incarnation remains same)
        */
      child ! "boom"
      Thread.sleep(200)

      child ! "hello after normal exception"
      //[Child]: received hello after normal exception

      /**
        * PoisonPill causes the child actor to `Stop`, different from restart.
        * The ActorRef incarnation gets updated.
        */
      child ! PoisonPill
      Thread.sleep(200)

      /**
        * This causes delivery to deadLetter, since the "incarnation" of ActorRef `child` became obsolete
        * after child is "Stopped"
        *
        * An incarnation is tied to an ActorRef (NOT to its internal actor instance)
        * and the same incarnation means "you can keep using the same ActorRef"
        */
      child ! "hello after PoisonPill"
      // [akka://app/user/parent/child-1] Message [java.lang.String] without sender to Actor[akka://app/user/child#-767539042]
      //   was not delivered. [1] dead letters encountered.

      Thread.sleep(200)
    }
    finally{
      system.terminate()
      Thread.sleep(500)
    }
  }

  def backOffOnStopCase(): Unit ={
    println(
      """
        |backoff onStop ---------------------------
      """.stripMargin)

    val system = ActorSystem("app")
    try{
      /**
        * Let's see if "hello" message is forwarded to the child
        * by the backoff supervisor onStop
        */
      implicit val futureTimeout: akka.util.Timeout = 1.second
      val backoffSupervisorActor = system.actorOf(Child.backOffOnStopProps, "child")
      Thread.sleep(100)

      backoffSupervisorActor ! "hello to backoff supervisor" //forwarded to child
      //[Child]: received hello to backoff supervisor

      /**
        * Now "Restart" the child with an exception from its receive method.
        * As with the default supervisory strategy, the `child` ActorRef remains valid. (i.e. incarnation kept same)
        */
      val child = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
      child ! "boom"
      Thread.sleep(2000)

      child ! "hello to child after normal exception"
      //[Child]: received hello to child after normal exception

      /**
        * Backoff Supervisor can still forward the message
        */
      backoffSupervisorActor ! "hello to backoffSupervisorActor after normal exception"
      //[Child]: received hello to backoffSupervisorActor after normal exception

      Thread.sleep(200)

      /**
        * PoisonPill causes the child actor to `Stop`, different from restart.
        * The `child` ActorRef incarnation gets updated.
        */
      child ! PoisonPill
      Thread.sleep(2000)

      child ! "hello to child ref after PoisonPill"
      //delivered to deadLetters

      /**
        * Backoff Supervisor can forward the message to its child with the new incarnation
        */
      backoffSupervisorActor ! "hello to backoffSupervisorActor after PoisonPill"
      //[Child]: received hello to backoffSupervisorActor after PoisonPill

      Thread.sleep(200)
    }
    finally{
      system.terminate()
      Thread.sleep(500)
    }
  }

  def backOffOnFailureCase(): Unit ={
    println(
      """
        |backoff onFailure ---------------------------
      """.stripMargin)

    val system = ActorSystem("app")
    try{
      /**
        * Let's see if "hello" message is forwarded to the child
        * by the backoff supervisor onFailure
        */
      implicit val futureTimeout: akka.util.Timeout = 1.second
      val backoffSupervisorActor = system.actorOf(Child.backOffOnFailureProps, "child")
      Thread.sleep(100)

      backoffSupervisorActor ! "hello to backoff supervisor" //forwarded to child
      //[Child]: received hello to backoff supervisor

      /**
        * Now "Stop" the child with an exception from its receive method.
        * You'll see the difference between "Restart" and "Stop" from here:
        */
      val child = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
      child ! "boom"
      Thread.sleep(2000)

      /**
        * Note that this is after normal exception, not after PoisonPill,
        * but child is completely "Stopped" and its ActorRef "incarnation" became obsolete
        *
        * So, the message to the `child` ActorRef is delivered to deadLetters
        */
      child ! "hello to child after normal exception"
      //causes delivery to deadLetter

      /**
        * Backoff Supervisor can still forward the message to the new child ActorRef incarnation
        */
      backoffSupervisorActor ! "hello to backoffSupervisorActor after normal exception"
      //[Child]: received hello to backoffSupervisorActor after normal exception

      /**
        * You can get a new ActorRef which represents the new incarnation
        */
      val newChildRef = Await.result(backoffSupervisorActor ? "get ref", 1.second).asInstanceOf[ActorRef]
      newChildRef ! "hello to new child ref after normal exception"
      //[Child]: received hello to new child ref after normal exception

      Thread.sleep(200)

      /**
        * No matter whether the supervisory strategy is default or backoff,
        * PoisonPill causes the actor to "Stop", not "Restart"
        */
      newChildRef ! PoisonPill
      Thread.sleep(3000)

      newChildRef ! "hello to new child ref after PoisonPill"
      //delivered to deadLetters

      Thread.sleep(200)
    }
    finally{
      system.terminate()
      Thread.sleep(500)
    }
  }

  def main(args: Array[String]): Unit ={
    defaultSuperVisorCase()
    backOffOnStopCase()
    backOffOnFailureCase()
  }
}

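Backoff.onStop: does not react to exceptions thrown by the child actor; those are handled by the SupervisorStrategy. It acts on normal stops such as PoisonPill or context.stop, in which case a new instance is created and started.

Backoff.onFailure: does not react to a normal stop of the child actor and simply lets it terminate. When an exception occurs, a new instance is created and started.

Clearly, we usually want to start a computation again when it fails with an exception, so onFailure is the right choice.

Below is the example code I used previously when introducing BackoffSupervisor: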

package backoffSupervisorDemo
import akka.actor._
import akka.pattern._
import backoffSupervisorDemo.InnerChild.TestMessage

import scala.concurrent.duration._

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException extends Exception

  def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  override def receive: Receive = {
    case TestMessage(msg) => // simulate the child's work
      log.info(s"Child received message: ${msg}")
  }
}
object Supervisor {
  def props: Props = { // the supervision strategy and the child actor's construction are defined here
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: InnerChild.ChildException => SupervisorStrategy.Restart
    }

    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1 second, 5 seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}
// Note: the actor below is the parent of Supervisor, not of InnerChild
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}
class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  // create the child actor `supervisor` here
  val supervisor = context.actorOf(Supervisor.props,"supervisor")
  supervisor ! BackoffSupervisor.getCurrentChild // ask the supervisor for its current child actor
  var innerChild: Option[ActorRef] = None   // ActorRef of the current child, once it has been returned
  val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
  override def receive: Receive = {
    case BackoffSupervisor.CurrentChild(ref) =>   // received the current child actor
      innerChild = ref
    case SendToSupervisor(msg) => supervisor ! msg
    case SendToChildSelection(msg) => selectedChild ! msg
    case SendToInnerChild(msg) => innerChild foreach(child => child ! msg)
  }

}
object BackoffSupervisorDemo extends App {
  import ParentalActor._
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props,"parent")

  Thread.sleep(1000)   //wait for BackoffSupervisor.CurrentChild(ref) received

  parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
  parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
  parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))


  scala.io.StdIn.readLine()

  testSystem.terminate()

}

Now let's implement an example that performs database operations in a cluster and see how akka-cluster dispatches a series of operations to the nodes. First, the Worker:

import akka.actor._
import scala.concurrent.duration._

object Backend {
  case class SaveFormula(op1: Int, op2: Int)
  def workerProps = Props(new Worker)
}

class Worker extends Actor with ActorLogging {
  import Backend._
  
  context.setReceiveTimeout(500 milliseconds)
  
  override def receive: Receive = {
    case SaveFormula(op1,op2) => {
      val res = op1 * op2
      // saveToDB(op1,op2,res)
      log.info(s"******* $op1 X $op2 = $res save to DB by $self *******")
    }
    case ReceiveTimeout =>
      log.info(s"******* $self receive timeout! *******")
      throw new RuntimeException("Worker idle timeout!")
  }
}

This is about as ordinary as an actor gets. We put it under a BackoffSupervisor:

  def superProps: Props = {
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: DBException => SupervisorStrategy.Restart
    }

    val options = Backoff.onFailure(
      childProps = workerProps,
      childName = "worker",
      minBackoff = 1 second,
      maxBackoff = 5 seconds,
      randomFactor = 0.20
    ).withAutoReset(resetBackoff = 10 seconds)
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )

    BackoffSupervisor.props(options)
  }
  
  def create(port: Int): Unit = {
      val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
        .withFallback(ConfigFactory.parseString(s"akka.cluster.roles=[backend]"))
        .withFallback(ConfigFactory.load())

      val system = ActorSystem("ClusterSystem", config)

      val Backend = system.actorOf(superProps,"backend")

  }

Below is the definition of the router that dispatches tasks, i.e. the frontend:

import akka.actor._
import akka.routing._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.util._
import akka.cluster._

object Frontend {
  private var _frontend: ActorRef = _

  case class Multiply(op1: Int, op2: Int)
  def create(port: Int) = {

    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString(s"akka.cluster.roles=[frontend]"))
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem",config)

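    // Create the frontend actor once, after this member is Up.
    // (A second actorOf with the same name "frontend" would fail with InvalidActorNameException.)
    Cluster(system).registerOnMemberUp{
      _frontend = system.actorOf(Props[Frontend],"frontend")
    }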


  }
  def getFrontend = _frontend
}

class Frontend extends Actor with ActorLogging {
  import Frontend._
  import Backend._
  import context.dispatcher

  //just lookup routees, routing strategy is responsible for deployment
  val backend = context.actorOf(FromConfig.props(/* Props.empty */),"dispatcher")

  context.system.scheduler.schedule(3.seconds, 3.seconds, self,
    Multiply(Random.nextInt(100), Random.nextInt(100)))

  override def receive: Receive = {
    case Multiply(op1,op2) =>
      backend ! SaveFormula(op1,op2)
    case msg @ _ =>
      log.info(s"******* unrecognized message: $msg! ******")
  }
}

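We need a reference to the backend inside Frontend. However, the backend actors, i.e. the routees, were already deployed when each Backend node was created, so here FromConfig.props(Props.empty) only needs to look up the routees; there is no need to deploy them again.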

Below is a demonstration of the actual database storage operation:

  def superProps: Props = {
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: DBException => SupervisorStrategy.Restart
    }
    val clientSettings: MongoClientSettings = MongoClientSettings.builder()
      .applyToClusterSettings {b =>
        b.hosts(List(new ServerAddress("localhost:27017")).asJava)
      }.build()

    val client: MongoClient = MongoClient(clientSettings)

    val options = Backoff.onFailure(
      childProps = workerProps(client),
      childName = "worker",
      minBackoff = 1 second,
      maxBackoff = 10 seconds,
      randomFactor = 0.20
    ).withAutoReset(resetBackoff = 5 seconds)
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )

    BackoffSupervisor.props(options)
  }

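Note that the database connection is made in superProps. This way, when Backend is instantiated or restarted for whatever reason, especially on a different JVM, the MongoClient can be constructed correctly. The database operation follows the standard MongoEngine approach: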

 import monix.execution.Scheduler.Implicits.global
  implicit val mongoClient = client;
  val ctx = MGOContext("testdb","mulrecs")

  def saveToDB(op1: Int, op2: Int, by: String) = {
      val doc = Document("by" -> by, "op1" -> op1, "op2" -> op2, "res" -> op1 * op2)
      val cmd = ctx.setCommand(MGOCommands.Insert(Seq(doc)))
      val task = mgoUpdate[Completed](cmd).toTask
      task.runOnComplete {
        case Success(s) => log.info("operations completed successfully.")
        case Failure(exception) => log.error(s"error: ${exception.getMessage}")
      }
  }
 

The database operation runs in a different ExecutionContext.
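The point of using a different ExecutionContext is that slow IO does not occupy the thread that processes the actor's messages. The sketch below shows the same idea with a plain scala.concurrent.Future on a dedicated thread pool rather than the monix scheduler used in this article. The object BlockingIoDemo, the slowSave function and the pool size of 4 are assumptions made for the illustration only.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingIoDemo {
  // Hypothetical stand-in for a slow database write.
  def slowSave(op1: Int, op2: Int): Int = {
    Thread.sleep(50)
    op1 * op2
  }

  def main(args: Array[String]): Unit = {
    // Dedicated pool for blocking work, separate from the caller's thread.
    val pool = Executors.newFixedThreadPool(4)
    val dbEc: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val caller = Thread.currentThread().getName
      // The future body runs on a thread from `pool`, not on the caller's thread.
      val f = Future {
        (Thread.currentThread().getName, slowSave(6, 7))
      }(dbEc)
      val (worker, res) = Await.result(f, 2.seconds)
      println(s"caller=$caller worker=$worker result=$res")
    } finally {
      pool.shutdown()
    }
  }
}
```

Inside an actor one would not block with Await. The result would be handled in a callback, as saveToDB does with runOnComplete.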

Below is the complete source code for this demonstration:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
)

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "cluster-load-balance"

version := "0.1"

scalaVersion := "2.12.8"

scalacOptions += "-Ypartial-unification"

libraryDependencies ++= {
  val akkaVersion = "2.5.19"
  Seq(
    "com.typesafe.akka"       %%  "akka-actor"   % akkaVersion,
    "com.typesafe.akka"       %%  "akka-cluster"   % akkaVersion,
    "com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion,
    "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
    "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
    //for mongodb 4.0
    "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
    "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
    //other dependencies
    "co.fs2" %% "fs2-core" % "0.9.7",
    "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
    "org.typelevel" %% "cats-core" % "0.9.0",
    "io.monix" %% "monix-execution" % "3.0.0-RC1",
    "io.monix" %% "monix-eval" % "3.0.0-RC1"
  )
}

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

resources/application.conf

akka {
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    # auto-down-unreachable-after = 10s
  }
}

akka.cluster.min-nr-of-members = 3


akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

akka.actor.deployment {
  /frontend/dispatcher = {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    routees.paths = ["/user/backend"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}

protobuf/sdp.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

package sdp.grpc.services;


message ProtoDate {
  int32 yyyy = 1;
  int32 mm   = 2;
  int32 dd   = 3;
}

message ProtoTime {
  int32 hh   = 1;
  int32 mm   = 2;
  int32 ss   = 3;
  int32 nnn  = 4;
}

message ProtoDateTime {
   ProtoDate date = 1;
   ProtoTime time = 2;
}

message ProtoAny {
  bytes value = 1;
}

protobuf/mgo.proto

 

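Copyright notice: this is an original article by tiger-xc, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Article link: https://www.cnblogs.com/tiger-xc/p/10212830.html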