tonicebrian - 1 year ago
MySQL Question

How are reactive streams used in Slick for inserting data

In Slick's documentation, the examples of using Reactive Streams only cover reading data by means of a DatabasePublisher. But what happens when you want to use your database as a Sink and apply backpressure based on your insertion rate?

I've looked for an equivalent DatabaseSubscriber, but it doesn't exist. So the question is, if I have a Source, say:

val source = Source(0 to 100)

how can I create a Sink with Slick that writes those values into a table with this schema:

create table NumberTable (value INT)

Answer

The easiest way is to do simple inserts inside a Sink.foreach. Assuming you've used Slick's schema code generation, and further assuming your table is named "NumberTable":

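// The Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
import akka.stream.scaladsl.{Flow, Sink, Source}
import slick.jdbc.MySQLProfile.api._ // assumes the MySQL profile; an implicit ActorSystem/Materializer must also be in scope

val numberTableDB = Database.forConfig("NumberTableConfig")

// Insert each element of the stream as its own row
Source(0 to 100).runWith(Sink.foreach { num =>
  numberTableDB.run(Numbertable += NumbertableRow(num))
})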
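
One caveat with the Sink.foreach approach: the database's run method returns a Future immediately and Sink.foreach does not wait for it, so the stream is not slowed down by the insertion rate. A minimal sketch of one way to get the backpressure the question asks about is to run the insert inside mapAsync, which stops pulling new elements while the configured number of inserts is still in flight. The hand-written NumberTable class, the numbers query and the object name are illustrative stand-ins for the generated Tables code, and the sketch assumes Akka 2.6+ and Slick 3.2+ with the MySQL profile.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.Future

object BackpressuredInsert extends App {
  // In Akka 2.6+ the implicit ActorSystem also supplies the materializer
  implicit val system: ActorSystem = ActorSystem("inserts")

  // Hypothetical hand-written mapping, standing in for the generated Tables.Numbertable
  class NumberTable(tag: Tag) extends Table[Int](tag, "NumberTable") {
    def value = column[Int]("value")
    def * = value
  }
  val numbers = TableQuery[NumberTable]

  // Same config key as in the answer; it must exist in application.conf
  val db = Database.forConfig("NumberTableConfig")

  val done: Future[Done] =
    Source(0 to 100)
      // At most one insert is in flight; the next element is only
      // requested once the previous insert's Future has completed.
      .mapAsync(parallelism = 1)(num => db.run(numbers += num))
      .runWith(Sink.ignore)

  // Release the connection pool and the actor system when the stream ends
  done.onComplete { _ => db.close(); system.terminate() }(system.dispatcher)
}
```

Raising parallelism allows more concurrent inserts while still bounding how far the stream can run ahead of the database.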

You could further extend this basic example by batching N inserts at a time:

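val batchSize = 10

// Group the elements into batches and insert each batch with a single ++=
Source(0 to 100).via(Flow[Int].grouped(batchSize))
                .runWith(Sink.foreach { nums =>
                  numberTableDB.run(Numbertable ++= nums.map(NumbertableRow(_)))
                })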
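
Batching can also be combined with a stage that waits for each insert, so that the insertion rate throttles the stream. The following is a sketch under assumptions rather than the answer's own code: NumberTable, numbers and BatchedInsert are hypothetical names replacing the generated Tables code, and Akka 2.6+ with Slick 3.2+ and the MySQL profile is assumed.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.Future

object BatchedInsert extends App {
  implicit val system: ActorSystem = ActorSystem("batched-inserts")

  // Hypothetical hand-written mapping for: create table NumberTable (value INT)
  class NumberTable(tag: Tag) extends Table[Int](tag, "NumberTable") {
    def value = column[Int]("value")
    def * = value
  }
  val numbers = TableQuery[NumberTable]
  val db = Database.forConfig("NumberTableConfig")

  val batchSize = 10

  val done: Future[Done] =
    Source(0 to 100)
      // 101 elements become ten batches of 10 plus a final batch of 1
      .grouped(batchSize)
      // One batch insert at a time; upstream waits for each Future
      .mapAsync(parallelism = 1)(batch => db.run(numbers ++= batch))
      .runWith(Sink.ignore)

  done.onComplete { _ => db.close(); system.terminate() }(system.dispatcher)
}
```

A larger batchSize means fewer round trips to the database, at the cost of holding more elements in memory before each write.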