user3139545 - 13 days ago
Scala Question

How to use asynchronous drivers in Akka Streams: map vs mapAsync

I have just started using the ReactiveCouchbase asynchronous database driver, but I am running into some basic design issues.
In a traditional approach I would limit the pressure I put on the database by limiting the number of connections to it. With an asynchronous driver, however, can't I just swamp the database with new queries?

This becomes significant in the following example.

Let's say I have two different ways of calling the database.

My functions that call the DB:

asyncCallDB: Future[DBResponse]
blockingCallDB: DBResponse


Now I want to map the DB calls over a stream, and I can use two different functions:

Flow.map()
Flow.mapAsync(numberOfConcurrentCalls)()


Now my question is: which of these would you choose for calling the database?

Flow.map(blockingCallDB) //One call at a time with back pressure
Flow.map(asyncCallDB) //Unlimited calls flood the DB, no back pressure?

Flow.mapAsync(numberOfConcurrentCalls)(blockingCallDB) //Up to numberOfConcurrentCalls at the same time with back pressure
Flow.mapAsync(numberOfConcurrentCalls)(asyncCallDB) //Unlimited calls flood the DB, no back pressure?


I feel that my understanding is lacking here, and I would like to understand how to make this type of decision.

Answer

ReactiveCouchbase uses AsyncHttpClient to communicate with Couchbase server(s). As you can see in the source code, it calls setMaximumConnectionsTotal, which limits the number of concurrent connections. The actual value depends on what you configured in couchbase.http.maxTotalConnections.

There is one AsyncHttpClient for each CouchbaseBucket you create. So there is a maximum of maxTotalConnections connections for each CouchbaseBucket.

From Couchbase documentation on N1QL REST API:

The REST API runs synchronously, so once execution of the statement in the request is started, results are streamed back to the client, terminating when execution of the statement finishes.

So in practice the number of concurrent queries is limited to maxTotalConnections for each of your buckets.

Thus, the pressure on the DB is always limited in some way: either because you set maxTotalConnections to a non-negative number, or because your client can't create more connections due to limited RAM or a limited number of file descriptors.

However, it's still possible to create so many Futures that your client runs out of memory. Whenever you think this may be the case, you should probably use mapAsync, as mentioned in this answer, or one of the other techniques mentioned in "Buffers and working with rate" (Akka documentation).
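To make these two points concrete without a Couchbase server, here is a standard-library-only sketch in which a fixed pool of four threads is a hypothetical stand-in for maxTotalConnections. It does not model what AsyncHttpClient actually does when its limit is reached; it only shows that capping concurrency does not cap how many Futures you create: all of them are allocated up front and wait in the executor's queue.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConnectionCapDemo {
  // Hypothetical stand-in for maxTotalConnections: at most 4 "connections" busy at once.
  val maxConnections = 4

  def run(requests: Int): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(maxConnections)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    try {
      // Every Future is created immediately. Only maxConnections of them run at a time;
      // the rest sit in the executor's (unbounded) queue and still occupy memory.
      val futures: Seq[Future[Int]] = (1 to requests).map { i =>
        Future {
          val now = inFlight.incrementAndGet()
          peak.updateAndGet(p => math.max(p, now))
          Thread.sleep(5) // pretend to wait for the database
          inFlight.decrementAndGet()
          i
        }
      }
      val created = futures.size
      Await.result(Future.sequence(futures), 30.seconds)
      (created, peak.get())
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (created, peak) = run(100)
    println(s"created $created futures, at most $peak ran concurrently")
  }
}
```

Raising requests to millions would still keep peak at four, but the queue of pending Futures would grow without bound, which is exactly the memory problem mapAsync avoids by not creating the next Future until a slot is free.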

There's a good description of mapAsync in the Akka documentation:

Pass incoming elements to a function that return a Future result. When the future arrives the result is passed downstream. Up to n elements can be processed concurrently...

Keep in mind that Flow.mapAsync won't run anything by itself; it just returns a Flow, which you have to connect between a Source and a Sink and then run. The Akka Quick Start Guide describes this in a very comprehensible way.
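A minimal sketch of the whole pipeline (Source, then the mapAsync Flow, then a Sink, then run), assuming Akka 2.6 or later, where the Materializer is derived from the implicit ActorSystem; older Akka versions need an implicit ActorMaterializer instead. fakeAsyncCallDB is a hypothetical stand-in for asyncCallDB from the question: it sleeps instead of querying Couchbase, and the counters exist only to observe how many calls are outstanding at once.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncDemo {
  // Counters used only to observe how many calls are outstanding at the same time.
  private val inFlight = new AtomicInteger(0)
  private val peak = new AtomicInteger(0)

  // Hypothetical stand-in for asyncCallDB; it does not touch Couchbase.
  def fakeAsyncCallDB(id: Int)(implicit ec: ExecutionContext): Future[String] = {
    val now = inFlight.incrementAndGet()
    peak.updateAndGet(p => math.max(p, now))
    Future {
      Thread.sleep(20) // blocks a dispatcher thread; acceptable only in a demo
      inFlight.decrementAndGet()
      s"row-$id"
    }
  }

  def run(parallelism: Int): (Seq[String], Int) = {
    inFlight.set(0)
    peak.set(0)
    implicit val system: ActorSystem = ActorSystem("mapAsync-demo")
    import system.dispatcher // ExecutionContext for the Futures

    // The Flow is only a blueprint; nothing is called until the graph is run.
    val dbFlow: Flow[Int, String, NotUsed] =
      Flow[Int].mapAsync(parallelism)(id => fakeAsyncCallDB(id))

    try {
      val done: Future[Seq[String]] =
        Source(1 to 20).via(dbFlow).runWith(Sink.seq)
      (Await.result(done, 30.seconds), peak.get())
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (rows, maxConcurrent) = run(parallelism = 4)
    println(s"${rows.size} rows, at most $maxConcurrent calls in flight")
  }
}
```

Because mapAsync only requests a new element when fewer than parallelism Futures are pending, the Source is backpressured and at most parallelism calls exist at any moment. mapAsync also emits results in input order; mapAsyncUnordered drops that guarantee in exchange for emitting each result as soon as it completes.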
