Andrei T Andrei T - 3 years ago 119
Android Question

PublishProcess flatMap operator does not get executed(rxjava-2)

I have the following code:

/**
* Request wrapped around flowable.
*/
public abstract class RequestFlowable<T> {

private final PublishProcessor<String> mPublish;
private String mName;

public RequestFlowable(String name) {
mName = name;
mPublish = PublishProcessor.create();
}

public Flowable<T> getFlowable() {
//return createAction();
return mPublish.compose(new FlowableTransformer<String, T>() {
@Override
public Publisher<T> apply(@NonNull Flowable<String> upstream) {
return createAction();
}
});
/*
return mPublish.flatMap(new Function<String, Publisher<? extends T>>() {
@Override
public Publisher<? extends T> apply(@NonNull String s) throws Exception {
return createAction();
}
});
*/

}

protected abstract Flowable<T> createAction();


public String getName() {
return mName;
}

public void start() {
mPublish.onNext("processCommand");
}

@Override
public String toString() {
return "Request: " + mName;
}
}


Now for Single
#EDIT 2

public abstract class Request<T> {
private final SingleSubject<Object> mPublish;
private String mName;

public Request(String name) {
mName = name;
mPublish = SingleSubject.create();

}

public Single<T> getSingle() {
return mPublish.flatMap(o -> createAction());
}

protected abstract Single<? extends T> createAction();


public String getName() {
return mName;
}


public void start() {
mPublish.onSuccess("Start");
}

@Override
public String toString() {
return "Request: " + mName;
}
}


The code from the above works when used with compose, like in code from above but, if instead I put the commented code - aka flatMap for some reason createAction is not executed.



EDIT 2



The code from the above is called from another class the following code(important parts of class added):

public class RequestQueue implements RequestController {
private static final String TAG = RequestQueue.class.getSimpleName();
private PublishSubject<Request> mRequest;
private PublishSubject<RequestFlowable> mRequestFlowable;

@Override
public <T> Single<T> registerRequest(Request<T> request) {
mRequest.onNext(request);
return request.getSingle();
}

@Override
public <T> Flowable<T> registerRequestFlowable(RequestFlowable<T> request) {
mRequestFlowable.onNext(request);
return request.getFlowable();
}

public RequestQueue() {
mRequest = PublishSubject.create();
mRequestFlowable = PublishSubject.create();
mRequest.subscribe(this::actionOnRequest);
mRequestFlowable.subscribe(this::actionOnRequest);
}

private void actionOnRequest(Request request) {
Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
request.start();
}


private void actionOnRequest(RequestFlowable request) {
Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
request.start();
}
}

Answer Source

(From my comments:)

Why does Single work?

SingleSubject retains the single terminal event it received. Since it can only receive onSuccess and onError, it will "replay" that to late subscribers (also this is why there is no separater ReplaySingleSubject). When you call onSuccess on the SingleSubject, that value is remembered and promplty reemitted when the later subscription happens, calling your createAction. PublishProcessor also remembers its terminal events but onNext is not a terminal event, hence dropped without consumer.

How can the desired behavior be achieved via Processor?

You could reorganize your logic, use BehaviorProcessor or ReplayProcessor.createWithSize(1). Calling onComplete won't execute the flatMap function either.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download