vish4071 - 7 months ago
Java Question

Java Threading: Excessive CPU Utilization

I'm using a service that reads messages from Kafka and pushes them into Cassandra.

I'm using a threaded architecture for this.

There are, say, k threads consuming from a Kafka topic. These write into a queue, declared as:

public static BlockingQueue<>


Now there are a number of threads, say n, which write into Cassandra. Here is the code that does that:

public void run(){
    LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
    while (!Thread.currentThread().isInterrupted()) {
        Thread.yield();
        if (!content.isEmpty()) {
            try {
                JSONObject msg = content.remove();
                // JSON
                for(String tableName : tableList){
                    CassandraConnector.getSession().execute(createQuery(tableName, msg));
                }
            } catch (Exception e) {

            }
        }
    }
}


content is the BlockingQueue used for read-write operations.

I'm extending the Thread class to implement the threading, and there is a fixed number of threads that keep running unless interrupted.

The problem is that this uses too much CPU. Here is the first line of the top output:

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
46232 vishran+ 20 0 3010804 188052 14280 S 137.8 3.3 5663:24 java


Here is the output of strace on a thread of this process:

strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
....and so on


I am using Thread.yield() because of this.

If you want any other information for debugging, please let me know.

Now the question is, how can CPU utilization be minimized?

Answer

The whole purpose of a BlockingQueue is that it blocks when it is empty, so the consumer threads (the ones writing into Cassandra) don't have to check manually whether it is empty. You can just call take(): if the queue is empty, the call blocks until an element becomes available or the thread is interrupted.

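When a thread is blocked, the scheduler can run some other thread in its place, so you no longer need to call yield(). Keep in mind that Thread.yield() is only a hint to the scheduler that the current thread is willing to give up the CPU, and the scheduler is free to ignore it. If no other thread is ready to run, the yielding thread continues immediately, which is why your loop spins on sched_yield() and burns CPU while the queue is empty.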
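To make the difference between blocking and spinning concrete, here is a minimal sketch that starts a consumer on an empty queue and reports the thread state it settles into. The class name BlockedConsumerDemo, the method stateWhileQueueIsEmpty, and the use of LinkedBlockingQueue are assumptions for illustration; the question does not say which BlockingQueue implementation is used.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: names here are hypothetical, not from the original post.
class BlockedConsumerDemo {

    /** Starts a consumer on an empty queue and returns the state it settles into. */
    static Thread.State stateWhileQueueIsEmpty() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // blocks here because nothing is ever put on the queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-consumer");
        consumer.start();

        // Give the consumer up to two seconds to reach take() and block.
        long deadline = System.nanoTime() + 2_000_000_000L;
        while (consumer.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State observed = consumer.getState();

        consumer.interrupt(); // wake the consumer so the demo can finish
        consumer.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Consumer state on an empty queue: " + stateWhileQueueIsEmpty());
    }
}
```

A thread that reports WAITING is parked and is not scheduled until something wakes it, whereas the isEmpty()/yield() loop from the question stays RUNNABLE the whole time.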

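public void run(){
    LOGGER.log(Level.INFO, "Thread Created: " + Thread.currentThread().getName());
    try {
        // Keep the loop: without it the thread would handle one message and exit
        while (!Thread.currentThread().isInterrupted()) {
            // take() blocks while the queue is empty, so no isEmpty() check or yield() is needed
            JSONObject msg = content.take();
            // JSON
            for(String tableName : tableList){
                CassandraConnector.getSession().execute(createQuery(tableName, msg));
            }
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag and let the thread finish
        Thread.currentThread().interrupt();
    }
}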
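
The run() method above depends on the Cassandra session and other fields from the question, so it cannot be tried in isolation. As a rough, self-contained sketch of the same pattern, the example below replaces the Cassandra write with a hypothetical MessageSink interface and uses String messages instead of JSONObject; the names QueueWorkerDemo, Worker, MessageSink, and drain are all assumptions made for illustration. It also shows one possible way to keep a failed write from ending the worker, which is a design choice rather than something the original code specifies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: all names in this class are hypothetical.
class QueueWorkerDemo {

    /** Hypothetical stand-in for the Cassandra write in the question. */
    interface MessageSink {
        void write(String message) throws Exception;
    }

    /** Consumer that blocks in take() instead of polling isEmpty() and yielding. */
    static class Worker extends Thread {
        private final BlockingQueue<String> content;
        private final MessageSink sink;

        Worker(BlockingQueue<String> content, MessageSink sink) {
            this.content = content;
            this.sink = sink;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String msg = content.take(); // blocks while the queue is empty
                    sink.write(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag so the loop exits
                } catch (Exception e) {
                    // A failed write is reported and the worker keeps going.
                    System.err.println("Write failed: " + e);
                }
            }
        }
    }

    /** Runs one worker over the given messages and returns what the sink received. */
    static List<String> drain(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> written = new CopyOnWriteArrayList<>();

        Worker worker = new Worker(queue, written::add);
        worker.start();
        for (String message : messages) {
            queue.put(message);
        }

        // Wait up to two seconds for the worker to process everything.
        long deadline = System.nanoTime() + 2_000_000_000L;
        while (written.size() < messages.size() && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }

        worker.interrupt(); // take() throws InterruptedException and the loop ends
        worker.join();
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain(Arrays.asList("a", "b", "c")));
    }
}
```

Interrupting the worker is what ends it here: take() throws InterruptedException, the handler restores the interrupt flag, and the loop condition then fails.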