Wednesday, February 1, 2017

How to prevent duplicates while consuming messages from SQS queues in AWS?

Amazon's SQS messaging service is a great solution when you need to operate at high scale without worrying about the nitty-gritty of running your own messaging infrastructure. We have had a lot of success using this service as part of our solutions, and it works really well for point-to-point messaging use cases, as long as order of delivery is NOT an important criterion for you. SQS used along with SNS also works great for pub-sub use cases.

However, there is a little quirk to worry about when using SQS: it provides an at-least-once delivery guarantee rather than JMS's once-and-only-once guarantee, which means a message can be delivered more than once. This can be a problem for services that are not idempotent. This blog will walk you through an approach we have used with great success to eliminate duplicates with SQS. (Note: SQS has since released FIFO queues with an exactly-once processing guarantee, but their throughput is limited to 300 TPS as of the day this blog was published.)

There are many ways to solve this problem; for example, a database such as DynamoDB or MySQL can keep track of the records that have already been processed and prevent them from being processed again. We are, however, using Redis for this purpose, mainly for its low-latency, single-digit-millisecond lookups.

At a high level, the algorithm used is...

if (idempotentService.putIfNotExists(key, value, ttl)) {
    // Processing this key for the first time
} else {
    // Already processed, handle as a duplicate
}

Let's dig a bit deeper into what we are doing with the idempotentService implementation.

Redis's SET command supports a put-if-not-exists variation: with the NX and EX options, it sets a key and value with a pre-defined TTL only if the key does not already exist. If the put fails because the item already exists, the message is a duplicate; if the put succeeds, we are cleared to process the message.
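For illustration, here is what the operation looks like in redis-cli (the key name here is made up; 18000 seconds is the 5-hour window we use later in this post). The first call creates the key and returns OK; the second finds the key already present and returns nil:

    127.0.0.1:6379> SET msg:12345 "1485936000" NX EX 18000
    OK
    127.0.0.1:6379> SET msg:12345 "1485936000" NX EX 18000
    (nil)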

The open source Redis client we use, Redisson, didn't support this specific variation of Redis SET at the time, so we implemented putIfNotExists with a Lua script, which Redisson does support. Shown below is the implementation of putIfNotExists with a Lua script.

    public boolean putIfNotExists(String key, Object value, int ttlInSeconds) throws CustomException {
        try {
            if (client != null && key != null && value != null) {
                if (rscript == null) {
                    rscript = client.getScript();
                }
                if (loadedScript == null) {
                    // SET key value NX EX ttl: succeeds only if the key does not already exist
                    loadedScript = rscript.scriptLoad("return redis.call('set', KEYS[1], ARGV[2], 'NX', 'EX', ARGV[1])");
                }
                String passValue = toJsonString(value);
                if (passValue != null) {
                    Object result = rscript.evalSha(Mode.READ_WRITE, loadedScript, RScript.ReturnType.VALUE,
                            Collections.singletonList(key), ttlInSeconds, passValue);
                    if (result != null && result.toString().equalsIgnoreCase(Constants.LUA_STATUS_OK)) {
                        // SET succeeded: first time we are seeing this key, not a duplicate
                        return true;
                    }
                    // SET returned nil: the key already exists, so this is a duplicate
                } else {
                    LOG.error(MessageFormat.format(
                            "Class=CachingDaoRedisImpl Method=putIfNotExists intuitTid={0},status={1} errorMessage={2}",
                            LogUtils.printTid(), "ERROR", "Error occurred parsing the input value for Redis"));
                }
            }
            return false;
        } catch (Exception ex) {
            // Wrap and rethrow so callers can decide how to handle the failure
            throw new CustomException(ex.getMessage(), ex);
        }
    }

This method is called with the unique identifier of the message as the key, a value (which doesn't really matter; we use a timestamp as a placeholder), and a TTL, i.e., the duration over which you want to check for duplicates of a single event. In our case we wanted to eliminate duplicates that could occur within 5 hours, but this will vary by use case. If this method returns true there is no duplicate; otherwise the message is a duplicate and the error should be handled accordingly.

A sample of message processing using a JMS listener is shown below.

public void onMessage(javax.jms.Message message) {
    try {
        // Use the unique message identifier as the dedup key
        if (idempotentService.putIfNotExists(message.getStringProperty("id"), timestamp, ttl)) {
            // Process message
        } else {
            // Duplicate: skip message
        }
    } catch (Exception ex) {
        // Handle error
    }
}
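For completeness, the inputs to the listener above could be wired up as follows. This is only a sketch; the variable names are illustrative:

    long timestamp = System.currentTimeMillis(); // placeholder value; only the key matters for dedup
    int ttl = 5 * 60 * 60;                       // 5-hour dedup window, per our use case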

I hope this is useful for anyone using SQS who is worried about duplicate messages.

We use Amazon ElastiCache for Redis for this duplicate prevention; it also serves our regular caching needs.



Comments:

  1. What if the worker polling that SQS queue dies in the middle of processing a message? After the message visibility timeout expires, the message will be given to another worker. This worker will make the same Redis putIfNotExists call, but it will now get a false result, i.e., a duplicate. In that case, we'll probably halt the message execution because we think it's a duplicate. So putIfNotExists is not sufficient to tell whether the message is actually a duplicate.

  2. Great question. Here are some safety measures we have in place, although to your point it is very hard to eliminate this completely:

    1. Our visibility timeout is set higher than the timeouts defined for our other network calls. This ensures that a failure will always happen before the visibility timeout expires.
    2. Whenever a failure happens we correct the cache state to reflect the action that was taken or not taken (see the sketch at the end of this comment).
    3. putIfNotExists is implemented as a Lua script, which Redis executes atomically, so only one app node can operate on a key at any given instant.

    Even with all these settings, an app node crash (or kill -9) could happen right after putIfNotExists was called and before the processing completes, and in that case we will prevent the execution of the transaction.

    To make it really fail-safe, we could potentially guard the transaction with a begin/end and use the result of the entry/exit to update the cache state, but our use case can tolerate losses at the extreme boundary conditions, which might not be true for everyone.

    Also, we haven't seen this happen based on the metrics we have in place after years of usage. But something to watch out for, for sure.
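    A rough sketch of the compensation in (2), assuming a delete helper on the idempotent service (the helper name is illustrative, not from the article):

        if (idempotentService.putIfNotExists(key, timestamp, ttl)) {
            try {
                processMessage(message); // business logic
            } catch (Exception ex) {
                // Undo the dedup entry so the redelivered message is not mistaken for a duplicate
                idempotentService.delete(key); // assumed helper
                throw ex;
            }
        }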

  3. Maybe a better approach to this issue is to use not only the message ID as the key but also the ApproximateReceiveCount. This way you will not block retries in case your message processing fails or your consumer dies.

    Replies
    1. Good call. I will update the article with your feedback. Thanks @Itai.

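      A minimal sketch of that idea (ApproximateReceiveCount is an SQS message attribute; the variable names here are illustrative). The receive count becomes part of the key, so a retried delivery gets a fresh key and is not blocked:

          String dedupKey = messageId + ":" + approximateReceiveCount;
          if (idempotentService.putIfNotExists(dedupKey, timestamp, ttl)) {
              // First processing attempt for this (message, delivery) pair
          }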
  4. SQS FIFO provides an exactly-once delivery guarantee. No need for Redis for de-dup.

    Replies
    1. Agreed. The major limitation I have found with FIFO mode is the throughput, which is very low. We operate SQS at 15K-20K TPS at our peak loads, and FIFO queues only support 300 TPS (individual) and 3K TPS (with batching). This was limiting for our use case, but FIFO might work for others without the added complexity of dedup.
