Sajith Silva - 8 months ago
Scala Question

How is onNext called in Scala Rx?

I'm trying to understand how an Observable works. Here is my code.

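    import rx.lang.scala.Observable // assuming RxScala

    def make: Stream[Int] = {
      Stream.cons(scala.util.Random.nextInt(10), {
        println("Making ..")
        Thread.sleep(1000) // wait one second before producing the next value
        make               // the tail is evaluated lazily, so the stream is infinite
      })
    }

    val y = Observable.from(make)

    y.foreach(a => println(a))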

make produces a new value every second. I'm making an Observable out of it. The foreach loop goes on forever, printing each newly produced value.

As I understand it, a => println(a) is the callback that the Rx Observable invokes as onNext(t).

What I'm trying to figure out is how that callback is glued to the producer: when a new value is produced, where is onNext called? I looked through the Rx code for a while and still cannot figure it out.



It seems that subscribing to the stream calls rx.Observable's

Subscription subscribe(Subscriber<? super T> subscriber) 

This in turn calls the IterableProducer.request method, which contains this bit of code:

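    if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
        // fast-path without backpressure
        while (it.hasNext()) {
            if (o.isUnsubscribed()) { return; }
            o.onNext(it.next()); // hand the next produced value to the subscriber
        }
        if (!o.isUnsubscribed()) { o.onCompleted(); }
    }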

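onNext is the observer's code, and its input is whatever the producer yields: inside the while loop, each element pulled from the iterator is passed to the subscriber's onNext. That is how the two are glued together.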
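
To make the glue concrete, here is a minimal sketch in plain Java that imitates the fast path described above. It is not the RxJava implementation: MiniRx, MiniObserver and MiniObservable are hypothetical names invented for this illustration, and backpressure, unsubscription and error handling are left out. It only shows that subscribing is what starts the loop over the iterator, and that the loop is where onNext gets called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class MiniRx {
    /** Hypothetical stand-in for rx.Observer: the consumer side. */
    public interface MiniObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Hypothetical stand-in for an Observable built from an Iterable. */
    public static final class MiniObservable<T> {
        private final Iterable<T> source;

        public MiniObservable(Iterable<T> source) {
            this.source = source;
        }

        /** Subscribing starts the producer loop on the caller's thread. */
        public void subscribe(MiniObserver<? super T> observer) {
            Iterator<T> it = source.iterator();
            while (it.hasNext()) {
                // The glue: each value the producer yields is pushed into onNext.
                observer.onNext(it.next());
            }
            observer.onCompleted();
        }

        /** foreach-style helper: wraps a plain callback in an observer. */
        public void forEach(Consumer<? super T> callback) {
            subscribe(new MiniObserver<T>() {
                @Override public void onNext(T value) { callback.accept(value); }
                @Override public void onCompleted() { }
            });
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        new MiniObservable<>(Arrays.asList(1, 2, 3)).forEach(seen::add);
        System.out.println(seen); // prints [1, 2, 3]
    }
}
```

Because the loop runs on the thread that subscribes, a source that never ends (such as the infinite stream in the question) keeps that thread inside the loop, so foreach never returns unless the work is moved to another thread.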