scala - How to send message from Supervisor to Actor once Actor restarts? -
requirement?
- there has long running process(daemon) should run forever
- in case of exceptions, should restarted, if fails again twice, no restart efforts should taken
problem face?
- actor restarted no message sent again
what have?
main class
package com.learner.ahka.runforever import akka.actor.actorsystem import com.typesafe.config.configfactory object raceevent extends app { val config = configfactory.parsestring( """ akka.loglevel = "debug" akka.actor.debug { receive = on lifecycle = on } """) val system = actorsystem.create("race", config) val coach = system.actorof(coach.props(), "coach") coach ! getsetgo } supervisor
package com.learner.ahka.runforever import akka.actor.supervisorstrategy.{escalate, restart} import akka.actor._ import akka.event.loggingreceive import scala.concurrent.duration._ case object getsetgo object coach { def props(): props = props[coach]; } class coach() extends actor actorlogging { val runner = context.actorof(runner.props(new marathon), "runner") override def supervisorstrategy: supervisorstrategy = oneforonestrategy(maxnrofretries = 2, withintimerange = 5 seconds) { case _: runtimeexception => restart } override def receive = loggingreceive { case getsetgo => runner ! goforit } } actor
package com.learner.ahka.runforever import akka.actor.status.failure import akka.actor.{actor, actorlogging, props} import akka.event.loggingreceive import akka.pattern.pipe object runner { def props(race: race) = props(classof[runner], race) } class runner(race: race) extends actor actorlogging { import context.dispatcher override def receive: receive = loggingreceive { case goforit => race.start pipeto self case failure(throwable) => throw throwable } } actual work
package com.learner.ahka.runforever import scala.concurrent.future case object goforit trait race { def start: future[any] } class marathon extends race { import scala.concurrent.executioncontext.implicits.global override def start: future[any] = future val future = future { (i <- 1 3) { println("i marathon runner!") thread.sleep(1000) } throw new runtimeexception("marathonrunner tired") } } logs
[debug] [05/30/2015 16:03:35.696] [main] [eventstream(akka://race)] logger log1-logging$defaultlogger started [debug] [05/30/2015 16:03:35.698] [main] [eventstream(akka://race)] default loggers started [debug] [05/30/2015 16:03:35.704] [race-akka.actor.default-dispatcher-4] [akka://race/system] supervising actor[akka://race/system/deadletterlistener#-1391310385] [debug] [05/30/2015 16:03:35.706] [race-akka.actor.default-dispatcher-3] [akka://race/system/deadletterlistener] started (akka.event.deadletterlistener@191ba186) [debug] [05/30/2015 16:03:35.710] [race-akka.actor.default-dispatcher-2] [akka://race/user] supervising actor[akka://race/user/coach#-1161587711] marathon runner! [debug] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] started (com.learner.ahka.runforever.coach@66f0f319) [debug] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.runner@72f67980) [debug] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] supervising actor[akka://race/user/coach/runner#755574648] [debug] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message getsetgo [debug] [05/30/2015 16:03:35.725] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message goforit marathon runner! marathon runner! [debug] [05/30/2015 16:03:38.739] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message failure(java.lang.runtimeexception: marathonrunner tired) [error] [05/30/2015 16:03:38.752] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] marathonrunner tired java.lang.runtimeexception: marathonrunner tired @ com.learner.ahka.runforever.marathon$$anonfun$1.apply(race.scala:22) @ com.learner.ahka.runforever.marathon$$anonfun$1.apply(race.scala:17) @ scala.concurrent.impl.future$promisecompletingrunnable.liftedtree1$1(future.scala:24) @ scala.concurrent.impl.future$promisecompletingrunnable.run(future.scala:24) @ scala.concurrent.impl.executioncontextimpl$adaptedforkjointask.exec(executioncontextimpl.scala:121) @ scala.concurrent.forkjoin.forkjointask.doexec(forkjointask.java:260) @ scala.concurrent.forkjoin.forkjoinpool$workqueue.runtask(forkjoinpool.java:1339) @ scala.concurrent.forkjoin.forkjoinpool.runworker(forkjoinpool.java:1979) @ scala.concurrent.forkjoin.forkjoinworkerthread.run(forkjoinworkerthread.java:107) [debug] [05/30/2015 16:03:38.753] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting [debug] [05/30/2015 16:03:38.755] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarted update
if don't delegate future, works expected, in case of restart, when delegated future, future not executed in case of actor restart. see here
you override postrestart method send message parent notify of restart, watch new message type in parent , respond accordingly. if context.parent won't work happily purpose (i tend not rely on it), have coach actor pass it's self actor reference new constructor parameter when instantiating runner.
Comments
Post a Comment