Situations when you must not forget to use the scala blocking statement when working with futures...
Futures imply the use of an execution context, a custom one or the default one. An execution context is a mechanism that manage threads for you, map operations to be executed on them, they come with internal default limits which are most of time proportional to the number of CPU you have. This default behavior is the right one when dealing with CPU consuming operations, but with IO this is not the case. If you need to call 100 remote calls (1 seconds required for each of them) on an other system and you have only 2 cpu, your total response time will be 50 seconds, for an almost 0% cpu usage on your host... That's why the blocking statement was introduced, it allows you to mark a code section as blocking, by blocking we mean time consuming and not cpu consuming. Thanks to this statement, the execution context will be able to grow as necessary, and you'll get very low latency. On my 6 cores CPU, 50 virtual remote calls (2 seconds for each call) require only 2056ms with the blocking statement, and it requires of course 18 seconds without it (18s * 6cpu = 108s * 1cpu).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sandbox | |
object forBlog { | |
import concurrent._ | |
import concurrent.duration._ | |
import ExecutionContext.Implicits.global | |
import io.Source | |
val fetchDuration = 2 seconds | |
def fetchWithBlockingStatement(url: String) = | |
Future { | |
blocking { | |
Thread.sleep(fetchDuration.toMillis) | |
//Source.fromURL(url).getLines.toStream | |
"a response" | |
} | |
} | |
def fetchWithoutBlockingStatement(url: String) = | |
Future { | |
Thread.sleep(fetchDuration.toMillis) | |
//Source.fromURL(url).getLines.toStream | |
"a response" | |
} | |
def howlongFor[T](proc : =>T):(Long, T) = { | |
def now=System.currentTimeMillis() | |
val started = now | |
val result = proc | |
(now-started, result) | |
} | |
val (howlongWith, _) = howlongFor{ | |
val futures = List.fill(50)(fetchWithBlockingStatement("http://localhost/")) | |
Await.result(Future.sequence(futures), Duration.Inf) | |
} | |
// => 2039ms | |
val (howlongWithout, _) = howlongFor{ | |
val futures = List.fill(50)(fetchWithoutBlockingStatement("http://localhost/")) | |
Await.result(Future.sequence(futures), Duration.Inf) | |
} | |
// => 18004ms !! | |
} |