Available on github, Primes is a library dedicated to primes number computation, it is fully generic and can work with various numeric types (Int, Long, BigInt for example). This library generates CheckedValue instances which contains the just tested numeric value, it tells you if it is prime number or not, gives you its position and the number of digit it contains. The fact that it computes the prime position, or not prime position, implies that the computation starts from the beginning or from a previous persisted state (previous highest prime CheckedValue and not prime CheckedValue).

In January 2014,

I spent some times writing an actor based primes computation algorithm, it was not a very easy task because it implies to manage myself many things :
  • back pressure management,
  • parallelism,
  • reordering results.

The full source code of this old implementation is available here.

Now, one year later,

it was time to look at something else, back pressure, results ordering, parallelism, ... are too common patterns with actors that we shouldn't have to deal with them directly, it is up to the framework to provide us with the right solution.

Akka-stream brings us a great abstraction, for them, and the result becomes quite easier to read and to understand. back-pressure is automatically taken into account, parallel processing and ordering are just done through mapAsync call instead of just a map call.

The full source code of this new implementation is available here.

Now let's give place to the code, and just compare NEW versus OLD code :

NEW : Akka-stream actors primes computation

class StreamBasedPrimesGenerator[NUM](
handler: CheckedValue[NUM] => Unit = StreamBasedPrimesGenerator.defaultHandlerFactory[NUM](),
name: String = "DefaultStreamBasedPrimesGeneratorSystem",
startFrom: NUM = 2,
primeNth: NUM = 1,
notPrimeNth: NUM = 1)(implicit numops: Integral[NUM]) extends PrimesDefinitions[NUM] with Shutdownable {
import numops._
implicit val system = ActorSystem(name)
import system.dispatcher
implicit val materializer = FlowMaterializer()
case class TestedValue(value:NUM) {
val state = isPrime(value)
val digitCount = value.toString.size
}
val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val valueIterator = new NumericIterator(startFrom)
val values: Source[NUM] = Source(() => valueIterator)
val isPrimeNthIterator = new NumericIterator(primeNth)
val isPrimeNth = Source(() => isPrimeNthIterator)
val isNotPrimeNthIterator = new NumericIterator(notPrimeNth)
val isNotPrimeNth = Source(() => isNotPrimeNthIterator)
val testedValues = Flow[NUM].mapAsync(x=> Future{TestedValue(x) })
val checkedValues = Flow[(NUM, TestedValue)].map {
case (nth, tv) => CheckedValue[NUM](tv.value, tv.state, tv.digitCount, nth)
}
val onlyPrimes = Flow[TestedValue].filter(_.state)
val onlyNotPrimes = Flow[TestedValue].filter(! _.state)
val out = ForeachSink[CheckedValue[NUM]](handler)
val cast = Broadcast[TestedValue]
val merge = Merge[CheckedValue[NUM]]
val zipPrimeNth = Zip[NUM, TestedValue]
val zipNotPrimeNth= Zip[NUM, TestedValue]
isPrimeNth ~> zipPrimeNth.left
isNotPrimeNth ~> zipNotPrimeNth.left
values ~> testedValues ~> cast ~> onlyPrimes ~> zipPrimeNth.right
cast ~> onlyNotPrimes ~> zipNotPrimeNth.right
zipPrimeNth.out ~> checkedValues ~> merge
zipNotPrimeNth.out ~> checkedValues ~> merge
merge ~> out
}.run()
override def shutdown() { system.shutdown()}
}
view raw gistfile1.scala hosted with ❤ by GitHub
Check this to see how to use it, for example directly from the scala console (sbt console).

OLD : Akka actors primes computation

