Felipe Jun - 2 days ago
Java Question

ClientCallStreamObserver isReady never returns true

I'm making an input stream rate meter. It is basically a service that exposes a request stream call and counts how many messages per second it can handle.

Because the client sends messages fully asynchronously, I use ClientCallStreamObserver to start sending only once the stream is ready, so that unsent messages don't pile up in memory.

The client code looks like this:

    public static void main(String[] args) throws Exception {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build();
        ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel);

        StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() {
            @Override
            public void onNext(Empty empty) {

            }

            @Override
            public void onError(Throwable throwable) {
                logger.info("on error response stream");
            }

            @Override
            public void onCompleted() {
                logger.info("on completed response stream");
            }
        });

        final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream;

        while (!clientCallObserver.isReady()) {
            Thread.sleep(2000);
            logger.info("stream not ready yet");
        }

        counter.setLastTic(System.nanoTime());

        while (true) {
            counter.inc();
            if (counter.getCounter() % 15000 == 0) {
                long now = System.nanoTime();
                double rate = (double) NANOSEC_TO_SEC * counter.getCounter() / (now - counter.getLastTic());
                logger.info("rate: " + rate + " msgs per sec");
                counter.clear();
                counter.setLastTic(now);
            }
            inputStream.onNext(createRandomTrade());
        }
    }


The polling loop on isReady() never ends.

Note: I'm running the test on a Kubernetes cluster. The server receives the call and returns a StreamObserver implementation.

Answer

isReady should eventually return true, as long as the RPC doesn't error/complete immediately. But the code is not observing flow control properly.

After each call to onNext() to send a request, isReady() may start returning false. Your while (true) loop should therefore check isReady() at the beginning of each iteration and call onNext() only when it returns true.
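
As a rough illustration of checking readiness before every send, here is a minimal sketch that runs without gRPC. ReadySink, BoundedSink and GatedSender are hypothetical stand-ins, not part of grpc-java: ReadySink only mirrors the shape of isReady() and onNext(), and BoundedSink fakes a transport that buffers at most a fixed number of messages. The point is only that the buffer never grows past the limit when each onNext() is gated on isReady().

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in mirroring isReady() and onNext(); not a grpc-java type. */
interface ReadySink<T> {
    boolean isReady();
    void onNext(T value);
}

/** Fake transport (assumption): reports "not ready" once `capacity` messages are buffered. */
final class BoundedSink implements ReadySink<Integer> {
    private final int capacity;
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private int maxBuffered = 0;
    private int delivered = 0;

    BoundedSink(int capacity) { this.capacity = capacity; }

    @Override
    public boolean isReady() { return buffer.size() < capacity; }

    @Override
    public void onNext(Integer value) {
        buffer.add(value);
        maxBuffered = Math.max(maxBuffered, buffer.size());
    }

    /** Simulates the network flushing everything currently buffered. */
    void drain() {
        delivered += buffer.size();
        buffer.clear();
    }

    int maxBuffered() { return maxBuffered; }
    int delivered() { return delivered; }
}

final class GatedSender {
    /** Sends `total` messages, checking isReady() before every onNext(). */
    static int sendAll(BoundedSink sink, int total) {
        int sent = 0;
        while (sent < total) {
            if (!sink.isReady()) {
                // In real code you would wait here; the fake is flushed by hand instead.
                sink.drain();
                continue;
            }
            sink.onNext(sent++);
        }
        sink.drain();
        return sent;
    }

    public static void main(String[] args) {
        BoundedSink sink = new BoundedSink(4);
        int sent = sendAll(sink, 10);
        // prints "sent=10 delivered=10 maxBuffered=4"
        System.out.println("sent=" + sent + " delivered=" + sink.delivered()
                + " maxBuffered=" + sink.maxBuffered());
    }
}
```

With the check removed, the same loop would push all 10 messages into the buffer at once, which is the unbounded growth the question is trying to avoid.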

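Instead of polling, it is better to call clientCallObserver.setOnReadyHandler(yourRunnable) to be notified when the call is ready to send. On the client side, the handler has to be registered before the call starts, which in grpc-java means passing a ClientResponseObserver as the response observer and calling setOnReadyHandler() inside its beforeStart() method. Note that you should still check isReady() within yourRunnable, as there can be spurious or out-of-date notifications.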
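
The handler-based approach can be sketched in the same gRPC-free way. FlowControlledCall, FakeCall and OnReadyDrain are hypothetical names introduced for illustration: the interface only mirrors the shape of isReady(), onNext() and setOnReadyHandler(), and FakeCall is an assumed fake with a fixed send window. The sketch shows why the handler loops on isReady() rather than trusting the notification: a spurious callback sends nothing.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in mirroring isReady(), onNext() and setOnReadyHandler(); not a grpc-java type. */
interface FlowControlledCall<T> {
    boolean isReady();
    void onNext(T value);
    void setOnReadyHandler(Runnable handler);
}

/** Fake call (assumption) with a fixed send window; reopening the window fires the handler. */
final class FakeCall implements FlowControlledCall<Integer> {
    private final int window;
    private int inFlight = 0;
    private int accepted = 0;
    private int maxInFlight = 0;
    private Runnable onReady = () -> {};

    FakeCall(int window) { this.window = window; }

    @Override
    public boolean isReady() { return inFlight < window; }

    @Override
    public void onNext(Integer value) {
        inFlight++;
        accepted++;
        maxInFlight = Math.max(maxInFlight, inFlight);
    }

    @Override
    public void setOnReadyHandler(Runnable handler) { this.onReady = handler; }

    /** Simulates the peer consuming the whole window, then notifies the sender. */
    void flushAndNotify() {
        inFlight = 0;
        onReady.run();
    }

    /** Simulates a spurious notification: the handler fires although nothing changed. */
    void notifyWithoutFlush() { onReady.run(); }

    int accepted() { return accepted; }
    int maxInFlight() { return maxInFlight; }
}

final class OnReadyDrain {
    /** Installs a handler that sends while isReady() holds, up to `total` messages. */
    static void install(FakeCall call, int total) {
        AtomicInteger next = new AtomicInteger();
        call.setOnReadyHandler(() -> {
            // Re-check isReady() on every iteration: the notification may be stale.
            while (call.isReady() && next.get() < total) {
                call.onNext(next.getAndIncrement());
            }
        });
    }

    public static void main(String[] args) {
        FakeCall call = new FakeCall(3);
        install(call, 7);
        call.flushAndNotify();      // sends 0, 1, 2; the window is now full
        call.notifyWithoutFlush();  // spurious: isReady() is false, nothing is sent
        call.flushAndNotify();      // sends 3, 4, 5
        call.flushAndNotify();      // sends 6
        // prints "accepted=7 maxInFlight=3"
        System.out.println("accepted=" + call.accepted() + " maxInFlight=" + call.maxInFlight());
    }
}
```

In the rate meter from the question, the body of the handler's loop would be where counter.inc() and onNext(createRandomTrade()) go, so the measured rate reflects what the stream can actually absorb.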