Primes computation: from a homemade Akka actors solution to akka-stream
Available on GitHub, Primes is a library dedicated to prime number computation. It is fully generic and works with various numeric types (Int, Long, BigInt, for example). The library generates CheckedValue instances: each one contains the numeric value just tested, tells you whether it is a prime number, and gives its position and its number of digits. Because it computes the position of each value among the primes (or among the non-primes), the computation has to start either from the beginning or from a previously persisted state (the highest prime CheckedValue and the highest non-prime CheckedValue computed so far).
Check this to see how to use it, for example directly from the Scala console (sbt console).
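To make the description above more concrete, here is a minimal sequential sketch using only the Scala standard library. The names `PrimesSketch` and `checkedValues`, as well as the exact shape of `CheckedValue`, are assumptions made for illustration (the field order simply follows the constructor calls visible in the code further down); the library's real definitions may differ.

```scala
// Hypothetical, simplified stand-in for the library's CheckedValue (an assumption).
final case class CheckedValue[NUM](value: NUM, isPrime: Boolean, digitCount: Long, nth: NUM)

object PrimesSketch {
  // Naive trial division, generic over any Integral type (Int, Long, BigInt, ...).
  def isPrime[NUM](v: NUM)(implicit n: Integral[NUM]): Boolean = {
    import n._
    val two = fromInt(2)
    if (v < two) false
    else {
      var d = two
      var prime = true
      while (prime && d * d <= v) {
        if (v % d == zero) prime = false
        d += one
      }
      prime
    }
  }

  // Lazily checks every value from startFrom onwards, carrying the next
  // prime position and the next non-prime position along the way.
  def checkedValues[NUM](startFrom: NUM, primeNth: NUM, notPrimeNth: NUM)(
      implicit n: Integral[NUM]): Iterator[CheckedValue[NUM]] = {
    import n._
    var nextPrimeNth = primeNth
    var nextNotPrimeNth = notPrimeNth
    Iterator.iterate(startFrom)(_ + one).map { v =>
      val prime = isPrime(v)
      val nth = if (prime) nextPrimeNth else nextNotPrimeNth
      if (prime) nextPrimeNth += one else nextNotPrimeNth += one
      CheckedValue(v, prime, v.toString.length.toLong, nth)
    }
  }
}
```

Starting from the beginning would be `PrimesSketch.checkedValues[Int](2, 1, 1)`. Resuming from a persisted state means passing the next value to test together with the next two positions, for example `PrimesSketch.checkedValues[BigInt](7, 4, 3)` once 2 to 6 have been processed (7 is then the 4th prime and 8 the 3rd non-prime).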
In January 2014, I spent some time writing an actor-based primes computation algorithm. It was not an easy task, because I had to manage many things myself:
- back-pressure management,
- parallelism,
- reordering of results.
The full source code of this old implementation is available here.
Now, one year later, it was time to look at something else. Back-pressure, result ordering, parallelism and so on are such common patterns with actors that we shouldn't have to deal with them directly; it is up to the framework to provide the right solution.
Akka-stream brings us a great abstraction for them, and the result becomes much easier to read and to understand. Back-pressure is automatically taken into account, and parallel processing with preserved ordering is obtained simply by calling mapAsync instead of map.
The full source code of this new implementation is available here.
Now let's make way for the code and compare the NEW version with the OLD one:
NEW: Akka-stream primes computation
class StreamBasedPrimesGenerator[NUM](
  handler: CheckedValue[NUM] => Unit = StreamBasedPrimesGenerator.defaultHandlerFactory[NUM](),
  name: String = "DefaultStreamBasedPrimesGeneratorSystem",
  startFrom: NUM = 2,
  primeNth: NUM = 1,
  notPrimeNth: NUM = 1)(implicit numops: Integral[NUM]) extends PrimesDefinitions[NUM] with Shutdownable {

  import numops._

  implicit val system = ActorSystem(name)
  import system.dispatcher
  implicit val materializer = FlowMaterializer()

  case class TestedValue(value:NUM) {
    val state = isPrime(value)
    val digitCount = value.toString.size
  }

  val materialized = FlowGraph { implicit builder =>
    import FlowGraphImplicits._

    val valueIterator = new NumericIterator(startFrom)
    val values: Source[NUM] = Source(() => valueIterator)

    val isPrimeNthIterator = new NumericIterator(primeNth)
    val isPrimeNth = Source(() => isPrimeNthIterator)

    val isNotPrimeNthIterator = new NumericIterator(notPrimeNth)
    val isNotPrimeNth = Source(() => isNotPrimeNthIterator)

    val testedValues = Flow[NUM].mapAsync(x=> Future{TestedValue(x) })

    val checkedValues = Flow[(NUM, TestedValue)].map {
      case (nth, tv) => CheckedValue[NUM](tv.value, tv.state, tv.digitCount, nth)
    }

    val onlyPrimes = Flow[TestedValue].filter(_.state)
    val onlyNotPrimes = Flow[TestedValue].filter(! _.state)

    val out = ForeachSink[CheckedValue[NUM]](handler)

    val cast = Broadcast[TestedValue]
    val merge = Merge[CheckedValue[NUM]]
    val zipPrimeNth = Zip[NUM, TestedValue]
    val zipNotPrimeNth= Zip[NUM, TestedValue]

    isPrimeNth    ~> zipPrimeNth.left
    isNotPrimeNth ~> zipNotPrimeNth.left
    values ~> testedValues ~> cast ~> onlyPrimes    ~> zipPrimeNth.right
                              cast ~> onlyNotPrimes ~> zipNotPrimeNth.right
    zipPrimeNth.out    ~> checkedValues ~> merge
    zipNotPrimeNth.out ~> checkedValues ~> merge
    merge ~> out
  }.run()

  override def shutdown() { system.shutdown()}
}
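The key property used by the stream version is that mapAsync runs the tests concurrently while still emitting results in the order of the incoming values. As a rough analogy only, and not the akka-stream implementation, the same "parallel but ordered" behaviour can be sketched with plain Scala Futures. `OrderedParallelSketch`, `testBatch` and the naive `isPrime` below are hypothetical names introduced for this illustration; processing a bounded batch is only a crude way to limit in-flight work and is not real back-pressure.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OrderedParallelSketch {
  // Naive primality test standing in for the library's isPrime (an assumption).
  def isPrime(v: Long): Boolean =
    v >= 2 && Iterator.iterate(2L)(_ + 1).takeWhile(d => d * d <= v).forall(d => v % d != 0)

  // Tests a whole batch concurrently. Future.traverse returns the results in
  // input order, whatever order the individual futures complete in.
  def testBatch(values: List[Long]): List[(Long, Boolean)] =
    Await.result(Future.traverse(values)(v => Future(v -> isPrime(v))), 1.minute)
}
```

Calling `OrderedParallelSketch.testBatch(List(2L, 3L, 4L, 5L))` yields the pairs in the same order as the input list, which is what allows positions to be assigned afterwards without any reordering step.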
OLD: Akka actors primes computation
class ActorsPrimesGenerator[NUM](
  handler: CheckedValue[NUM] => Unit = ActorsPrimesGenerator.mkPrintHandler[NUM](),
  name: String = "DefaultActordBasedPrimesGeneratorSystem",
  startFrom: NUM = 2,
  primeNth: NUM = 1,
  notPrimeNth: NUM = 1)(implicit numops: Integral[NUM]) extends PrimesDefinitions[NUM] with Shutdownable {

  import numops._

  /*
   * To minimize the amount of messages used for back pressure regulation
   */
  val groupedAckSize = 100L

  /*
   * How many values to keep precomputed but not yet sent to the receiver
   */
  val precomputedCount = 150L

  val config = ConfigFactory.load()
  implicit val system = ActorSystem(name, config.getConfig("primes").withFallback(config))

  case class NextValueToCompute(value: NUM)

  case class PartialResult(value: NUM, isPrime: Boolean, digitCount:Long)

  object CheckerActor {
    def props() = Props(new CheckerActor)
  }

  class CheckerActor extends Actor {
    def receive = {
      case NextValueToCompute(value) =>
        sender ! PartialResult(value, isPrime(value), value.toString.size)
    }
  }

  case class CheckedValueAckMessage(count: Long)

  class ValuesManagerActor(
    forActor: ActorRef,
    checkerWorkers: Int = Runtime.getRuntime.availableProcessors) extends Actor {

    val checkerRouter = context.actorOf(
      CheckerActor.props.withRouter(SmallestMailboxPool(checkerWorkers)),
      "CheckerActorRouter")

    private var nextPrimeNth = primeNth
    private var nextNotPrimeNth = notPrimeNth
    private var currentValue = startFrom // waiting the result for this value
    private var nextValue = startFrom // next value to send to checker worker

    // buffered partial results, because we need ordering to compute primes & not primes position
    private var waitBuffer = Map.empty[NUM, PartialResult]
    private var inpg = 0L
    private var sentAckDelta = 0L

    private final def processPartialResult(partial: PartialResult) {
      currentValue += one
      val nth = if (partial.isPrime) nextPrimeNth else nextNotPrimeNth
      if (partial.isPrime) nextPrimeNth += one else nextNotPrimeNth += one
      val newResult = CheckedValue[NUM](partial.value, partial.isPrime, partial.digitCount, nth)
      forActor ! newResult
      sentAckDelta += 1L
    }

    private final def flush2order() {
      while (waitBuffer.size > 0 && waitBuffer.contains(currentValue)) {
        val pr = waitBuffer(currentValue)
        processPartialResult(pr)
        waitBuffer = waitBuffer - pr.value
      }
    }

    private final def prepareNexts() {
      //if (sentAckDelta <= groupedAckSize) {
      if (inpg < precomputedCount/2) {
        for { _ <- inpg to precomputedCount } {
          checkerRouter ! NextValueToCompute(nextValue)
          nextValue += one
          inpg += 1
        }
      }
    }

    prepareNexts()

    def receive = {
      case pr: PartialResult if pr.value == currentValue =>
        inpg -= 1
        processPartialResult(pr)
        flush2order()
        prepareNexts()
      case pr: PartialResult => // Then delay
        inpg -= 1
        waitBuffer += pr.value -> pr
        prepareNexts()
      case CheckedValueAckMessage(count) =>
        sentAckDelta -= count
        prepareNexts()
    }
  }

  class DealerActor[NUM](handler: CheckedValue[NUM] => Unit) extends Actor {
    var valuesCounter: Long = 0l
    def receive = {
      case chk: CheckedValue[NUM] =>
        handler(chk)
        valuesCounter += 1
        if (valuesCounter % groupedAckSize == 0L) sender ! CheckedValueAckMessage(groupedAckSize)
    }
  }

  private val dealer = actor("DealerPrinter") {
    new DealerActor(handler)
  }

  private val manager = actor("ValuesManagerActor") {
    new ValuesManagerActor(dealer)
  }

  override def shutdown() {
    system.shutdown()
  }
}
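The reordering part of the actor version (waitBuffer, currentValue and flush2order) is the piece that is easiest to get wrong, so it may help to see the idea in isolation. The following is a simplified sketch with no actors involved; `ReorderBuffer` and `offer` are hypothetical names, and results are indexed by a plain Long instead of the generic NUM used above.

```scala
// Hypothetical helper isolating the reordering idea; not part of the library.
final class ReorderBuffer[A](firstIndex: Long) {
  private var expected = firstIndex          // index we are currently waiting for
  private var waiting  = Map.empty[Long, A]  // results that arrived too early

  // Accepts one (index, result) pair and returns every result that can now
  // be released, in index order. Returns Nil while a gap remains.
  def offer(index: Long, result: A): List[A] = {
    waiting += index -> result
    val ready = List.newBuilder[A]
    while (waiting.contains(expected)) {
      ready += waiting(expected)
      waiting -= expected
      expected += 1
    }
    ready.result()
  }
}
```

With `new ReorderBuffer[String](2L)`, offering the results for 3 and 4 first releases nothing; offering the result for 2 then releases all three at once, in order. This is the bookkeeping that mapAsync takes care of in the stream version.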
From a performance point of view
Some "naive" raw performance results (run "sbt test" to try it yourself):
[info] PerfPrimesTest:
[info] - Performance classic tests - BigInt
[info]   + duration for 10000 : 834ms lastPrime=104743
[info]   + duration for 25000 : 3228ms lastPrime=287137
[info]   + duration for 50000 : 8234ms lastPrime=611957
[info] - Performance parallel tests - BigInt
[info]   + duration for 10000 : 320ms lastPrime=104743
[info]   + duration for 25000 : 1188ms lastPrime=287137
[info]   + duration for 50000 : 3242ms lastPrime=611957
[info] PerfActorPrimesTest:
[info] - akka actors based computation test - BigInt
[info]   + duration for 10000 : 608ms lastPrime=104743
[info]   + duration for 25000 : 1584ms lastPrime=287137
[info]   + duration for 50000 : 3679ms lastPrime=611957
[info] PerfActorUnorderedPrimesTest:
[info] - akka actors (unordered) based computation test - BigInt
[info]   + results are unsorted
[info]   + duration for 10000 : 593ms lastPrime=104707
[info]   + duration for 25000 : 1467ms lastPrime=287117
[info]   + duration for 50000 : 3488ms lastPrime=612061
[info] PerfActorUnorderedUnsafePrimesTest:
[info] - akka actors (unordered and unsafe) based computation test - BigInt
[info]   + results are unsorted
[info]   + no back pressure management in this implemntation
[info]   + duration for 10000 : 477ms lastPrime=104723
[info]   + duration for 25000 : 1385ms lastPrime=287141
[info]   + duration for 50000 : 3447ms lastPrime=611969
[info] PerfAkkaStreamPrimesTest:
[info] - akka actors streams based computation test - BigInt
[info]   + duration for 10000 : 905ms lastPrime=104743
[info]   + duration for 25000 : 2181ms lastPrime=287137
[info]   + duration for 50000 : 4785ms lastPrime=611957
[info]   + The order is preserved, with back-pressure control and the whole without any effort !