class ActorsPrimesGenerator[NUM](
handler: CheckedValue[NUM] => Unit = ActorsPrimesGenerator.mkPrintHandler[NUM](),
name: String = "DefaultActordBasedPrimesGeneratorSystem",
startFrom: NUM = 2,
primeNth: NUM = 1,
notPrimeNth: NUM = 1)(implicit numops: Integral[NUM]) extends PrimesDefinitions[NUM] with Shutdownable {
import numops._
/*
* To minimize the amount of messages used for back pressure regulation
*/
val groupedAckSize = 100L
/*
* How many values to keep precomputed but not yet sent to the receiver
*/
val precomputedCount = 150L
val config = ConfigFactory.load()
implicit val system = ActorSystem(name, config.getConfig("primes").withFallback(config))
case class NextValueToCompute(value: NUM)
case class PartialResult(value: NUM, isPrime: Boolean, digitCount:Long)
object CheckerActor {
def props() = Props(new CheckerActor)
}
class CheckerActor extends Actor {
def receive = {
case NextValueToCompute(value) =>
sender ! PartialResult(value, isPrime(value), value.toString.size)
}
}
case class CheckedValueAckMessage(count: Long)
class ValuesManagerActor(
forActor: ActorRef,
checkerWorkers: Int = Runtime.getRuntime.availableProcessors) extends Actor {
val checkerRouter = context.actorOf(
CheckerActor.props.withRouter(SmallestMailboxPool(checkerWorkers)),
"CheckerActorRouter")
private var nextPrimeNth = primeNth
private var nextNotPrimeNth = notPrimeNth
private var currentValue = startFrom // waiting the result for this value
private var nextValue = startFrom // next value to send to checker worker
// buffered partial results, because we need ordering to compute primes & not primes position
private var waitBuffer = Map.empty[NUM, PartialResult]
private var inpg = 0L
private var sentAckDelta = 0L
private final def processPartialResult(partial: PartialResult) {
currentValue += one
val nth = if (partial.isPrime) nextPrimeNth else nextNotPrimeNth
if (partial.isPrime) nextPrimeNth += one else nextNotPrimeNth += one
val newResult = CheckedValue[NUM](partial.value, partial.isPrime, partial.digitCount, nth)
forActor ! newResult
sentAckDelta += 1L
}
private final def flush2order() {
while (waitBuffer.size > 0 && waitBuffer.contains(currentValue)) {
val pr = waitBuffer(currentValue)
processPartialResult(pr)
waitBuffer = waitBuffer - pr.value
}
}
private final def prepareNexts() {
//if (sentAckDelta <= groupedAckSize) {
if (inpg < precomputedCount/2) {
for { _ <- inpg to precomputedCount } {
checkerRouter ! NextValueToCompute(nextValue)
nextValue += one
inpg += 1
}
}
}
prepareNexts()
def receive = {
case pr: PartialResult if pr.value == currentValue =>
inpg -= 1
processPartialResult(pr)
flush2order()
prepareNexts()
case pr: PartialResult => // Then delay
inpg -= 1
waitBuffer += pr.value -> pr
prepareNexts()
case CheckedValueAckMessage(count) =>
sentAckDelta -= count
prepareNexts()
}
}
class DealerActor[NUM](handler: CheckedValue[NUM] => Unit) extends Actor {
var valuesCounter: Long = 0l
def receive = {
case chk: CheckedValue[NUM] =>
handler(chk)
valuesCounter += 1
if (valuesCounter % groupedAckSize == 0L) sender ! CheckedValueAckMessage(groupedAckSize)
}
}
private val dealer = actor("DealerPrinter") {
new DealerActor(handler)
}
private val manager = actor("ValuesManagerActor") {
new ValuesManagerActor(dealer)
}
override def shutdown() {
system.shutdown()
}
}
view raw gistfile1.scala hosted with ❤ by GitHub

From performance point of view

Some "naive" raw performance results ("sbt test" to try yourself) :
[info] PerfPrimesTest:
[info] - Performance classic tests - BigInt
[info] + duration for 10000 : 834ms lastPrime=104743
[info] + duration for 25000 : 3228ms lastPrime=287137
[info] + duration for 50000 : 8234ms lastPrime=611957
[info] - Performance parallel tests - BigInt
[info] + duration for 10000 : 320ms lastPrime=104743
[info] + duration for 25000 : 1188ms lastPrime=287137
[info] + duration for 50000 : 3242ms lastPrime=611957
[info] PerfActorPrimesTest:
[info] - akka actors based computation test - BigInt
[info] + duration for 10000 : 608ms lastPrime=104743
[info] + duration for 25000 : 1584ms lastPrime=287137
[info] + duration for 50000 : 3679ms lastPrime=611957
[info] PerfActorUnorderedPrimesTest:
[info] - akka actors (unordered) based computation test - BigInt
[info] + results are unsorted
[info] + duration for 10000 : 593ms lastPrime=104707
[info] + duration for 25000 : 1467ms lastPrime=287117
[info] + duration for 50000 : 3488ms lastPrime=612061
[info] PerfActorUnorderedUnsafePrimesTest:
[info] - akka actors (unordered and unsafe) based computation test - BigInt
[info] + results are unsorted
[info] + no back pressure management in this implemntation
[info] + duration for 10000 : 477ms lastPrime=104723
[info] + duration for 25000 : 1385ms lastPrime=287141
[info] + duration for 50000 : 3447ms lastPrime=611969
[info] PerfAkkaStreamPrimesTest:
[info] - akka actors streams based computation test - BigInt
[info] + duration for 10000 : 905ms lastPrime=104743
[info] + duration for 25000 : 2181ms lastPrime=287137
[info] + duration for 50000 : 4785ms lastPrime=611957
[info] + The order is preserved, with back-pressure control and the whole without any effort !
view raw gistfile1.txt hosted with ❤ by GitHub