Simon Müller - 2 months ago
Java Question

Multithreading Thread restart

I'm writing a small app to remote-control a mixing console (Behringer X32), and I have a problem with the communication.

I'm sending data from the PC (app) to the console (port 10023, UDP). The console then answers to the port the PC sent the data from, which is a random port.
So I have two threads: one for sending data and one for listening for data from the console. Every time I send data to the console, I need to change the listening port, so I have to kill the listening thread and start a new one.

But after some time the app has about 1000 threads open.

How can I restart the thread or update the listening port without creating a new thread?

Here's the code for this section; the whole files are on GitHub.

the listening thread class:

public class Receiver implements Runnable {

    private List<IReceiverListener> listeners;
    private final static int PACKETSIZE = 48;
    private int port;

    public Receiver() {
        listeners = new ArrayList();
    }

    public void addReceiverListener(IReceiverListener listener) {
        listeners.add(listener);
    }

    private void update(String data, String adress) {
        for (IReceiverListener listener : listeners) {
            listener.receiveConsoleData(data, adress);
            if (data.indexOf("active") > -1) {
                listener.incrementWatchDog();
            }
        }
    }

    @Override
    public void run() {
        try {
            // Convert the argument to ensure that is it valid
            // Construct the socket
            while (true) {
                //System.out.println("Listen on Port:" + this.port);
                DatagramSocket socket = new DatagramSocket(this.port);
                // Create a packet
                DatagramPacket packet = new DatagramPacket(new byte[PACKETSIZE], PACKETSIZE);
                // Receive a packet (blocking)
                socket.receive(packet);
                // Print the packet
                update(new String(packet.getData()), packet.getAddress().toString());
                //logger.addLogData(new String(packet.getData())+" "+packet.getAddress().toString());
                // Return the packet to the sender
                socket.close();
            }
        } catch (IOException e) {

        }
    }


    public void setPort(int port) {
        this.port = port;
    }

    public int getPort() {
        return port;
    }
}


and here is my port update function:

@Override
public void updatePort(int port) {
    receiverThread.interrupt();
    receiverThread = null;
    receiver.setPort(port);
    receiverThread = new Thread(receiver);
    receiverThread.start();
}


and the sending thread does this when it sends data:

listener.updatePort(dsocket.getLocalPort());

Answer

This is actually not a threading problem. The problem is that the receiver thread is stuck in the blocking receive method, so it cannot react to the changed port. Thread#interrupt does not release an ordinary thread that is blocked there, which is why the old threads pile up. However, calling DatagramSocket#close from another thread releases the blocked receiver thread with a SocketException.
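To make the close-from-another-thread behaviour concrete, here is a minimal sketch. The class name CloseUnblocksReceive, the method closeReleasesReceiver, the 48-byte buffer and the 200 ms wait are all assumptions made for illustration; none of them come from the original project.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the asker's code.
public class CloseUnblocksReceive {

    /** Returns true if close() from another thread released the blocked receive(). */
    public static boolean closeReleasesReceiver() {
        try {
            DatagramSocket socket = new DatagramSocket(0); // port 0: the OS picks a free port
            CountDownLatch released = new CountDownLatch(1);

            Thread receiver = new Thread(() -> {
                try {
                    // Nothing is expected to be sent to this socket, so receive() blocks.
                    socket.receive(new DatagramPacket(new byte[48], 48));
                } catch (SocketException e) {
                    released.countDown(); // raised once the socket has been closed
                } catch (IOException e) {
                    // some other I/O failure: not the case shown here
                }
            });
            receiver.start();

            Thread.sleep(200); // crude wait so the receiver is most likely blocked already
            socket.close();    // called from this thread, not from the receiver
            return released.await(2, TimeUnit.SECONDS);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("receiver released: " + closeReleasesReceiver());
    }
}
```

The receiver thread ends on its own here; in the real receiver the catch block would be the place to open a socket on the new port instead.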

Thus, you can solve this by closing the currently receiving socket whenever the receiving port changes. The receiving thread can then catch the SocketException and create a new DatagramSocket that listens on the new port.

There is no need to kill and recreate threads.


First, store the socket in a field. This lets another thread reach it and call socket.close(). Second, add another try-catch block inside the while (true) loop that catches only SocketException.

Something like this might work fine:

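import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Receiver implements Runnable {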

    private static final int PACKETSIZE = 48;

    private final ConcurrentLinkedQueue<IReceiverListener> listeners = new ConcurrentLinkedQueue<>();

    private volatile DatagramSocket socket;
    private volatile int port;

    public Receiver(int port) {
        this.port = port;
    }

    public void addReceiverListener(IReceiverListener listener) {
        listeners.add(listener);
    }

    public void updatePort(int port) {
        this.port = port;
        DatagramSocket socket = this.socket;
        if (socket != null) {
            socket.close();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                receiveLoop(new DatagramSocket(port));
            }
        } catch (IOException e) {
            // handle error
        }
    }

    private void receiveLoop(DatagramSocket newSocket) throws IOException {
        try (DatagramSocket socket = newSocket) {
            this.socket = newSocket;
            while (true) {
                DatagramPacket packet = new DatagramPacket(new byte[PACKETSIZE], PACKETSIZE);
                socket.receive(packet);
                process(packet);
            }
        } catch (SocketException e) {
            // port was changed -> return and restart with a new socket
        } finally {
            this.socket = null;
        }
    }

    private void process(DatagramPacket packet) {
        // Decode only the bytes actually received, not the whole buffer.
        update(new String(packet.getData(), 0, packet.getLength()), packet.getAddress().toString());
    }

    private void update(String data, String adress) {
        for (IReceiverListener listener : listeners) {
            listener.receiveConsoleData(data, adress);
            if (data.indexOf("active") > -1) {
                listener.incrementWatchDog();
            }
        }
    }
}

Please note that this might still contain some bugs. It is only meant to give you a rough idea of how to solve the problem.
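One way to try the idea end to end is a small harness like the following. It is only a sketch under assumptions: RebindDemo, RebindingReceiver, freePort, sendUntilReceived and demo are hypothetical names, the listener interface is replaced by a plain queue, and everything runs over the loopback interface. It also differs from the class above in one detail: the loop re-checks the port after publishing the socket, which is meant to cover the case where updatePort runs between binding and storing the socket in the field.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RebindDemo {

    /** Hypothetical, trimmed-down stand-in for Receiver (a queue instead of listeners). */
    static class RebindingReceiver implements Runnable {
        final BlockingQueue<String> received = new LinkedBlockingQueue<>();
        private volatile DatagramSocket socket;
        private volatile int port;

        RebindingReceiver(int port) {
            this.port = port;
        }

        void updatePort(int newPort) {
            port = newPort;
            DatagramSocket current = socket;
            if (current != null) {
                current.close(); // releases the thread blocked in receive()
            }
        }

        @Override
        public void run() {
            try {
                while (true) {
                    int boundPort = port;
                    DatagramSocket s = new DatagramSocket(boundPort); // bind failure ends the thread
                    socket = s;
                    try {
                        // Re-check after publishing the socket: updatePort may have run meanwhile.
                        while (boundPort == port) {
                            DatagramPacket p = new DatagramPacket(new byte[48], 48);
                            s.receive(p);
                            received.add(new String(p.getData(), 0, p.getLength()));
                        }
                    } catch (SocketException e) {
                        // closed by updatePort: fall through and bind the new port
                    } finally {
                        s.close();
                    }
                }
            } catch (IOException e) {
                // bind failure or other I/O error: give up (demo only)
            }
        }
    }

    /** Asks the OS for a currently free UDP port. */
    static int freePort() throws IOException {
        try (DatagramSocket probe = new DatagramSocket(0)) {
            return probe.getLocalPort();
        }
    }

    /** Re-sends msg until the receiver reports it; returns null after about five seconds. */
    static String sendUntilReceived(RebindingReceiver r, int port, String msg)
            throws IOException, InterruptedException {
        byte[] data = msg.getBytes();
        try (DatagramSocket sender = new DatagramSocket()) {
            DatagramPacket p = new DatagramPacket(
                    data, data.length, InetAddress.getLoopbackAddress(), port);
            for (int attempt = 0; attempt < 50; attempt++) {
                sender.send(p); // UDP: lost if the receiver has not bound yet, so retry
                String got = r.received.poll(100, TimeUnit.MILLISECONDS);
                if (msg.equals(got)) {
                    return got;
                }
            }
            return null;
        }
    }

    /** Receives on two different ports with a single receiver thread. */
    public static String demo() {
        try {
            int portA = freePort();
            int portB = freePort();
            RebindingReceiver receiver = new RebindingReceiver(portA);
            Thread thread = new Thread(receiver);
            thread.setDaemon(true); // lets the JVM exit although the loop never ends
            thread.start();

            String first = sendUntilReceived(receiver, portA, "first");
            receiver.updatePort(portB); // same thread, new socket
            String second = sendUntilReceived(receiver, portB, "second");
            return first + "," + second + "," + thread.isAlive();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the harness is that only one receiver thread is ever started, and it is still alive after the port change. A real receiver would also want a way to stop the loop and some handling for a port that cannot be bound.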
