Ostrich Ostrich - 7 days ago 5
Scala Question

http4s, Service Executor and Mongodb: How can I wait for insertOne to finish

Apologies in advance for the basic question. I am starting to learn Scala with http4s, and in a route handler I am trying to insert a document into MongoDB. As far as I can tell, insertOne returns an Observable[Completed].

Any idea how I can wait for the Observable to complete before returning the response?

My code is:

class Routes {
  val service: HttpService = HttpService {
    case r @ GET -> Root / "hello" => {
      val mongoClient: MongoClient = MongoClient()
      val database: MongoDatabase = mongoClient.getDatabase("scala")
      val collection: MongoCollection[Document] = database.getCollection("tests")
      val doc: Document = Document("_id" -> 0, "name" -> "MongoDB", "type" -> "database",
        "count" -> 1, "info" -> Document("x" -> 203, "y" -> 102))
      collection.insertOne(doc)
      mongoClient.close()
      Ok("Hello.")
    }
  }
}

class GomadApp(host: String, port: Int) {
  private val pool = Executors.newCachedThreadPool()

  println(s"Starting server on '$host:$port'")

  val routes = new Routes().service

  // Add some logging to the service
  val service: HttpService = routes.local { req =>
    val path = req.uri
    val start = System.nanoTime()
    val result = req
    val time = ((System.nanoTime() - start) / 1000) / 1000.0
    println(s"${req.remoteAddr.getOrElse("null")} -> ${req.method}: $path in $time ms")
    result
  }

  // Construct the blaze pipeline.
  def build(): ServerBuilder =
    BlazeBuilder
      .bindHttp(port, host)
      .mountService(service)
      .withServiceExecutor(pool)
}

object GomadApp extends ServerApp {
  val ip = "127.0.0.1"
  val port = envOrNone("HTTP_PORT") map (_.toInt) getOrElse (8787)

  override def server(args: List[String]): Task[Server] =
    new GomadApp(ip, port)
      .build()
      .start
}

Answer

I'd recommend https://github.com/haghard/mongo-query-streams, although you'll have to fork it and bump its dependencies a bit, because scalaz 7.1 and 7.2 aren't binary-compatible.

The less streamy (and less referentially transparent) way is to convert the driver's Future into a scalaz Task with https://github.com/Verizon/delorean:

collection.insertOne(doc).toFuture().toTask.flatMap({res => Ok("Hello")})
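
For comparison, here is a minimal sketch of the same "build the response only after the insert completes" shape, using nothing but the standard library. It assumes a stand-in for the database call: `fakeInsertOne` and the `Response` case class are hypothetical names invented for this illustration, replacing `collection.insertOne(doc).toFuture()` and an http4s response. Neither belongs to the MongoDB driver or to http4s.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object InsertDemo {
  // Hypothetical stand-in for an http4s response.
  final case class Response(status: Int, body: String)

  // Hypothetical stand-in for collection.insertOne(doc).toFuture():
  // the Future completes only after the simulated write has finished.
  def fakeInsertOne(doc: Map[String, Any]): Future[String] =
    Future {
      Thread.sleep(50) // simulate the round trip to the database
      "completed"
    }

  // The response is created inside map, so it cannot exist before the insert is done.
  def handler(doc: Map[String, Any]): Future[Response] =
    fakeInsertOne(doc).map(_ => Response(200, "Hello."))

  // Blocks only so the result can be observed here; a real handler would
  // return the asynchronous value instead of awaiting it.
  def run(): Response =
    Await.result(handler(Map("_id" -> 0, "name" -> "MongoDB")), 5.seconds)

  def main(args: Array[String]): Unit = println(run())
}
```

The point of the sketch is the sequencing: the response is derived from the completed insert rather than created next to it, which is what the flatMap in the one-liner above does as well.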

The delorean approach looks easier, but it has some hidden pitfalls. See https://www.reddit.com/r/scala/comments/3zofjl/why_is_future_totally_unusable/

This tweet made me wonder: https://twitter.com/timperrett/status/684584581048233984

Do you consider Futures "totally unusable" or is this just hyperbole? I've never had a major problem, but I'm willing to be enlightened.

Doesn't the following code make Futures effectively "lazy"?

def myFuture = Future { 42 }

And, finally, I've also heard rumblings that scalaz's Tasks have some failings as well, but I haven't found much on it. Anybody have more details?

Answer:

The fundamental problem is that constructing a Future with a side-effecting expression is itself a side-effect. You can only reason about Future for pure computations, which unfortunately is not how they are commonly used. Here is a demonstration of this operation breaking referential transparency:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

val f1 = { 
  val r = new Random(0L)
  val x = Future(r.nextInt)
  for { 
    a <- x
    b <- x
  } yield (a, b) 
}

// Same as f1, but I inlined `x`
val f2 = { 
  val r = new Random(0L)
  for { 
    a <- Future(r.nextInt)
    b <- Future(r.nextInt)
  } yield (a, b) 
}

f1.onComplete(println) // Success((-1155484576,-1155484576))
f2.onComplete(println) // Success((-1155484576,-723955400))    <-- not the same

However, this works fine with Task. Note that the interesting one is the non-inlined version (task1), which manages to produce two distinct Int values even though it refers to the same x twice. This is the important bit: Task has a constructor that captures side-effects as values, and Future does not.

import scalaz.concurrent.Task

val task1 = { 
  val r = new Random(0L)
  val x = Task.delay(r.nextInt)
  for { 
    a <- x
    b <- x 
  } yield (a, b) 
}

// Same as task1, but I inlined `x`
val task2 = { 
  val r = new Random(0L)
  for { 
    a <- Task.delay(r.nextInt)
    b <- Task.delay(r.nextInt)
  } yield (a, b) 
}

println(task1.run) // (-1155484576,-723955400)
println(task2.run) // (-1155484576,-723955400)

Most of the commonly-cited differences like "a Task doesn't run until you ask it to" and "you can compose the same Task over and over" trace back to this fundamental distinction. So the reason it's "totally unusable" is that once you're used to programming with pure values and relying on equational reasoning to understand and manipulate programs, it's hard to go back to the side-effecty world, where things are much harder to understand.
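
The "doesn't run until you ask it to" difference can be observed with the standard library alone. The sketch below is an illustration under stated assumptions: `Lazy` is a hypothetical, much-simplified stand-in for what Task.delay does (it is not scalaz's implementation), and a counter makes the effect visible.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerFutureDemo {
  // Hypothetical lazy wrapper, a simplified stand-in for Task.delay:
  // it stores the effect as a value and performs it only when run() is called.
  final class Lazy[A](effect: () => A) {
    def run(): A = effect()
  }

  def demo(): (Int, Int, Int) = {
    val counter = new AtomicInteger(0)

    // A Future starts as soon as it is constructed, and its result is memoized.
    val eager = Future(counter.incrementAndGet())
    Await.result(eager, 5.seconds)
    Await.result(eager, 5.seconds) // awaiting again does not re-run the effect
    val afterFuture = counter.get()

    // Constructing the lazy value performs no effect.
    val deferred = new Lazy(() => counter.incrementAndGet())
    val afterConstruct = counter.get()

    // Each run() performs the effect again.
    deferred.run()
    deferred.run()
    val afterRuns = counter.get()

    (afterFuture, afterConstruct, afterRuns)
  }

  def main(args: Array[String]): Unit = println(demo()) // prints (1,1,3)
}
```

This also bears on the `def myFuture = Future { 42 }` idea from the question: a def postpones construction, but every reference to it creates a fresh Future that is already running, so the eagerness is moved rather than removed.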
