kevin kevin - 1 month ago 30
Java Question

How to wait for data in a Spring Integration TCP server

My TCP server, built with Spring Integration, works well. I use ByteArrayLengthHeaderSerializer as the serializer.

Once in a while, client data arrives very slowly, which makes the server respond very slowly.

I would like to wait a maximum of 5 seconds for each byte of data from the client. If a byte does not arrive within 5 seconds, I would like to send a NAK.

How do I set the 5-second timeout, and where should it be set?
Do I need to customize the serializer?

Here is my Spring context:

<int-ip:tcp-connection-factory id="crLfServer"
    type="server"
    port="${availableServerSocket}"
    single-use="true"
    so-timeout="10000"
    using-nio="false"
    serializer="connectionSerializeDeserialize"
    deserializer="connectionSerializeDeserialize"
    so-linger="2000"/>

<bean id="connectionSerializeDeserialize"
    class="org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer"/>

<int-ip:tcp-inbound-gateway id="gatewayCrLf"
    connection-factory="crLfServer"
    request-channel="serverBytes2StringChannel"
    error-channel="errorChannel"
    reply-timeout="10000"/> <!-- reply-timeout works on inbound-gateway -->

<int:channel id="toSA" />

<int:service-activator input-channel="toSA"
    ref="myService"
    method="prepare"/>

<int:object-to-string-transformer id="serverBytes2String"
    input-channel="serverBytes2StringChannel"
    output-channel="toSA"/>

<int:transformer id="errorHandler"
    input-channel="errorChannel"
    expression="payload.failedMessage.payload + ':' + payload.cause.message"/>


Thank you

Answer

You would need a custom deserializer. By default, when the read times out (after the so-timeout), we close the socket. You would have to catch the timeout and return a partial message, with some information that tells the downstream flow to return the nack.
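To make the "catch the timeout and return a partial message" idea concrete, here is a minimal sketch using plain java.net sockets rather than the Spring Integration deserializer API. TimedFrameReader, Frame, MAX_FRAME, and the 200 ms timeout are hypothetical names and values chosen for the demo, and a 4-byte big-endian length header is assumed. Note that SO_TIMEOUT bounds each blocking read, not the whole message, which is the per-byte style of wait the question asks for.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration; not a Spring Integration class.
class TimedFrameReader {

    static final int MAX_FRAME = 1 << 20; // arbitrary sanity limit (assumption)

    /** What one read attempt produced: the payload bytes received and whether a read timed out. */
    static final class Frame {
        final byte[] bytes;
        final boolean timedOut;

        Frame(byte[] bytes, boolean timedOut) {
            this.bytes = bytes;
            this.timedOut = timedOut;
        }
    }

    /** Reads a 4-byte big-endian length header, then that many payload bytes. */
    static Frame readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] payload = new byte[0];
        int received = 0;
        try {
            int length = data.readInt();
            if (length < 0 || length > MAX_FRAME) {
                throw new IOException("Invalid frame length: " + length);
            }
            payload = new byte[length];
            while (received < length) {
                int n = data.read(payload, received, length - received);
                if (n < 0) {
                    throw new EOFException("Stream closed mid-frame");
                }
                received += n;
            }
            return new Frame(payload, false);
        } catch (SocketTimeoutException e) {
            // A single blocking read exceeded SO_TIMEOUT: report the partial payload.
            return new Frame(Arrays.copyOf(payload, received), true);
        }
    }

    public static void main(String[] args) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            accepted.setSoTimeout(200); // the question would use 5000 ms
            // The header announces 10 bytes, but only 3 are ever sent.
            OutputStream toServer = client.getOutputStream();
            toServer.write(new byte[] {0, 0, 0, 10, 'a', 'b', 'c'});
            toServer.flush();

            Frame frame = readFrame(accepted.getInputStream());
            // The caller, which owns the socket, decides what to reply.
            String reply = frame.timedOut ? "NAK" : "ACK";
            OutputStream toClient = accepted.getOutputStream();
            toClient.write(reply.getBytes(StandardCharsets.US_ASCII));
            toClient.flush();

            byte[] buf = new byte[3];
            int n = client.getInputStream().readNBytes(buf, 0, buf.length);
            System.out.println(new String(buf, 0, n, StandardCharsets.US_ASCII)
                    + " after " + frame.bytes.length + " payload bytes");
        }
    }
}
```

In this sketch the reader only reports the timeout; the code that owns the socket sends the NAK. That is the same split of responsibilities as between a deserializer and the connection.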

The deserializer does not have access to the connection so it can't send the nack itself.

You could do it in a custom subclass of TcpMessageMapper, though, by overriding toMessage().

That said, your solution might be brittle unless you close the socket anyway, because the stream may still contain some data from the previous message. With single-use set to true, though, I assume you are only sending one message per socket.
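The brittleness can be illustrated without any sockets. The sketch below (StaleStreamDemo and its methods are hypothetical names, and a 4-byte big-endian length header is assumed) abandons a frame after two payload bytes, as a timeout would, and then reads the next "header" from the same stream. The late payload bytes are misread as a length.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical demo class; it only simulates the byte stream of a reused connection.
class StaleStreamDemo {

    /** Returns the length header read right after a frame was abandoned mid-payload. */
    static int headerAfterAbandonedFrame() throws IOException {
        byte[] wire = {
            0, 0, 0, 5, 'h', 'e', 'l', 'l', 'o', // first frame: 5-byte payload
            0, 0, 0, 2, 'o', 'k'                 // second frame: 2-byte payload
        };
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));

        in.readInt();              // header of the first frame (5)
        in.readFully(new byte[2]); // only "he" arrived before the simulated timeout

        // The rest of the first payload ("llo") arrives late. A reader that now
        // expects a fresh header consumes 'l', 'l', 'o' and the next frame's first 0.
        return in.readInt();
    }

    public static void main(String[] args) throws IOException {
        int bogus = headerAfterAbandonedFrame();
        System.out.println("Misread length header: " + bogus);
    }
}
```

The misread value is 0x6C6C6F00 rather than 2, so every later frame on that stream would be out of step. Closing the socket after the timeout avoids this.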

EDIT

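import java.net.SocketTimeoutException;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpMessageMapper;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication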
public class So40408085Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So40408085Application.class, args);
        context.getBean("toTcp", MessageChannel.class).send(new GenericMessage<>("foo"));
        Thread.sleep(5000);
        context.close();
    }

    @Bean
    public TcpNetServerConnectionFactory server() {
        TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(1234);
        server.setSoTimeout(1000);
        server.setMapper(new TimeoutMapper()); // use 'mapper' attribute in XML
        return server;
    }

    @Bean
    public TcpInboundGateway inGate() {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(server());
        inGate.setRequestChannelName("inChannel");
        return inGate;
    }

    @ServiceActivator(inputChannel = "inChannel")
    public String upCase(byte[] in) {
        return new String(in).toUpperCase();
    }

    @Bean
    public TcpNetClientConnectionFactory client() {
        TcpNetClientConnectionFactory client = new TcpNetClientConnectionFactory("localhost", 1234);
        client.setSerializer(new ByteArrayLfSerializer()); // so the server will time out - it's expecting CRLF
        return client;
    }

    @Bean
    @ServiceActivator(inputChannel = "toTcp")
    public TcpOutboundGateway out() {
        TcpOutboundGateway outGate = new TcpOutboundGateway();
        outGate.setConnectionFactory(client());
        outGate.setOutputChannelName("reply");
        return outGate;
    }

    @ServiceActivator(inputChannel = "reply")
    public void reply(byte[] in) {
        System.out.println(new String(in));
    }

    public static class TimeoutMapper extends TcpMessageMapper {

        @Override
        public Message<?> toMessage(TcpConnection connection) throws Exception {
            try {
                return super.toMessage(connection);
            }
            catch (SocketTimeoutException e) {
                connection.send(new GenericMessage<>("You took too long to send me data, sorry"));
                connection.close();
                return null;
            }
        }

    }

}