tonicebrian - 6 months ago
MySQL Question

How are reactive streams used in Slick for inserting data

In Slick's documentation, the examples for using Reactive Streams cover only reading data, by means of a DatabasePublisher. But what happens when you want to use your database as a Sink and backpressure based on your insertion rate?

I've looked for an equivalent DatabaseSubscriber, but it doesn't exist. So the question is, if I have a Source, say:


val source = Source(0 to 100)


how can I create a Sink with Slick that writes those values into a table with the schema:


create table NumberTable (value INT)

Answer

The easiest way would be to do simple inserts within a Sink.foreach. This assumes you've used the schema code generation and that your table is named "NumberTable":

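// Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
import Tables.profile.api._ // brings Database, += and ++= into scope
import akka.stream.scaladsl.{Flow, Sink, Source}

// Assumes an implicit Materializer (or, on Akka 2.6+, an implicit ActorSystem) is in scope
val numberTableDB = Database.forConfig("NumberTableConfig")

// Runs one insert per element; the Future returned by run is discarded
Source(0 to 100).runWith(Sink foreach { num =>
  numberTableDB.run(Numbertable += NumbertableRow(num))
})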

You could further extend this basic example by batching N inserts at a time:

val batchSize = 10

Source(0 to 100).via(Flow[Int].grouped(batchSize))
                .runWith(Sink foreach { nums =>
  numberTableDB.run(Numbertable ++= nums.map(NumbertableRow.apply))
})
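
One caveat with the snippets above: numberTableDB.run returns a Future, and Sink.foreach discards it, so the stream does not wait for the inserts and is not backpressured by the insertion rate. A minimal sketch of one way to get that behaviour is to route the batches through Akka Streams' mapAsync, which keeps at most `parallelism` Futures in flight and only pulls more elements once one completes. The names BackpressuredInsert, insertSink and insertBatch are hypothetical, introduced here for illustration; the Slick call itself is passed in as a function, and its use in the trailing comment assumes the generated Tables from the answer.

```scala
import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import scala.concurrent.Future

object BackpressuredInsert {
  // Builds a Sink that groups incoming Ints into batches of `batchSize`
  // and hands each batch to `insertBatch` (a hypothetical stand-in for
  // the Slick insert). mapAsync allows at most `parallelism` pending
  // Futures, so upstream is slowed down to the rate at which inserts complete.
  def insertSink(batchSize: Int, parallelism: Int)(
      insertBatch: Seq[Int] => Future[Any]): Sink[Int, Future[Done]] =
    Flow[Int]
      .grouped(batchSize)
      .mapAsync(parallelism)(insertBatch)
      .toMat(Sink.ignore)(Keep.right) // materializes a Future that completes when the stream ends
}

// Possible usage with the generated tables from the answer (assumed, not verified here):
//   Source(0 to 100).runWith(
//     BackpressuredInsert.insertSink(batchSize = 10, parallelism = 1) { nums =>
//       numberTableDB.run(Numbertable ++= nums.map(NumbertableRow.apply))
//     })
```

With parallelism = 1 the inserts run strictly one batch at a time; a larger value lets several batches be in flight, up to whatever the connection pool can serve. If any insert's Future fails, the stream fails and the materialized Future[Done] carries the error.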