Anish Shah Anish Shah - 3 months ago 24
Scala Question

How to use ConcurrentLinkedQueue in Scala?

val nodes = Array.fill[mutable.Buffer[Int]](numNodes){new ArrayBuffer[Int]() with mutable.SynchronizedBuffer[Int]}

def addMutualEdge(i: Int)(j: Int) {nodes(i) += j; nodes(j) += i}


When I compile this, I get deprecation warning:

SynchronizedBuffer is deprecated. Synchronization via traits is deprecated as it is inherently reliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative


How to use java library in the above code?

Answer

You may just use ConcurrentLinkedQueue instead of Buffer as it's also mutable:

scala> import java.util.concurrent._
import java.util.concurrent._

scala> val nodes = Array.fill(10){new ConcurrentLinkedQueue[Int]()}
nodes: Array[java.util.concurrent.ConcurrentLinkedQueue[Int]] = Array([], [], [], [], [], [], [], [], [], [])

scala> def addMutualEdge(i: Int)(j: Int) {nodes(i).add(j); nodes(j).add(i)}
addMutualEdge: (i: Int)(j: Int)Unit

It's fastest option as this queue is based on CAS-operations, so no blocking there (in comparision with SynchronizedBuffer). Another option is to synchronize operations directly:

scala> val nodes = Array.fill[mutable.Buffer[Int]](10){new ArrayBuffer[Int]()}
nodes: Array[scala.collection.mutable.Buffer[Int]] = Array(ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer())

scala> def addMutualEdge(i: Int)(j: Int) = this.synchronized{nodes(i) += j; nodes(j) += i}
addMutualEdge: (i: Int)(j: Int)scala.collection.mutable.Buffer[Int]

You can also use java's Collections.synchronizedList(...) in combination with scala.collection.JavaConverters.asScala

import java.util._
import scala.collection.JavaConverters._
scala> val nodes = Array.fill(10){Collections.synchronizedList(new ArrayBuffer[Int]().asJava).asScala}
nodes: Array[scala.collection.mutable.Buffer[Int]] = Array(Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer(), Buffer())

Or you can use AtomicReferenceArray:

implicit class RichAtomic[T](a: AtomicReferenceArray[List[T]]) { def apply(i: Int) = (a,i); def update(i: Int, e: List[T]) = a.set(i, e)}
implicit class RichList[T](a: (AtomicReferenceArray[List[T]], Int)) { def ::=(e: T) = while({val lst = a._1.get(a._2);!a._1.compareAndSet(a._2, lst, e :: lst)}){}}
implicit def toList[T](a: (AtomicReferenceArray[List[T]], Int)) = a._1.get(a._2)

val nodes = new AtomicReferenceArray(Array.fill[List[Int]](10){Nil})

scala> def addMutualEdge(i: Int)(j: Int) = {nodes(i) ::= j; nodes(j) ::= i}
addMutualEdge: (i: Int)(j: Int)Unit

Implicits used to provide simillar interface as for just Array. Note, that ::= adds element to the start of list.