Wednesday, February 1, 2017

How to prevent duplicates while consuming messages from SQS queues in AWS?

Amazon's SQS messaging service is a great solution when you need to operate at high scale without worrying about the nitty-gritty of running your own custom messaging. We have had a lot of success using this service as part of our solutions, and it works really well for point-to-point messaging use cases, provided order of delivery is NOT an important criterion for you. SQS used along with SNS works great for pub-sub as well as point-to-point use cases.

However, there is a little quirk to worry about when using SQS: its at-least-once delivery guarantee, as opposed to JMS's once-and-only-once guarantee. With at-least-once delivery, a message can be delivered more than once, which is a problem for services that are not idempotent. This blog will walk you through an approach we have used with great success to eliminate duplicates with SQS. (Note: SQS has since released FIFO queues with exactly-once processing, but throughput is limited to 300 TPS as of the day this blog was published.)

There are many ways to solve this problem, for example by using a database such as DynamoDB or MySQL to keep track of records that have already been processed and prevent them from being processed again. We are, however, using Redis for this purpose, mainly for its low-latency, single-digit-millisecond lookups.
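Before looking at the Redis version, the core operation is simply an atomic "put if absent" on a shared store. Here is the same idea sketched in-process with a ConcurrentHashMap (the class name DedupSketch is mine, purely for illustration; it has no TTL and only works within a single JVM, which is exactly why a shared store like Redis is needed in practice):

```java
import java.util.concurrent.ConcurrentHashMap;

// In-process illustration of the atomic put-if-absent check that
// underpins the Redis-based deduplication described below.
public class DedupSketch {
    private final ConcurrentHashMap<String, Long> seen = new ConcurrentHashMap<>();

    // Returns true the first time a message id is seen, false on duplicates.
    public boolean putIfNotExists(String messageId, long timestamp) {
        // putIfAbsent returns null only when no prior mapping existed,
        // so the first caller wins and every later caller sees a duplicate.
        return seen.putIfAbsent(messageId, timestamp) == null;
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        System.out.println(dedup.putIfNotExists("msg-1", 1L)); // first delivery
        System.out.println(dedup.putIfNotExists("msg-1", 2L)); // duplicate
    }
}
```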

At a high level, the algorithm used is...

    if (idempotentService.putIfNotExists(key, value, ttl)) {
        // Processing this key for the first time.
    } else {
        // Already processed; handle as a duplicate.
    }

Let's dig a bit deeper into what we are doing in the idempotentService implementation.

"Put if not exists" maps to a native Redis command: SET with the NX option (only set if the key does not already exist) and the EX option (expire after the given number of seconds), which lets us set a key and value with a pre-defined TTL atomically. If the SET fails because the key already exists, the message is a duplicate; if it succeeds, we are cleared to process the message.
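To illustrate, here is what the equivalent raw command looks like in redis-cli (the key name, value, and TTL below are made up for the example):

```
127.0.0.1:6379> SET msg:abc123 "1485907200" NX EX 18000
OK
127.0.0.1:6379> SET msg:abc123 "1485907260" NX EX 18000
(nil)
```

The first SET succeeds and returns OK; the second returns (nil) because the key already exists, flagging the message as a duplicate.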

The open-source Redis client we use, Redisson, didn't support this specific variation of Redis SET at the time, so we decided to implement putIfNotExists with a Lua script, which Redisson does support. Shown below is an implementation of putIfNotExists using a Lua script.

    public boolean putIfNotExists(String key, Object value, int ttlInSeconds) throws CustomException {
        try {
            if (client == null || key == null || value == null) {
                return false;
            }
            if (rscript == null) {
                rscript = client.getScript();
            }
            if (loadedScript == null) {
                // SET key value NX EX ttl -- an atomic "put if absent" with expiry
                loadedScript = rscript.scriptLoad(
                        "return redis.call('set', KEYS[1], ARGV[2], 'NX', 'EX', ARGV[1])");
            }
            String passValue = toJsonString(value);
            if (passValue == null) {
                LOG.error(MessageFormat.format(
                        "Class=CachingDaoRedisImpl Method=putIfNotExists intuitTid={0} status={1} errorMessage={2}",
                        LogUtils.printTid(), "ERROR", "Error occurred parsing the input value for Redis"));
                return false;
            }
            Object result = rscript.evalSha(Mode.READ_WRITE, loadedScript, RScript.ReturnType.VALUE,
                    Collections.singletonList(key), ttlInSeconds, passValue);
            if (result != null && result.toString().equalsIgnoreCase(Constants.LUA_STATUS_OK)) {
                // SET succeeded: not a duplicate
                return true;
            }
            // SET returned nil: key already exists, so this is a duplicate
            return false;
        } catch (Exception ex) {
            LOG.error("putIfNotExists failed", ex);
            return false;
        }
    }

This method is called with the message's unique identifier as the key, a value (which doesn't really matter; we use the timestamp as a placeholder), and a TTL (the window over which you want to suppress duplicates of a single event; in our case we wanted to eliminate duplicates that could occur within 5 hours, but this will vary by use case). If the method returns true, the message is not a duplicate; otherwise it is, and you handle the error accordingly.

A sample usage of message processing using a JMS Listener is shown below.

    public void onMessage(javax.jms.Message message) {
        try {
            String id = message.getStringProperty("id");
            if (idempotentService.putIfNotExists(id, timestamp, ttl)) {
                // Process message
            } else {
                // Duplicate: skip message
            }
        } catch (javax.jms.JMSException e) {
            // Handle error
        }
    }

I hope this is useful for anyone using SQS who is worried about duplicate messages.

We use ElastiCache for Redis for duplicate prevention; the same cluster also serves our regular caching needs.



Sunday, April 25, 2010

Task Oriented Scripting for DataPower

DPAdmin, a product from iSOAGroup provides a scripting interface for IBM WebSphere DataPower among many other features.

A sample DPAdmin script is shown below.



As you can see from the example shown above, DPAdmin scripting allows the user to create multiple concurrent flows within a script, enabling cluster deployments in parallel as well as sequential task execution within a particular flow. DPAdmin scripts can execute a wide variety of commands spanning configuration management, file management, cache control, domain management, and an array of other functions.

DPAdmin scripts are XML-based and platform-independent, and allow the user to perform deployments and administration functions in a repeatable and error-free manner.

DPAdmin scripts can be triggered by shell scripts, batch scripts, or other build management tools.

DPAdmin provides not only a scripting mode but also a menu-driven shell mode where users can execute DataPower commands through an easy-to-use menu-driven interface, as shown below.



Please click here to learn more.

Fine grained export/import solution for IBM DataPower

Selective Export/Import for DataPower

I would like to introduce you to DPAdmin, a product from iSOAGroup that supports fine grained Export/Import among many other features when it comes to IBM WebSphere DataPower administration/automation.

Shown below is a sample XML input for DPAdmin's export commands. As you can see from the XML, the export allows you to control which services get exported for service migration across environments.




Now that takes care of exporting selected services from one environment to another. But how do you handle dynamically changing environment-specific properties like ports, backend hosts, and so on?

Yes. DPAdmin has a solution for that as well. DPAdmin supports dynamic imports where it can apply DataPower supported Deployment Policies during the import.

DPAdmin provides a scripting solution to make configuration management of DataPower across multiple devices and multiple environments extremely easy.

Please click here to learn more.

DataPower Administration and Automation using DPAdmin

I would like to introduce you to an administration and automation solution for IBM WebSphere DataPower. Have you ever asked yourself the following questions?

  • How do I perform automated deployments in DataPower using easy-to-use scripts similar to ANT?
  • Is there a way to make deployments to a cluster of DataPower devices in parallel, saving time and money, and mistake-free of course?
  • I love solutions like wsadmin for WebSphere Application Server. Is there something similar for DataPower?
  • Is there a way to do orchestrated, task-oriented deployments for DataPower using platform-independent scripts?
  • I have always loved menu-driven, shell-based command-line tools. Is there anything similar for DataPower?
  • Our organization needs accountability for important commands run on our production and staging DataPower devices. Is there a way to capture administration commands and report them later in a user-friendly manner?

If you can relate to any of the questions shown above, then you should take a look at DPAdmin, a product from iSOAGroup to help administer and automate IBM WebSphere DataPower.

Click here for more details.