illuxic - 1 month ago
Scala Question

Asynchronous IO (socket) in Scala

import java.net.InetSocketAddress
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel}
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

class Master(port: Int) {
  val server: AsynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()
  server.bind(new InetSocketAddress("localhost", port))

  // accept() returns a java.util.concurrent.Future; get() blocks the calling thread
  val client: Future[AsynchronousSocketChannel] = Future { blocking { server.accept().get() } }
}


This is pseudocode for what I'm trying to do.

Before asking this question, I searched for it and found a related answer.
In her answer, I wonder what this means:
"If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library."


Since I don't want to suffer from either running out of memory (the case when using blocking) or thread pool hell (the opposite case), her answer was exactly what I had been looking for. However, as you can see in my pseudocode, a new thread will be created for each client (I made just one client for the sake of simplicity) due to blocking, even though I used NIO as she said.


  1. Please explain her suggestion with a simple example.

  2. Is my pseudocode an appropriate approach to asynchronous IO in Scala, or is there a better alternative?


Answer

Answer to Question 1

She suggests two things:

a) If you wrap a blocking call in a Future, use scala.concurrent.blocking. blocking tells the default execution context that the enclosed code may block, so it can spawn temporary threads to avoid starvation.

Let's say blockingServe() blocks. To run several blockingServe() calls concurrently, we wrap them in Futures.

Future {
  blockingServe() //blockingServe() could be serverSocket.accept()
}

But the code above can lead to starvation in an evented model: the execution context has a limited number of threads, and every blocked call holds one of them. To deal with starvation, we have to ask the execution context to create temporary extra threads to serve the remaining requests. We communicate this to the execution context with scala.concurrent.blocking.

Future {
  blocking {
    blockingServe() //blockingServe() could be serverSocket.accept()
  }
}
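The effect of blocking can be seen with a small experiment. The following is only a sketch, assuming the default global execution context; BlockingDemo, run and the latch-based setup are hypothetical names chosen for illustration. It starts more tasks than there are CPU cores, and every task waits until all tasks have started, so the tasks can only finish if the pool grows beyond its core size.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original answer.
object BlockingDemo {
  // Starts more blocking tasks than there are CPU cores. Each task waits until
  // every task has started, so they can only complete if extra threads appear.
  def run(): Int = {
    val n = Runtime.getRuntime.availableProcessors() + 2
    val allStarted = new CountDownLatch(n)
    val tasks = (1 to n).map { _ =>
      Future {
        allStarted.countDown()
        // Hint to the execution context that this thread is about to block.
        blocking { allStarted.await() }
        1
      }
    }
    // Returns the number of tasks that completed.
    Await.result(Future.sequence(tasks), 30.seconds).sum
  }
}
```

Removing the blocking wrapper from this sketch would likely leave the tasks waiting on each other until the timeout, because the pool would have no reason to add threads. Other execution contexts may ignore the hint entirely.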

b)

We still have not achieved non-blocking IO. The code still blocks; it just blocks on a different thread (it is asynchronous, not non-blocking).

How can we achieve truly non-blocking IO?

We can achieve it by using a non-blocking API.

So, in your case, you have to use java.nio.channels.ServerSocketChannel to get a truly non-blocking model.

Notice that your code snippet mixes a) and b), which is not required.
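As a side note, the AsynchronousServerSocketChannel from the question also has a callback-based accept that does not block the caller. A minimal sketch, assuming you want to keep that channel type rather than switch to ServerSocketChannel (AsyncAccept and acceptAsync are hypothetical names), bridges the callback into a Scala Future with a Promise, so neither Future { ... } nor blocking is needed:

```scala
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel, CompletionHandler}
import scala.concurrent.{Future, Promise}

// Hypothetical helper, not part of the original answer.
object AsyncAccept {
  // Registers a completion callback and returns immediately;
  // no thread is parked while waiting for a client to connect.
  def acceptAsync(server: AsynchronousServerSocketChannel): Future[AsynchronousSocketChannel] = {
    val promise = Promise[AsynchronousSocketChannel]()
    server.accept[Void](null, new CompletionHandler[AsynchronousSocketChannel, Void] {
      override def completed(client: AsynchronousSocketChannel, attachment: Void): Unit =
        promise.success(client)
      override def failed(error: Throwable, attachment: Void): Unit =
        promise.failure(error)
    })
    promise.future
  }
}
```

To serve more than one client, acceptAsync would have to be called again after each accepted connection, for example from a callback on the returned Future.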

Answer to Question 2

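import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val selector = Selector.open()
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false) // a channel must be non-blocking before it is registered
serverChannel.socket().bind(new InetSocketAddress("192.168.2.1", 5000))
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

def assignWork[A](serverChannel: ServerSocketChannel, selector: Selector, work: => Future[A]): Future[A] = {
  work
  // recurse
}
assignWork[Unit](serverChannel, selector, Future(()))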
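The recurse step is left open above. One possible way to fill it in, written as a loop rather than recursion, is sketched below. It assumes a server channel that is already non-blocking and registered for OP_ACCEPT as in the snippet above; SelectorAccept, acceptN and max are hypothetical names, and a real server would also register the accepted channels for OP_READ and handle their data.

```scala
import java.nio.channels.{Selector, ServerSocketChannel, SocketChannel}

// Hypothetical helper, not part of the original answer.
object SelectorAccept {
  // Accepts `max` connections on a single thread and returns them in arrival order.
  def acceptN(serverChannel: ServerSocketChannel, selector: Selector, max: Int): List[SocketChannel] = {
    var accepted = List.empty[SocketChannel]
    while (accepted.size < max) {
      selector.select() // parks only this one thread until some channel is ready
      val keys = selector.selectedKeys().iterator()
      while (keys.hasNext) {
        val key = keys.next()
        keys.remove() // selected keys are not cleared automatically
        if (key.isAcceptable) {
          val client = serverChannel.accept() // non-blocking mode: may return null
          if (client != null) {
            client.configureBlocking(false)
            accepted = client :: accepted
          }
        }
      }
    }
    accepted.reverse
  }
}
```

The point of this design is that a single thread waits in select() for all channels, so the number of threads does not grow with the number of clients.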