Tuesday, August 22, 2017

Achieve Enterprise Integration with AWS Lambdas, SWF and SQS

Recently, we were asked to build an ETL flow using Amazon Web Services. Because we specialize in Enterprise Integration, we had a particular design in mind to make it happen. The job was pretty simple:
  1. The trigger was a file placed in a particular S3 bucket.
  2. Take the S3 object metadata of the file as the input of the job.
  3. Read the file and package the records into pages; each page is sent asynchronously as a message. This technique increases parallelism in the job processing, since the files contain one million records on average.
  4. Consume all pages asynchronously and upload them as micro-batches of records into a third-party system via a RESTful API.
  5. Other tasks to complete the use case, like recording the completion of the job in a database.
On top of these basic requirements we had to make sure the system was robust, resilient, and as fast as possible while keeping the costs of the different systems low.
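To make steps 1 and 2 concrete, here is a minimal sketch of a Lambda handler fired by the S3 event that extracts the object metadata used as the job input. The class name and what you do with the metadata afterwards (in our case, handing it to the workflow) are illustrative, not the exact code we used:

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;

// Hypothetical trigger function: fired when a file lands in the S3 bucket,
// it extracts the bucket and key that become the input of the ETL job.
public class FileArrivedHandler implements RequestHandler<S3Event, String> {

    @Override
    public String handleRequest(S3Event event, Context context) {
        S3EventNotificationRecord record = event.getRecords().get(0);
        String bucket = record.getS3().getBucket().getName();
        String key = record.getS3().getObject().getKey();
        context.getLogger().log("File arrived: s3://" + bucket + "/" + key);
        // In the real solution this metadata is passed on to the workflow execution.
        return bucket + "/" + key;
    }
}
```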

We chose to use different services from Amazon Web Services for this: S3, Simple Workflow (SWF), Simple Queue Service (SQS) and Lambda.

Here is a diagram of the solution.

Solution diagram

Why Simple Workflow (SWF)?

As you can see in the diagram, every task is executed by a Lambda function, so why involve Simple Workflow? The answer is simple: we wanted an environment where the sequence of task executions is orchestrated by a single entity, and where the execution context can be shared with the different tasks.

If you think about it, we wanted something similar to a flow in a Mule app (MuleSoft Anypoint Platform).

It is important to highlight that AWS imposes specific limits on Lambda execution; for example, a Lambda function can only run for a maximum of 5 minutes. Due to these limits, we had to break the tasks into small but cohesive units of work while having a master orchestrator that could run longer than that. Here's where the shared context comes in handy.
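As an illustration of this split, here is a minimal sketch of what the orchestrating contract could look like with the AWS Flow Framework for Java. The interface name, method, and timeout are assumptions; the point is that the workflow can run for hours while each task it delegates to a Lambda stays under the 5-minute limit:

```java
import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;

// Hypothetical workflow contract: SWF orchestrates the sequence of tasks and
// keeps the shared execution context alive well beyond a single Lambda's lifetime.
@Workflow
@WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 86400L)
public interface EtlWorkflow {

    // Entry point of the workflow; the S3 metadata from the trigger is the input.
    @Execute(version = "1.0")
    void processFile(String bucket, String key);
}
```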

Note: There's another service that fits the serverless paradigm better than SWF: Step Functions. However, at the time we were working on this task it was still in beta, hence not suitable for production. There is a follow-up post about full serverless integration that will include Step Functions.

Challenges and recommendations

While working with SWF and Lambdas, we learned some things that helped us a lot in completing this assignment. Here I'll show you the situations and the solutions that worked for us.

Invoke Lambdas from activities, not workflow workers

One thing you should know about working with SWF is that every output of an activity is returned to the workflow worker as a Promise - very similar to a Promise in JavaScript. This Promise holds the output as a serialized object that you need to deserialize if you want to use it as the input for a Lambda function invoked directly from the workflow worker. This overhead can be very cumbersome if you do it frequently. In your Lambdas you're supposed to work with objects directly, not with serialized forms.

Here's my first piece of advice: even though you can invoke a Lambda function from within a workflow worker, don't do it; use an Activity worker instead. This way each workflow worker implements a unit of work that calls an Activity worker, which in turn calls a Lambda function internally. Why? Because in the Activity worker you will be able to pass a proper object to the Lambda as an input parameter. This technique requires some extra plumbing in your SWF code, since you'll need one Activity per Lambda, but in the end it provides a very flexible and robust mechanism to exchange information between SWF and Lambdas.

See this sequence diagram to understand it.

Workflow, activity and lambda sequence diagram.
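To give you an idea, here is a minimal sketch of an activity implementation that invokes the Lambda with a real object instead of the serialized Promise output. The class and function names, and the Message class (sketched in the next section), are illustrative, not the exact code we used:

```java
import com.amazonaws.services.lambda.AWSLambda;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;

// Hypothetical activity implementation: the workflow worker only schedules this
// activity; the activity builds the input object and calls the Lambda function.
// In the real app this class implements the @Activities-annotated interface
// registered with SWF.
public class ReadFileActivityImpl {

    private final AWSLambda lambda = AWSLambdaClientBuilder.defaultClient();
    private final ObjectMapper mapper = new ObjectMapper();

    public Message readFile(Message input) throws Exception {
        InvokeRequest request = new InvokeRequest()
                .withFunctionName("read-file-function")          // assumed function name
                .withPayload(mapper.writeValueAsString(input));  // a proper object, serialized only here

        InvokeResult result = lambda.invoke(request);
        String json = StandardCharsets.UTF_8.decode(result.getPayload()).toString();
        return mapper.readValue(json, Message.class);
    }
}
```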

Wrap your payload in a Message object

After all, we are talking about Enterprise Integration, and one of its central pieces is the message. To share information uniformly between the workflow and the different Lambdas, it's better to standardize on a custom Message object. This Message must contain the workflow context you want to share and the payload. When the Lambda functions are called, they receive this Message object and extract from it the information required to perform their task fully, with no external dependency.
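A minimal sketch of such a wrapper could look like this; the field names are assumptions, not the exact class we used:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Illustrative Message wrapper: the context carries workflow-level data
// (job id, bucket, key, page number) and the payload carries the records.
public class Message implements Serializable {

    private Map<String, String> context = new HashMap<>();
    private String payload;

    public Map<String, String> getContext() { return context; }
    public void setContext(Map<String, String> context) { this.context = context; }

    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}
```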

Decompose large loads of data into small pieces

As mentioned before, Lambdas are supposed to run small tasks quickly and independently; therefore they have limits you should be aware of, such as execution time, memory allocation, ephemeral disk capacity, and the number of threads, among others. These are serious constraints when working with large amounts of data and long-running processes.

To overcome these problems, we recommend decomposing the entire file content into small pieces to increase task parallelism and improve performance in a safe manner - actually, this was one of the main reasons to use Lambdas, since they auto-scale nicely as the parallel processing increases. For this, we divided the file content into packages of records called pages, where each page can contain hundreds or thousands of records. Each page was placed as a message in an SQS queue. The page size must respect the 256 KB per-message limit in SQS.
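As an example, a pager like the following splits the parsed records into pages and drops each page on the queue. This is a sketch; the queue URL, page size and serialization are assumptions:

```java
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;

import java.util.List;

// Illustrative pager: each page becomes one SQS message, so the page size
// must keep the serialized body under the 256 KB SQS limit.
public class PagePublisher {

    private static final int PAGE_SIZE = 1000; // assumed; tune so a page stays < 256 KB

    private final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    private final String queueUrl =
            "https://sqs.us-east-1.amazonaws.com/123456789012/pages"; // assumed queue URL

    public void publish(List<String> records) {
        for (int start = 0; start < records.size(); start += PAGE_SIZE) {
            List<String> page = records.subList(start, Math.min(start + PAGE_SIZE, records.size()));
            sqs.sendMessage(new SendMessageRequest(queueUrl, toJson(page)));
        }
    }

    private String toJson(List<String> page) {
        // Placeholder serialization; the real app would wrap the page in a Message object.
        return String.join("\n", page);
    }
}
```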

Keep long running processes in Activities, not Lambdas

As you can see in the diagram above, there's a poller that is constantly looking for new messages in the SQS queue. This can be a long-running process if you expect tens of thousands of pages. For cases like this, having activities in your flow is very convenient, as an activity can run for up to one year; this contrasts sharply with the 5-minute execution limit of a Lambda function.
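A simplified version of such a poller, run inside an activity rather than a Lambda, could look like this; the queue URL, the expected page count and the processPage helper are placeholders:

```java
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

import java.util.List;

// Illustrative long-polling loop run inside an SWF activity, not a Lambda,
// because it can run far longer than the 5-minute Lambda limit.
public class PagePollerActivityImpl {

    private final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

    public void pollPages(String queueUrl, int expectedPages) {
        int consumed = 0;
        while (consumed < expectedPages) { // assumption: the shared context tells us how many pages exist
            ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                    .withWaitTimeSeconds(20)        // long polling
                    .withMaxNumberOfMessages(10);
            List<Message> messages = sqs.receiveMessage(request).getMessages();
            for (Message message : messages) {
                processPage(message.getBody());
                sqs.deleteMessage(queueUrl, message.getReceiptHandle());
                consumed++;
            }
        }
    }

    private void processPage(String pageBody) {
        // Placeholder: delegates the micro-batch upload, e.g. by invoking a Lambda.
    }
}
```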

Beware of concurrency limits

Consider the scenario where you have an Activity whose purpose is to read the queue and delegate the upload of the micro-batches to an external system. Commonly, to speed up the execution, you make use of threads - note that I'm talking about Java, but other languages have similar concepts. In this Activity, you might use a loop that creates a thread per micro-batch to upload.

Lambda has a limit of 1024 concurrent threads, so if you plan to create a lot of threads to speed up your execution, like uploading micro-batches to the external system mentioned above, first and most importantly use a thread pool to control the number of threads. We recommend not creating instances of Thread or Runnable; instead, write Java lambda expressions for each asynchronous task you want to execute. Also, make sure you use the AWSLambdaAsyncClientBuilder to invoke AWS Lambdas (the functions, not the Java lambdas) asynchronously.
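Here is a minimal sketch of that idea: a bounded thread pool behind the SDK's async client and one asynchronous invocation per micro-batch, with no raw Thread or Runnable instances. The pool size and function name are assumptions:

```java
import com.amazonaws.services.lambda.AWSLambdaAsync;
import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MicroBatchUploader {

    // Bounded pool behind the async client: the SDK handles the threading,
    // so we never create Thread or Runnable instances ourselves.
    private final AWSLambdaAsync lambda = AWSLambdaAsyncClientBuilder.standard()
            .withExecutorFactory(() -> Executors.newFixedThreadPool(50)) // assumed pool size; tune it
            .build();

    public void uploadAll(List<String> microBatches) throws Exception {
        List<Future<InvokeResult>> pending = new ArrayList<>();
        for (String batch : microBatches) {
            pending.add(lambda.invokeAsync(new InvokeRequest()
                    .withFunctionName("upload-micro-batch")   // assumed AWS Lambda function name
                    .withPayload(batch)));
        }
        // Wait for every asynchronous invocation before completing the activity.
        for (Future<InvokeResult> future : pending) {
            future.get();
        }
    }
}
```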


This approach was particularly successful for a situation where we were not allowed to use an integration platform like Mule. It is also a very nice solution if you just need to integrate AWS services and move lots of data among them.

AWS Simple Workflow and Lambda work pretty well together, although they have different goals. Keep in mind that an SWF application needs to be deployed on a machine as a standalone program, either in your own data center, on an EC2 instance, or on another IaaS.

This combo will help you orchestrate and share different contexts, either automatically through Activities or manually by using signals. If what you need is isolated execution and chaining is not relevant to you, then you could use Lambdas only. Note, however, that chained execution will not truly isolate the functions from each other, and the starting Lambda may time out before the Lambda functions triggered later in the chain finish their execution.

Moreover, whenever you work with resources that have limitations like AWS Lambda's, always bear in mind the restrictions they come with and design your solution around these constraints, ideally as microflows. Have a read of the Microflows post by Javier Navarro-Machuca, Chief Architect at IO Connect Services.

To increase parallelism, we highly recommend using information exchange systems such as queues, transient databases, or files. In AWS you can make use of S3, SQS, RDS, or DynamoDB (although our preference is SQS for this task).

Stay tuned, as we're working on a solution that uses Step Functions with Lambdas rather than Simple Workflow for a fully serverless integration.

Happy reading!


Enterprise Integration Patterns - http://www.enterpriseintegrationpatterns.com/
Amazon Simple Workflow - https://aws.amazon.com/swf/
AWS Lambda - https://aws.amazon.com/lambda/
Amazon Simple Queue Service - https://aws.amazon.com/sqs/

Thursday, August 3, 2017

Mule Batch - Adding Resiliency to the Manual Pagination Approach

In the previous post Benchmarking Mule Batch Approaches, written by my friend and colleague Victor Sosa, we demonstrated different approaches for processing big files (1-10 GB) in a batch fashion. The manual pagination strategy proved to be the fastest algorithm, but with one important drawback: it is not resilient. This means that after restarting the server or the application, all the processing progress is lost. Some commenters on that post pointed out that resiliency was needed to compare this approach fairly against the Mule Batch components, since those provide resiliency by default.

In this post, I show how to enhance the manual pagination approach by making it resilient. To test this approach, I used a Linux virtual machine with the following hardware configuration:
  • Intel Core i5 7200U @ 2.5 GHz (2 cores)
  • 8 GB RAM
  • 100 GB SSD
Using the following software:
  • MySQL Community Server 5.7.19
  • Anypoint Studio Enterprise Edition 6.2.5
  • Mule Runtime Enterprise Edition 3.8.4 
To process a comma-separated value (.csv) file that contains 821,000+ records with 40 columns each, the steps are as follows:
  1. Read the file.
  2. Build the pages.
  3. Store in a Database.

You can find the code in our GitHub repository https://github.com/ioconnectservices/upsertpoc.

The Approach

We based our work on the manual pagination approach from the aforementioned article and created a Mule app that processes .csv files. This time we added a VM connector to decouple the file reading and page creation from the bulk page-based database upsert. We configured the VM connector to use a persistent queue so that messages are stored on the hard disk.

Description of the processing flow:

1. The file (.csv) is read and the number of pages is calculated according to the number of records configured per batch size; in this case, 800 records per page.

2. The file is put in the payload as a stream so that it can be read forward to create the pages in each For Each loop.

3. Each page is sent through a persistent VM connector to a different flow that stores it in the database. Making the VM connector persistent means that pages are written as files to disk, so the inbound VM connector can resume consuming the messages after an application reboot and the records in those messages can still be upserted into the database.


I took the metrics used in the previous Mule Batch article as a baseline to compare the efficiency of this new approach. I recreated very similar flows to test in my environment and I obtained the following results:

Out-of-the-box Mule batch jobs and batch commit components.
  • The total execution time is 7 minutes on average.
  • The total memory usage is 1.34 GB on average.
Custom pagination.
  • The total execution time is 6 minutes on average.
  • The total memory usage is 1.2 GB on average.
Custom Pagination with VM connector (This Approach).

At first, I obtained good results with this approach, but they were 30 seconds slower than the "Custom Pagination" approach:
  • The total execution time (without stopping the server) is 6 minutes and 30 seconds on average.
  • The total memory usage is 1.2 GB on average.
After increasing the number of threads from 25 to 30 in the Async connector configuration, these are the results:
  • The total execution time (without stopping the server) is 6 minutes on average.
  • The total memory usage is 1.2 GB on average.


When designing an enterprise system, many factors come into play, and we have to make sure it will keep working even in disastrous events. Adding resiliency to a solution is a must-have in every system. For us, the VM connector brings this resiliency while keeping the execution costs within the desired parameters. Keep in mind, though, that some performance tuning is usually needed to achieve resiliency without compromising performance.