shemerk - 5 months ago
Java Question

RabbitMQ - Java Spring - how to init exchange to several queues?

I'm having difficulty finding a Spring way to initialize an exchange that sends each incoming message to more than one queue in my Spring Boot application:

I can't find a good way to define a second exchange-queue binding.

I'm using RabbitTemplate as the producer client.

The 6-page RabbitMQ tutorial doesn't really help with that since:


  1. It only initializes several temporary queues from the Consumer on demand (while I need the Producer to do the binding, to persistent queues)

  2. The examples are for basic Java usage, not using Spring capabilities.



I also failed to find how to implement it via the Spring AMQP pages.

What I've got so far is an attempt to inject the basic Java binding into the Spring way of doing it, but it's not working:

@Bean
public ConnectionFactory connectionFactory() throws IOException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");

    Connection conn = connectionFactory.createConnection();
    Channel channel = conn.createChannel(false);

    channel.exchangeDeclare(SPRING_BOOT_EXCHANGE, "fanout");
    channel.queueBind(queueName, SPRING_BOOT_EXCHANGE, ""); // first bind
    channel.queueBind(queueName2, SPRING_BOOT_EXCHANGE, ""); // second bind

    return connectionFactory;
}


Any help would be appreciated

Edited



I think the problem arises from the fact that every time I restart my server it tries to redefine the exchange-queue binding, while the exchange, queues and bindings persist in the broker...
I managed to define them manually via the broker's UI console, so the Producer is only aware of the exchange name, and the Consumer is only aware of its relevant queue.
Is there a way to define those elements programmatically, but in such a way that they won't be redefined or overwritten if they already exist from previous restarts?

Answer

We use an approach similar to the following to send data from one specific input channel to several input queues of other consumers:

@Bean
public IntegrationFlow integrationFlow(final RabbitTemplate rabbitTemplate, final AmqpHeaderMapper amqpHeaderMapper) {
    // Forward everything arriving on the input channel to RabbitMQ via the template's default exchange.
    return IntegrationFlows
        .from("some-input-channel")
        .handle(Amqp.outboundAdapter(rabbitTemplate)
            .headerMapper(amqpHeaderMapper))
        .get();
}

@Bean
public AmqpHeaderMapper amqpHeaderMapper() {
    final DefaultAmqpHeaderMapper headerMapper = new DefaultAmqpHeaderMapper();
    headerMapper.setRequestHeaderNames("*");
    return headerMapper;
}

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    return new CachingConnectionFactory();
}

@Bean
public RabbitAdmin rabbitAdmin(final ConnectionFactory rabbitConnectionFactory) {
    final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

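@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory rabbitConnectionFactory, final RabbitAdmin rabbitAdmin) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(rabbitConnectionFactory);

    // The exchange, queues and bindings are local objects, not beans, so
    // RabbitAdmin does not pick them up automatically; declare them explicitly.
    final FanoutExchange fanoutExchange = new FanoutExchange(MY_FANOUT.getFanoutName());
    rabbitAdmin.declareExchange(fanoutExchange);
    for (final String queueName : MY_FANOUT.getQueueNames()) {
        // Durable queue; re-declaring it with the same attributes is a no-op on the broker.
        final Queue queue = new Queue(queueName, true);
        rabbitAdmin.declareQueue(queue);

        // A fanout exchange ignores the routing key, so none is given.
        final Binding binding = BindingBuilder.bind(queue).to(fanoutExchange);
        rabbitAdmin.declareBinding(binding);
    }
    // setExchange expects the exchange name, not the Exchange object.
    rabbitTemplate.setExchange(fanoutExchange.getName());
    return rabbitTemplate;
}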

And for completeness, here's the enum for the fanout declaration:

public enum MyFanout {
    MY_FANOUT(Lists.newArrayList("queue1", "queue2"), "my-fanout");

    private final List<String> queueNames;
    private final String fanoutName;

    MyFanout(final List<String> queueNames, final String fanoutName) {
        this.queueNames = requireNonNull(queueNames, "queue must not be null!");
        this.fanoutName = requireNonNull(fanoutName, "exchange must not be null!");
    }

    public List<String> getQueueNames() {
        return this.queueNames;
    }

    public String getFanoutName() {
        return this.fanoutName;
    }
}
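
On the restart concern from the question: declaring an exchange, queue or binding that already exists with the same attributes is a no-op on the broker, so nothing gets overwritten. Only a mismatch (for example durable vs. non-durable) makes the broker reject the declaration. With that in mind, a more declarative variant could look roughly like the sketch below. It assumes Spring AMQP 2.1 or later (for Declarables) and a RabbitAdmin bean in the context (Spring Boot auto-configures one). The class names FanoutTopologyConfig and FanoutProducer are made up for illustration, and the queue and exchange names simply reuse the enum's values.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

// Hypothetical configuration class, not part of the original answer.
@Configuration
public class FanoutTopologyConfig {

    static final String EXCHANGE = "my-fanout";

    // RabbitAdmin declares every Declarable it finds in the application
    // context when a connection to the broker is first opened.
    @Bean
    public Declarables fanoutTopology() {
        // durable = true, autoDelete = false: survives broker restarts
        FanoutExchange exchange = new FanoutExchange(EXCHANGE, true, false);
        Queue queue1 = new Queue("queue1", true); // durable
        Queue queue2 = new Queue("queue2", true); // durable
        // Fanout bindings carry no routing key.
        Binding binding1 = BindingBuilder.bind(queue1).to(exchange);
        Binding binding2 = BindingBuilder.bind(queue2).to(exchange);
        return new Declarables(exchange, queue1, queue2, binding1, binding2);
    }
}

// Hypothetical producer: it only needs to know the exchange name.
@Component
class FanoutProducer {

    private final RabbitTemplate rabbitTemplate;

    FanoutProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    void publish(String payload) {
        // Empty routing key; the fanout exchange copies the message to every bound queue.
        rabbitTemplate.convertAndSend(FanoutTopologyConfig.EXCHANGE, "", payload);
    }
}
```

Because the declarations happen lazily on the first connection, the application can start even when the broker is temporarily unavailable, which is one reason to prefer beans over declaring inside a factory method.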

Hope it helps!
