Felipe Jun Felipe Jun - 1 year ago 36
Java Question

ClientCallStreamObserver isReady never returns true

I'm making an input stream rate meter. It is basically a service that exposes a request stream call and counts how many messages per second it can handle.

As the client is totally async when it comes to sending messages, I use the ClientCallStreamObserver to start sending messages just when the stream is ready, to avoid memory overflow.

The client code looks like this:

public static void main(String[] args) throws Exception {
ManagedChannel channel = ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build();
ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel);

StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() {
public void onNext(Empty empty) {


public void onError(Throwable throwable) {
logger.info("on error response stream");

public void onCompleted() {
logger.info("on completed response stream");

final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream;

while (!clientCallObserver.isReady()) {
logger.info("stream not ready yet");


while (true) {
if (counter.getCounter() % 15000 == 0 ) {
long now = System.nanoTime();
double rate = (double) NANOSEC_TO_SEC * counter.getCounter() / (now - counter.getLastTic());
logger.info("rate: " + rate + " msgs per sec");

My observation loop over isReady is never ending.

OBS: I'm using kubernetes cluster to serve my test, the server is receiving the call and returning a StreamObserver implementation.

Answer Source

isReady should eventually return true, as long as the RPC doesn't error/complete immediately. But the code is not observing flow control properly.

After each call to onNext() to send a request isReady() could begin returning false. Your while (true) loop should instead have the isReady() check at the beginning of each iteration.

Instead of polling, it is better to call serverCallObserver.setOnReadyHandler(yourRunnable) to be notified when the call is ready to send. Note that you should still check isReady() within yourRunnable as there can be spurious/out-of-date notifications.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download