Akka actors: 10 million messages processed (1s / message) in 75 seconds!
In the previous post, "Akka actors versus Scala actors: control the message throughput of actors", we saw how to control message throughput and avoid OutOfMemory problems, in particular when actors don't have the same message processing rate.
Now we have to find a better way to simulate our 1-second delay, because Thread.sleep breaks actor management: it monopolizes all the threads of the executor in use. What we really want is to simulate a non-blocking 1s processing time.
So we should remove the following Thread.sleep call:
class MyMessageProcessor extends Actor {
  def receive = {
    case _:DoItMessage => Thread.sleep(1000)
  }
}
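To make the cost of the blocking call concrete, here is a minimal sketch that uses only java.util.concurrent rather than Akka. The object name BlockingDemo and the figures (8 tasks, 4 threads, 200 ms) are illustrative assumptions, not part of the original program. Each sleeping task holds its thread for the whole delay, so the total time grows with the number of tasks divided by the number of threads.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object BlockingDemo {
  // Runs `tasks` jobs that each block for `sleepMs` on a fixed pool of
  // `threads` threads; returns the elapsed wall-clock time in milliseconds.
  def elapsedMs(tasks: Int, threads: Int, sleepMs: Long): Long = {
    val pool = Executors.newFixedThreadPool(threads)
    val start = System.nanoTime()
    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        // The thread is unavailable to any other task for the whole sleep.
        def run(): Unit = Thread.sleep(sleepMs)
      })
    }
    pool.shutdown()                            // no new tasks, finish queued ones
    pool.awaitTermination(1, TimeUnit.MINUTES) // wait for the queue to drain
    (System.nanoTime() - start) / 1000000L
  }

  def main(args: Array[String]): Unit = {
    // 8 blocking tasks on 4 threads need about two rounds of 200 ms.
    println("elapsed: %d ms".format(elapsedMs(8, 4, 200L)))
  }
}
```

With 10 million messages and a 1s sleep, the same reasoning means the run time is bounded below by the message count divided by the thread count, whatever the actor library does.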
The solution is straightforward: Akka comes with a scheduler that lets you send a message after a given delay.
Using scheduleOnce, we don't monopolize system resources: no thread is held while waiting, and we get a fake processing step that simulates an asynchronous response arriving 1s later (in the complete program further down, that response is even received asynchronously by a future):
class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done")
  }
}
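The same idea can be sketched outside Akka with a plain ScheduledExecutorService from the JDK. This is an analogy under stated assumptions, not Akka's scheduler implementation; the object name NonBlockingDelayDemo and its parameters are hypothetical. A single timer thread serves many delayed callbacks because no thread waits on behalf of any individual task.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object NonBlockingDelayDemo {
  // Schedules `tasks` callbacks to fire `delayMs` later on ONE timer thread.
  // Returns (number of callbacks that ran, elapsed milliseconds).
  def run(tasks: Int, delayMs: Long): (Int, Long) = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(tasks)
    val start = System.nanoTime()
    for (_ <- 1 to tasks) {
      // schedule() returns immediately; nothing sleeps per task.
      timer.schedule(new Runnable {
        def run(): Unit = done.countDown()
      }, delayMs, TimeUnit.MILLISECONDS)
    }
    done.await(1, TimeUnit.MINUTES) // wait until every callback has fired
    val elapsed = (System.nanoTime() - start) / 1000000L
    timer.shutdown()
    ((tasks - done.getCount).toInt, elapsed)
  }

  def main(args: Array[String]): Unit = {
    val (completed, elapsed) = run(1000, 200L)
    println("%d callbacks in %d ms".format(completed, elapsed))
  }
}
```

Here 1000 delayed callbacks should finish in a time close to the delay itself, rather than 1000 times the delay, which is the property the simulator actor relies on.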
The complete program is the following:
package dummy
import akka.actor._
import akka.util.duration._
import akka.util.Timeout
import akka.pattern.ask
import akka.dispatch.Future
sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage
case object DoneMessage extends MyMessage
object Dummy {
  def main(args:Array[String]) {
    val howmanyjob=10*1000000
    import com.typesafe.config.ConfigFactory
    implicit val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))
    val simu = system.actorOf(
      Props(new MySimulator(system))
        .withDispatcher("simu-dispatcher"),
      name="simulator")
    val appManager = system.actorOf(
      Props(new ApplicationManager(system, howmanyjob))
        .withDispatcher("simu-dispatcher"),
      name="application-manager")
    import akka.routing.RoundRobinRouter
    val processor = system.actorOf(
      Props(new MyMessageProcessor(appManager, simu))
        .withDispatcher("workers-dispatcher")
        .withRouter(RoundRobinRouter(10)),
      name="default")
    for(i <- 1 to howmanyjob) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    print("All jobs sent")
  }
}
class MyMessageProcessor(appManager:ActorRef, simu:ActorRef) extends Actor {
  def receive = {
    case msg:DoItMessage =>
      implicit val timeout = Timeout(5 minutes)
      val receivedTime = System.currentTimeMillis()
      val future = simu ? msg
      future.onComplete {
        case result:Either[Throwable, String] =>
          assert(System.currentTimeMillis()-receivedTime >= 1000)
          appManager ! DoneMessage
      }
  }
}
class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      // Fake processing, somewhere, the job is executed and we get
      // the results 1s later asynchronously
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done")
  }
}
class ApplicationManager(system:ActorSystem, howmanyjob:Int) extends Actor {
  val startedTime = System.currentTimeMillis()
  var count=0
  def receive = {
    case DoneMessage =>
      count+=1
      if (count%(howmanyjob/20)==0) println("%d/%d processed".format(count, howmanyjob))
      if (count == howmanyjob) {
        val now=System.currentTimeMillis()
        println("Everything processed in %d seconds".format((now-startedTime)/1000))
        system.shutdown()
      }
  }
}
The application configuration is the following (application.conf file):
dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
      }
    }
    scheduler {
      tick-duration = 50ms
      ticks-per-wheel = 1000
    }
  }
  simu-dispatcher {
    type = Dispatcher
    mailbox-capacity = 100000
  }
  workers-dispatcher {
    mailbox-capacity = 10000
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 0
      parallelism-max = 6000
      parallelism-factor = 3.0
    }
  }
}
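As a rough reading of these settings, the sketch below works out two numbers. It assumes that the fork-join executor sizes its pool as ceil(cores × parallelism-factor) clamped between parallelism-min and parallelism-max, and that a scheduled delay is counted in whole scheduler ticks; both are assumptions about how the keys are interpreted, and ConfigArithmetic with its helpers is a hypothetical name introduced for illustration.

```scala
object ConfigArithmetic {
  // Assumed sizing rule: ceil(cores * factor), clamped to [min, max].
  def parallelism(cores: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)

  // Number of scheduler ticks needed to cover a delay (rounded up).
  def ticksFor(delayMs: Long, tickMs: Long): Long =
    (delayMs + tickMs - 1) / tickMs

  def main(args: Array[String]): Unit = {
    println(parallelism(6, 3.0, 0, 6000)) // 18 threads on a 6-core machine
    println(ticksFor(1000L, 50L))         // 20 ticks for the 1s delay
    println(50L * 1000L)                  // 50000 ms covered by one wheel turn
  }
}
```

Under those assumptions, parallelism-max = 6000 never comes into play on a 6-core machine, and the 50ms tick bounds how precisely the 1s delay can be honoured.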
The build.sbt SBT file (to get dependencies, build, test and run) is the following:
name := "AkkaSandbox"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-RC3"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
Everything is processed in 75 seconds using:
- linux 3.2.1-gentoo-r2 64bits
- AMD Phenom(tm) II X6 1090T (6 CPU cores)
- Java HotSpot(TM) 64-Bit Server 1.6.0.31
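To put the 75 seconds in perspective, the following sketch derives the implied throughput and the time a strictly sequential run would need. Only the figures stated in this post are used (10 million messages, 1s each, 75 seconds); the object and method names are hypothetical.

```scala
object ThroughputArithmetic {
  // Average number of messages completed per second of wall-clock time.
  def messagesPerSecond(messages: Long, elapsedSeconds: Long): Double =
    messages.toDouble / elapsedSeconds

  // Days needed if the messages were processed one after another.
  def sequentialDays(messages: Long, secondsPerMessage: Long): Double =
    messages.toDouble * secondsPerMessage / 86400.0

  def main(args: Array[String]): Unit = {
    println("%.0f messages/s".format(messagesPerSecond(10000000L, 75L)))
    println("%.1f days sequentially".format(sequentialDays(10000000L, 1L)))
  }
}
```

That is roughly 133,000 messages per second, against about 115.7 days for a one-at-a-time run. Since each simulated job takes at least 1s, sustaining that rate implies on the order of 133,000 jobs pending at any moment, which is only feasible because no thread is held per pending job.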