Wednesday, February 1, 2017

How to prevent duplicates while consuming messages from SQS queues in AWS?

SQS, the messaging service provided by Amazon, is a great solution when it comes to operating at high scale without worrying about the nitty-gritty of running your own custom messaging infrastructure. We have had a lot of success using this service as part of our solutions, and it works really well for point-to-point messaging use cases, as long as order of delivery is not an important criterion for you. SQS used along with SNS works great for pub-sub as well as point-to-point use cases.

However, there is a quirk to be aware of when using SQS: it provides an at-least-once delivery guarantee rather than JMS's once-and-only-once guarantee. With at-least-once delivery, a message can be delivered more than once, which can be a problem for services that are not idempotent. This blog walks you through an approach we have used with great success to eliminate duplicates with SQS. (Note: SQS has since released FIFO queues, a new queue type with an exactly-once processing guarantee, but throughput is limited to 300 TPS as of the day this blog was published.)

There are many ways to solve this problem, for example using a database such as DynamoDB or MySQL to keep track of records that have already been processed and prevent them from being processed again. We use Redis for this purpose, mainly because of its low-latency, single-digit-millisecond lookups.

At a high level, the algorithm used is...

if (idempotentService.putIfNotExists(key, value, ttl)) {
    // Processing this key for the first time
} else {
    // Already processed; handle as a duplicate
}

Let's dig a bit deeper into what we are doing with the idempotentService implementation.

Redis natively supports a "put if not exists" operation: the SET command with the NX and EX options sets a key to a value only if the key does not already exist, with a predefined TTL. If the SET fails because the key already exists, the message is a duplicate; if it succeeds, we are cleared to process the message.

The Redis command used is SET with the NX and EX options.
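
To illustrate the semantics, this is roughly what the command looks like in redis-cli; the key, value, and the 18000-second TTL (5 hours, our deduplication window) are just example inputs:

    SET dedup:msg-123 1485907200 NX EX 18000
    OK          (key created: first delivery of this message)
    SET dedup:msg-123 1485907200 NX EX 18000
    (nil)       (key already exists: duplicate delivery)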

The open source client library we use, Redisson, did not support this particular variation of the Redis SET command at the time, so we implemented putIfNotExists with a Lua script, which Redisson does support. Shown below is the implementation of putIfNotExists using the Lua script.

    public boolean putIfNotExists(String key, Object value, int ttlInSeconds) throws CustomException {
        try {
            if (client != null && key != null && value != null) {
                if (rscript == null) {
                    rscript = client.getScript();
                }
                // Load the Lua script once and cache its SHA digest for reuse
                if (loadedScript == null) {
                    loadedScript = rscript.scriptLoad(
                            "return redis.call('set', KEYS[1], ARGV[2], 'NX', 'EX', ARGV[1])");
                }
                String passValue = toJsonString(value);
                if (passValue != null) {
                    Object result = rscript.evalSha(Mode.READ_WRITE, loadedScript,
                            RScript.ReturnType.VALUE,
                            Collections.singletonList(key), ttlInSeconds, passValue);
                    // SET ... NX returns OK when the key was created, nil when it already exists
                    if (result != null && result.toString().equalsIgnoreCase(Constants.LUA_STATUS_OK)) {
                        // Not a duplicate: first time this key has been seen within the TTL window
                        return true;
                    }
                    // Duplicate: the key already exists in Redis
                } else {
                    LOG.error(MessageFormat.format(
                            "Class=CachingDaoRedisImpl Method=putIfNotExists intuitTid={0},status={1} errorMessage={2}",
                            LogUtils.printTid(), "ERROR", "Error occurred in parsing the input value to Redis"));
                }
            }
            return false;
        } catch (Exception ex) {
            LOG.error(MessageFormat.format(
                    "Class=CachingDaoRedisImpl Method=putIfNotExists intuitTid={0},status={1} errorMessage={2}",
                    LogUtils.printTid(), "ERROR", ex.getMessage()));
            // Surface the failure to the caller rather than silently treating the message as a duplicate
            throw new CustomException(ex.getMessage());
        }
    }

This method is called with the unique identifier for the message (the key), a value (this one doesn't really matter; we use the timestamp as a placeholder), and a TTL, which is the duration over which you want to detect duplicates of a single event. In our case we wanted to eliminate duplicates that could occur within a 5-hour window, but this will vary by use case. If the method returns true, the message is not a duplicate; otherwise it is a duplicate and should be handled accordingly.

A sample usage from a JMS message listener is shown below.

public void onMessage(javax.jms.Message message) {
    try {
        // The value is just a placeholder (current time); ttl is the configured dedup window in seconds
        if (idempotentService.putIfNotExists(message.getStringProperty("id"), System.currentTimeMillis(), ttl)) {
            // Process message
        } else {
            // Duplicate within the TTL window: skip message
        }
    } catch (Exception e) {
        // Handle JMS / Redis lookup errors
    }
}
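
For completeness, here is a rough sketch of how such a listener can be wired to an SQS queue using the Amazon SQS Java Messaging Library. The queue name and the way the client and listener are constructed here are illustrative placeholders, not our production configuration.

    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import com.amazon.sqs.javamessaging.ProviderConfiguration;
    import com.amazon.sqs.javamessaging.SQSConnection;
    import com.amazon.sqs.javamessaging.SQSConnectionFactory;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

    void startListener(MessageListener listener) throws javax.jms.JMSException {
        // JMS connection factory backed by the default SQS client (default region and credentials chain)
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                new ProviderConfiguration(), AmazonSQSClientBuilder.defaultClient());
        SQSConnection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // "my-queue" is a placeholder queue name
        MessageConsumer consumer = session.createConsumer(session.createQueue("my-queue"));
        consumer.setMessageListener(listener);
        connection.start();
    }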

I hope this is useful for anyone using SQS who is worried about duplicate messages.

We use Amazon ElastiCache for Redis for this duplicate prevention; the same cluster also serves our regular caching needs.
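
For reference, here is a minimal sketch of how the Redisson client behind idempotentService might be created against a single-node ElastiCache Redis endpoint. The endpoint address is a placeholder, and clustered or replicated setups would use a different Config section.

    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;

    RedissonClient createClient() {
        // Placeholder ElastiCache endpoint; single-server mode shown for illustration only
        Config config = new Config();
        config.useSingleServer()
              .setAddress("redis://my-elasticache-endpoint.cache.amazonaws.com:6379");
        return Redisson.create(config);
    }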