Eddie B - 5 months ago
Java Question

Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow

I'm attempting to build a websocket messaging app based on the Spring Websocket Demo, running ActiveMQ as the STOMP message broker with Undertow. The application runs fine on insecure connections. However, I'm having difficulty configuring the STOMP broker relay to forward over SSL connections.

As mentioned in the Spring WebSocket Docs...

The "STOMP broker relay" in the above configuration is a Spring MessageHandler that handles messages by forwarding them to an external message broker. To do so it establishes TCP connections to the broker, forwards all messages to it, and then forwards all messages received from the broker to clients through their WebSocket sessions. Essentially it acts as a "relay" that forwards messages in both directions.

Further, the docs state a dependency on reactor-net which I have...

Please add a dependency on org.projectreactor:reactor-net for TCP connection management.

The issue is that my current implementation doesn't initialize the NettyTcpClient with SSL, so the ActiveMQ connection fails with an SSLException.

[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED:
[id: 0xcfef39e9, / => localhost/]
[o.a.a.b.TransportConnection.Transport:245] »
Transport Connection to: tcp:// failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
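
For readers unfamiliar with this exception, here is a small JDK-only illustration of why the broker rejects the connection. An SSL transport expects the first bytes on the wire to form a TLS handshake record, whereas a plaintext STOMP client starts with the text CONNECT. The class and method names (FirstBytes, looksLikeTlsHandshake, stompConnectFrame) are made up for this sketch and are not part of Spring, Reactor, or ActiveMQ.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative helper only; not part of Spring, Reactor, or ActiveMQ. */
final class FirstBytes {

    private FirstBytes() {
    }

    /** Returns true if the buffer begins like a TLS record carrying a handshake message. */
    static boolean looksLikeTlsHandshake(byte[] bytes) {
        // TLS record header: content type 0x16 (handshake), then protocol major version 0x03.
        return bytes.length >= 2 && bytes[0] == 0x16 && bytes[1] == 0x03;
    }

    /** Builds the first frame a plaintext STOMP 1.2 client sends, as raw bytes. */
    static byte[] stompConnectFrame(String host) {
        // A STOMP frame is a command line, header lines, a blank line, and a NUL terminator.
        String frame = "CONNECT\n"
                + "accept-version:1.2\n"
                + "host:" + host + "\n"
                + "\n"
                + "\0";
        return frame.getBytes(StandardCharsets.UTF_8);
    }
}
```

Here FirstBytes.looksLikeTlsHandshake(FirstBytes.stompConnectFrame("localhost")) is false, because the frame starts with the byte for 'C' (0x43) rather than 0x16. That is the situation the broker's SSL connector is complaining about.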

As such I've attempted to research the Project Reactor Docs to set SSL options for the connection but I haven't been successful.

At this point I've found that the StompBrokerRelayMessageHandler initializes the NettyTcpClient by default in Reactor2TcpClient, yet it doesn't appear to be configurable.

Assistance would be greatly appreciated.





public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    private final static String KEYSTORE = "/activemq.jks";
    private final static String KEYSTORE_PASS = "xxx";
    private final static String KEYSTORE_TYPE = "JKS";
    private final static String TRUSTSTORE = "/activemq_certs.jks";
    private final static String TRUSTSTORE_PASS = "xxx";

    private static String getBindLocation() {
        return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public SslBrokerService activeMQBroker() throws Exception {

        final SslBrokerService service = new SslBrokerService();

        KeyManager[] km = SecurityManager.getKeyManager();
        TrustManager[] tm = SecurityManager.getTrustManager();

        service.addSslConnector(getBindLocation(), km, tm, null);
        final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
        service.setDestinations(new ActiveMQDestination[]{topic});

        return service;
    }

    public void configureMessageBroker(MessageBrokerRegistry config) {
        // ...
    }

    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // ...
    }

    private static class SecurityManager {
        // ...
    }
}
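
The body of the SecurityManager helper is not shown in the post. As a rough, JDK-only sketch of how KeyManager[] and TrustManager[] arrays are commonly built from a keystore, something like the following could work. The class and method names (KeyStoreManagers, load, keyManagers, trustManagers) are hypothetical and this is not the author's implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper; a sketch of what a keystore utility might do. */
final class KeyStoreManagers {

    private KeyStoreManagers() {
    }

    /** Loads a keystore of the given type; a null stream yields a new, empty keystore. */
    static KeyStore load(InputStream in, String type, char[] password) {
        try {
            KeyStore store = KeyStore.getInstance(type);
            store.load(in, password);
            return store;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("Could not load keystore", e);
        }
    }

    /** Builds key managers backed by the private keys in the given keystore. */
    static KeyManager[] keyManagers(KeyStore store, char[] keyPassword) {
        try {
            KeyManagerFactory factory =
                    KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            factory.init(store, keyPassword);
            return factory.getKeyManagers();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not create key managers", e);
        }
    }

    /** Builds trust managers that trust the certificates in the given keystore. */
    static TrustManager[] trustManagers(KeyStore store) {
        try {
            TrustManagerFactory factory =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            factory.init(store);
            return factory.getTrustManagers();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not create trust managers", e);
        }
    }
}
```

With the constants from the question, the stream would presumably come from something like WebSocketConfig.class.getResourceAsStream(KEYSTORE), with KEYSTORE_TYPE and KEYSTORE_PASS.toCharArray() as the remaining arguments. The resulting arrays have the types that addSslConnector is given above.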


SOLVED per Rossen's advice. Here are the implementation details for anyone interested.


public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {

    @Bean
    @Override
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
        StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
        ConfigurationReader reader = new StompClientDispatcherConfigReader();
        Environment environment = new Environment(reader).assignErrorJournal();
        TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment, "localhost", 8443));
        // Hand the SSL-enabled client to the relay; otherwise the default client is still used.
        handler.setTcpClient(client);
        return handler;
    }

    private static class StompTcpClientSpecFactory
            implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

        private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

        private final String host;
        private final int port;
        private final String KEYSTORE = "src/main/resources/tcpclient.jks";
        private final String KEYSTORE_PASS = "xxx";
        private final String KEYSTORE_TYPE = "JKS";
        private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
        private final String TRUSTSTORE_PASS = "xxx";
        private final String TRUSTSTORE_TYPE = "JKS";
        private final Environment environment;

        private final SecurityManager tcpManager = new SecurityManager(/* ... */);

        public StompTcpClientSpecFactory(Environment environment, String host, int port) {
            this.environment = environment;
            this.host = host;
            this.port = port;
        }

        @Override
        public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
                Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {

            return tcpClientSpec
                    .ssl(new SslOptions() /* ... */)
                    .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                    .connect(this.host, this.port);
        }
    }
}
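
The SSL options passed to the client spec are cut off in the listing above. Independently of Reactor's API, a TCP client that speaks SSL ultimately needs an SSLEngine in client mode built from key and trust material. The following is a minimal JDK-only sketch of that step, with hypothetical names (ClientEngines, clientEngine); it is an illustration, not what Reactor or Spring does internally.

```java
import java.security.GeneralSecurityException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;

/** Hypothetical helper; shows the plain JSSE steps behind an SSL-enabled client. */
final class ClientEngines {

    private ClientEngines() {
    }

    /**
     * Creates a client-mode SSLEngine for the given peer.
     * Passing null for the manager arrays makes JSSE fall back to its defaults.
     */
    static SSLEngine clientEngine(KeyManager[] keyManagers, TrustManager[] trustManagers,
                                  String host, int port) {
        try {
            SSLContext context = SSLContext.getInstance("TLS");
            context.init(keyManagers, trustManagers, null);
            SSLEngine engine = context.createSSLEngine(host, port);
            // The relay connects out to the broker, so the engine acts as the TLS client.
            engine.setUseClientMode(true);
            return engine;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not create client SSLEngine", e);
        }
    }
}
```

For example, ClientEngines.clientEngine(null, null, "localhost", 8443) yields an engine that uses the JVM's default trust store. For a broker with a self-signed certificate, trust managers built from the client truststore would be passed instead.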


The StompBrokerRelayMessageHandler has a tcpClient property you can set. However, it looks like we don't expose that through the WebSocketMessageBrokerConfigurer setup.

You can remove @EnableWebSocketMessageBroker and extend DelegatingWebSocketMessageBrokerConfiguration instead. It's effectively the same but you're now extending directly from the configuration class that provides all the beans.

This allows you to then override the stompBrokerRelayMessageHandler() bean and set its TcpClient property directly. Just make sure the overriding method is marked with @Bean.