igx igx - 2 months ago 20
Scala Question

Complete a future within another future

I have an external future operation whose operationComplete method I can override. I want to wrap it and complete a Promise from inside that callback. However, I couldn't complete that future from within the other future. For example:

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global

def foo(): Future[String] = {
  val p = Promise[String]()
  channel.addListener(new ChannelFutureListener[IoReadFuture] {
    override def operationComplete(future: IoReadFuture): Unit = {
      p.success("hello")
    }
  })
  p.future
}

val x = foo()
x: scala.concurrent.Future[String] = List()
x.onComplete{
case Success(msg) => println(s"$msg world")
case Failure(e) => println(e.getMessage)
}
res1: Unit = ()


Is there an idiomatic way to do that without blocking?

Answer

I think you are trying to create a function which takes a delay as input and returns a delayed future.

If so, you can do it without sleeping by using Await and Promise:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// returns a future delayed by `delay` seconds
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = {
  // this promise is never completed; we use it only to wait on
  val waitProxyPromise = Promise[Int]()
  // Await.ready throws a TimeoutException once the wait expires, so wrap it in Try
  Try(Await.ready(waitProxyPromise.future, delay.seconds))
  Future.successful(value)
}

val helloFuture = getDelayedFutureOfStringValue(2, "hello")

The above may seem like a decent implementation, but it is not: Await blocks the calling thread for the whole delay. Sadly, there is no easy way to get a delayed future in a totally non-blocking way in idiomatic Scala.

You can get a non-blocking, non-sleeping delayed future by using the Timer utility from Java:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import java.util.{Timer, TimerTask}

// delay is number of seconds
def getDelayedFutureOfStringValue(delay: Int, value: String): Future[String] = {
  val promise = Promise[String]()
  val timer = new Timer()
  val timerTask = new TimerTask {
    override def run(): Unit = promise.success(value)
  }
  timer.schedule(timerTask, delay * 1000)
  promise.future
}

val niceHelloFuture = getDelayedFutureOfStringValue(2, "hello")
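
As a variation on the Timer approach, here is a minimal sketch that works for any value type and reuses one daemon timer thread instead of creating a new Timer per call. The names `sharedTimer` and `delayed` are my own, not part of any library, and I am assuming a daemon thread is acceptable for your application.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// One shared daemon timer thread (hypothetical name): it serves every call
// and does not keep the JVM alive on its own.
val sharedTimer = new Timer("delayed-futures", true)

// `delayed` is a hypothetical helper, not a standard library function.
def delayed[A](delay: FiniteDuration)(value: => A): Future[A] = {
  val promise = Promise[A]()
  sharedTimer.schedule(new TimerTask {
    // Try captures an exception thrown while evaluating `value`,
    // so the future fails instead of killing the timer thread.
    override def run(): Unit = {
      promise.complete(Try(value))
      ()
    }
  }, delay.toMillis)
  promise.future
}

val delayedHello: Future[String] = delayed(2.seconds)("hello")
```

Nothing blocks here: the caller gets the future back immediately, and the timer thread completes the promise once the delay has passed.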

And in case you already have a future and want to use it to compose a dependent future, it's pretty easy.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

// I assume your IoReadFuture is either similar to Future or wraps the actual Future

def foo(value: String):Future[String] = {
  val p = Promise[String]()
  channel.addListener(new ChannelFutureListener[IoReadFuture] {
    override def operationComplete(ioFuture: IoReadFuture): Unit = {
      ioFuture.future.onComplete(_ => p.success(value))
    }
  })
  p.future
}
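
Since the channel API is not shown in the question, here is a self-contained sketch of the same callback-to-Promise bridge that you can run as-is. `ReadListener` and `FakeChannel` are hypothetical stand-ins for your listener and channel types, and I am assuming the callback can report either a result or an error. Unlike the snippet above, it passes failures through to the future instead of always succeeding.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the listener interface of the real channel API.
trait ReadListener {
  def operationComplete(result: Try[String]): Unit
}

// Hypothetical stand-in for the channel; `fire` simulates the I/O layer
// finishing a read and notifying every registered listener.
class FakeChannel {
  private var listeners = List.empty[ReadListener]
  def addListener(l: ReadListener): Unit = listeners ::= l
  def fire(result: Try[String]): Unit = listeners.foreach(_.operationComplete(result))
}

// Bridges the callback into a Future without blocking any thread.
def readAsFuture(channel: FakeChannel): Future[String] = {
  val p = Promise[String]()
  channel.addListener(new ReadListener {
    override def operationComplete(result: Try[String]): Unit = {
      // tryComplete carries success or failure through, and returns false
      // instead of throwing if the listener fires more than once.
      p.tryComplete(result)
      ()
    }
  })
  p.future
}
```

The future stays incomplete until the listener fires, so `onComplete`, `map`, or `flatMap` registered on it run only after the callback has delivered a result.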