Yadu Krishnan - 1 year ago
Scala Question

NullPointerException on executing concurrent queries using Slick

I am working on a Scala application with Postgres 9.3 and Slick 3.1.1. I am getting a NullPointerException in the Slick driver when multiple queries execute at the same time.

Here is my simplified code. I am creating multiple actors which will call the same method to query from the database.

package com.app.repo

import java.sql.Timestamp

import akka.actor.{Actor, ActorSystem, Props}
import slick.driver.PostgresDriver
import slick.driver.PostgresDriver.api._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

case class SampleData(id: Long, name: String, createDate: java.sql.Timestamp)

object Tables extends {
  val profile = PostgresDriver
} with Tables

trait Tables {
  val profile: PostgresDriver

  import profile.api._

  // Table mapping for processing."SampleData"
  class SampleDataTable(_tableTag: Tag) extends Table[SampleData](_tableTag, Some("processing"), "SampleData") {
    def * = (id, name, createDate) <> (SampleData.tupled, SampleData.unapply)

    def ? = (Rep.Some(id), Rep.Some(name), Rep.Some(createDate)).shaped.<>({ r => import r._; _1.map(_ => SampleData.tupled((_1.get, _2.get, _3.get))) }, (_: Any) => throw new Exception("Inserting into ? projection not supported."))

    val id: Rep[Long] = column[Long]("SampleId", O.AutoInc, O.PrimaryKey)
    val name: Rep[String] = column[String]("Name")
    val createDate: Rep[java.sql.Timestamp] = column[java.sql.Timestamp]("CreateDate")
  }

  lazy val sampleDataTable = new TableQuery(tag => new SampleDataTable(tag))
}

class SampleQueryingActor(delay: FiniteDuration, duration: FiniteDuration) extends Actor {

  import scala.concurrent.duration._

  // Send "tick" to this actor immediately and then once per `duration`
  override def preStart(): Unit = {
    context.system.scheduler.schedule(0.second, duration, self, "tick")
  }

  override def receive: Receive = {
    case "tick" => {
      println("tick received.. ")
      //val range = 1 until 1000
      // Run the query and print the result or the failure
      RepositoryImpl.reader.onComplete {
        case Success(r) => println(s"got sum as ${r.getOrElse(0)}")
        case Failure(ex) => ex.printStackTrace()
      }
    }
  }
}


object DriverHelper {
  val user = "postgres"
  val url = "jdbc:postgresql://"
  val password = "password"
  val jdbcDriver = "org.postgresql.Driver"
  val db: PostgresDriver.backend.DatabaseDef = Database.forURL(url, user = user, password = password, driver = jdbcDriver)
}

object RepositoryImpl {
  val db: PostgresDriver.backend.DatabaseDef = DriverHelper.db

  val now = new Timestamp(System.currentTimeMillis())

  // Sum of the ids of all rows created after `now`
  def reader = {
    db.run(Tables.sampleDataTable.filter(_.createDate > now).map(_.id).sum.result)
  }

  def insertBatchRecords(list: List[SampleData]) = {
    db.run(Tables.sampleDataTable ++= list)
  }
}


object PGConnectionTester extends App {

  import scala.concurrent.duration._

  val sys = ActorSystem("sys")
  // Three actors that all query the database on the same schedule
  sys.actorOf(Props(classOf[SampleQueryingActor], 1.seconds, 10.seconds))
  sys.actorOf(Props(classOf[SampleQueryingActor], 1.seconds, 10.seconds))
  sys.actorOf(Props(classOf[SampleQueryingActor], 1.seconds, 10.seconds))
}

When I execute the above code, I get a NullPointerException with the following stack trace:

at slick.jdbc.DriverDataSource.getConnection(DriverDataSource.scala:98)
at slick.jdbc.DataSourceJdbcDataSource.createConnection(JdbcDataSource.scala:64)
at slick.jdbc.JdbcBackend$BaseSession.conn$lzycompute(JdbcBackend.scala:415)
at slick.jdbc.JdbcBackend$BaseSession.conn(JdbcBackend.scala:414)
at slick.jdbc.JdbcBackend$SessionDef$class.prepareStatement(JdbcBackend.scala:297)
at slick.jdbc.JdbcBackend$BaseSession.prepareStatement(JdbcBackend.scala:407)
at slick.jdbc.StatementInvoker.results(StatementInvoker.scala:33)
at slick.jdbc.StatementInvoker.iteratorTo(StatementInvoker.scala:22)
at slick.jdbc.Invoker$class.first(Invoker.scala:31)
at slick.jdbc.StatementInvoker.first(StatementInvoker.scala:16)
at slick.driver.JdbcActionComponent$QueryActionExtensionMethodsImpl$$anon$3.run(JdbcActionComponent.scala:228)
at slick.driver.JdbcActionComponent$SimpleJdbcDriverAction.run(JdbcActionComponent.scala:32)
at slick.driver.JdbcActionComponent$SimpleJdbcDriverAction.run(JdbcActionComponent.scala:29)
at slick.backend.DatabaseComponent$DatabaseDef$$anon$2.liftedTree1$1(DatabaseComponent.scala:237)
at slick.backend.DatabaseComponent$DatabaseDef$$anon$2.run(DatabaseComponent.scala:237)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Each actor invokes the same method every 10 seconds. However, I get this error only on the first invocation; after that, the queries execute correctly. I cannot understand why this is happening. This sample performs only simple read operations, but in my actual application the failed query means some of the data is lost without being processed correctly.
Does this error have something to do with connection pooling?

Answer Source

Just sharing the information for anyone else facing this issue.

The cause was a bug in Slick itself, which was reported here. GitHub user mustajavi fixed it, and the fix was merged into the latest Slick branch. With the latest update of 3.1.1, the issue is resolved for me.
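
For anyone who cannot upgrade right away, here is a minimal sketch of a possible workaround. It assumes the failure is transient and only affects the first concurrent calls, as described in the question. The names RetrySketch and retry are hypothetical and not part of Slick, and the helper does not fix the underlying bug; it only re-runs a failed Future a bounded number of times so that the first failure does not drop data.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object RetrySketch {
  // Hypothetical helper (not part of Slick): evaluates `op`, and if the
  // resulting Future fails with a non-fatal exception (a NullPointerException
  // counts as non-fatal), evaluates it again, up to `retries` extra attempts.
  def retry[T](retries: Int)(op: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    // Turn a synchronous throw from `op` into a failed Future so that it
    // goes through the same retry path.
    val attempt: Future[T] =
      try op
      catch { case NonFatal(e) => Future.failed(e) }

    attempt.recoverWith {
      case NonFatal(_) if retries > 0 => retry(retries - 1)(op)
    }
  }
}
```

With the code from the question, the actor could call RetrySketch.retry(2)(RepositoryImpl.reader) instead of RepositoryImpl.reader. Because op is a by-name parameter, each attempt issues a fresh db.run call. Retrying reads this way is harmless; for writes such as insertBatchRecords, check first that a failed attempt cannot have been partially applied.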

Related Links in GitHub:


