ptimson ptimson - 3 months ago 29
Java Question

Set SNS WaitTime in Spring AWS Cloud Framework

I am using Spring AWS Cloud Framework to poll for S3 Event notifications on a queue. I am using the

QueueMessagingTemplate
to do this. I want to be able to set the max number of messages and wait time to poll see: http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html.

import com.amazonaws.services.s3.event.S3EventNotification;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;

public class MyQueue {

private QueueMessagingTemplate queueMsgTemplate;

@Autowired
public MyQueue(QueueMessagingTemplate queueMsgTemplate) {
this.queueMsgTemplate = queueMsgTemplate;

}

@Override
public S3EventNotification poll() {
S3EventNotification s3Event = queueMsgTemplate
.receiveAndConvert("myQueueName", S3EventNotification.class);
}
}


Context

@Bean
public AWSCredentialsProviderChain awsCredentialsProviderChain() {
return new AWSCredentialsProviderChain(
new DefaultAWSCredentialsProviderChain());
}

@Bean
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}

@Bean
public AmazonSQS sqsClient(ClientConfiguration clientConfiguration,// UserData userData,
AWSCredentialsProviderChain credentialsProvider) {
AmazonSQSClient amazonSQSClient = new AmazonSQSClient(credentialsProvider, clientConfiguration);
amazonSQSClient.setEndpoint("http://localhost:9324");
return amazonSQSClient;
}

@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS sqsClient) {
return new QueueMessagingTemplate(sqsClient);
}


Any idea how to configure these? Thanks

Answer

The QueueMessagingTemplate.receiveAndConvert() is based on the QueueMessageChannel.receive() method where you can find a desired code:

@Override
public Message<String> receive() {
    return this.receive(0);
}

@Override
public Message<String> receive(long timeout) {
    ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
            new ReceiveMessageRequest(this.queueUrl).
                    withMaxNumberOfMessages(1).
                    withWaitTimeSeconds(Long.valueOf(timeout).intValue()).
                    withAttributeNames(ATTRIBUTE_NAMES).
                    withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
    if (receiveMessageResult.getMessages().isEmpty()) {
        return null;
    }
    com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
    Message<String> message = createMessage(amazonMessage);
    this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
    return message;
}

So, as you see withMaxNumberOfMessages(1) is hardcoded to 1. And that is correct because receive() can poll only one message. The withWaitTimeSeconds(Long.valueOf(timeout).intValue()) is fully equals to the provided timeout. And eh, you can't modify it in case of receiveAndConvert().

Consider to use QueueMessageChannel.receive(long timeout) and messageConverter from the QueueMessagingTemplate. Like it is done in the:

public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass) throws MessagingException {
    Message<?> message = destination.receive();
    if (message != null) {
        return (T) getMessageConverter().fromMessage(message, targetClass);
    } else {
        return null;
    }
}

You can reach an appropriate QueueMessageChannel via code:

String physicalResourceId = this.destinationResolver.resolveDestination(destination);
new QueueMessageChannel(this.amazonSqs, physicalResourceId);