Tuesday, September 26, 2017

AWS Step Functions in Enterprise Integration

The article Achieve enterprise integration with AWS describes the orchestration of Lambdas using Amazon Simple Workflow (SWF), with outstanding results. As stated there, SWF requires a standalone application running in order to process the flows, and this time we wanted to migrate the application to a 100% serverless solution. The article also mentions a newer service that looks very promising for serverless scenarios: Step Functions. Here, we want to show you how we took the previous approach and transformed it into a Step Functions-led approach.

AWS Step Functions is a service that helps you create a flow based on several units of work, often implemented as AWS Lambda functions. This service is basically a state machine: given an input, an initial state computes what is required by the underlying implementation and generates an output. This output serves as the input for the next state, whose output may in turn be the input for another state, and so on until the last state is executed and the flow is completed. Each state, or node in the visual editor of the AWS Step Functions console, is implemented with a Lambda, and the flow of the state machine is orchestrated by the logic specified in the transition definitions.
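The chaining of outputs to inputs described above can be illustrated with a toy runner. This is only a sketch of the idea, not how the service is implemented: the `run_state_machine` function, the `handlers` table and the two example states are hypothetical names, and plain Python functions stand in for the Lambdas.

```python
import json

def run_state_machine(definition, handlers, state_input):
    """Walk Task states from StartAt until a state marked End, chaining outputs to inputs."""
    name = definition["StartAt"]
    data = state_input
    visited = []
    while True:
        state = definition["States"][name]
        visited.append(name)
        # A plain function stands in for the Lambda behind each Task state.
        data = handlers[state["Resource"]](data)
        # Data travels between states as JSON, so round-trip it to mimic that.
        data = json.loads(json.dumps(data))
        if state.get("End"):
            return data, visited
        name = state["Next"]

# Hypothetical two-step flow: count the lines, then derive a page count.
definition = {
    "StartAt": "CountLines",
    "States": {
        "CountLines": {"Type": "Task", "Resource": "count_lines", "Next": "CountPages"},
        "CountPages": {"Type": "Task", "Resource": "count_pages", "End": True},
    },
}

handlers = {
    "count_lines": lambda event: {"lines": len(event["text"].splitlines())},
    "count_pages": lambda event: {"pages": -(-event["lines"] // 2)},  # ceiling division
}

result, visited = run_state_machine(definition, handlers, {"text": "a\nb\nc"})
print(result, visited)
```

The second state never sees the original text, only what the first state returned, which is the behavior the rest of this post relies on.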


AWS Step Functions provides the following functionality:

  • Creates a message channel between your Lambdas.
  • Monitors the Lambdas by reporting the status of each.
  • Automatically triggers each step.

The Scenario

At IO Connect Services we wanted to test this new AWS feature with an enterprise integration use case based on the scenario described in the SWF implementation. For testing purposes, we modified the file size to stay within the AWS Step Functions free tier:

  1. Reduced the CSV (comma-separated values) file stored in AWS S3 from 800K+ to 100K+ records, with 40 columns per record. We did this to be sure the number of state transitions would not exceed the 4,000 included in the free tier. With 100K+ records, approximately 400+ pages are created. In the "Parallel Branches" approach (explained below) this consumes 1,200+ transitions, which allows at least 3 runs before passing the free tier limit. By comparison, the original file produces 3,200+ pages and consumes approximately 9,200+ transitions, at a cost of $0.14 USD for the first execution and $0.23 USD for each execution after that.
  2. Create pages of the file according to the specified batch size. SQS has a limit of 256KB per message; using the UTF-8 charset, 250 records per page/message comes to approximately 230KB.
  3. Store the pages in individual files in a Storage Service like AWS S3.
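The sizing arithmetic behind these numbers can be checked with a few lines. This is a rough sketch that only uses the figures quoted above (250 records per page, 4,000 free transitions, 1,200 transitions per run, 256KB per SQS message); the function names are made up for illustration.

```python
import math

RECORDS_PER_PAGE = 250        # batch size quoted above
FREE_TIER_TRANSITIONS = 4000  # monthly free tier allowance quoted above
SQS_LIMIT_BYTES = 256 * 1024  # SQS message size limit quoted above

def pages_needed(records, per_page=RECORDS_PER_PAGE):
    """Number of pages required to hold all records."""
    return math.ceil(records / per_page)

def free_runs(transitions_per_run, allowance=FREE_TIER_TRANSITIONS):
    """Whole executions that fit in the free tier allowance."""
    return allowance // transitions_per_run

def fits_in_sqs(page_bytes, limit=SQS_LIMIT_BYTES):
    """True when a page is small enough to travel as one SQS message."""
    return page_bytes <= limit

print(pages_needed(100_000))    # reduced file
print(pages_needed(800_000))    # original file
print(free_runs(1200))          # "Parallel Branches" runs within the free tier
print(fits_in_sqs(230 * 1024))  # approximate page size at 250 records
```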


For this approach, the main idea is to use AWS Step Functions as an orchestrator in order to see all the features it provides to support enterprise integration: logs, visual tools, easy tracking, etc. The actual units of work are implemented using AWS Lambda. Because of the AWS Lambda limits, each unit of work is kept very small, so a good flow written in AWS Step Functions requires a series of well-orchestrated steps.


What we can do with AWS Step Functions.

This was a completely new tool for us, so we did our due diligence and investigated what it can do, what it cannot do, and other useful information about it:

Can.

  • Use simple JSON text to define the State Machine.
  • Use states to call AWS Lambda functions.
  • Run a defined number of branches that execute steps in parallel.
  • Use different languages for the Lambdas in the same State Machine.
  • Send serializable objects, such as POJOs, through the message channel.
  • Create, Run and Delete State Machines using the API.
  • Use the logs and other visual tools in order to see execution messages.
Cannot.
  • Edit an already created State Machine. You have to delete it and create a new one.
  • Launch the state machine from a trigger event such as the creation of a file in S3. Instead, you need to write a Lambda that starts it.
  • Create a dynamic number of branches of states to run in parallel. It is always a predefined set of parallel tasks.
  • Use visual tools (like drag and drop) to create the state machine. The whole implementation must be written as JSON. The console only shows a graph representing the state changes; it visualizes the state machine but cannot be used to create it.
  • Send non-serializable objects in the message channel. This is a big point, as you must be sure the objects returned by your Lambdas are serializable.
  • Resume the execution if one of the steps fails. It either runs completely or fails completely.
Consider this. 
  • The free tier includes 4,000 state transitions per month.
  • All the communication between steps is made using JSON objects.
  • A state machine name must be 1-80 characters long.
  • The maximum length of the JSON used as input or result for a state is 32768 characters.
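Since a non-serializable or oversized result fails the whole execution, it can be worth validating a payload before a Lambda returns it. The following is a minimal sketch assuming the limits quoted in the list above; `check_state_payload` and `valid_state_machine_name` are hypothetical helpers, not part of any AWS SDK.

```python
import json

MAX_PAYLOAD_CHARS = 32768  # input/result limit quoted above
MAX_NAME_CHARS = 80        # state machine name limit quoted above

def check_state_payload(payload):
    """Return the JSON text for payload, or raise ValueError if it cannot be sent."""
    try:
        text = json.dumps(payload)
    except TypeError as exc:
        # Raised for objects the json module cannot serialize (sets, custom classes...).
        raise ValueError(f"payload is not JSON serializable: {exc}") from exc
    if len(text) > MAX_PAYLOAD_CHARS:
        raise ValueError(
            f"payload is {len(text)} characters, limit is {MAX_PAYLOAD_CHARS}"
        )
    return text

def valid_state_machine_name(name):
    """Check only the length rule mentioned above."""
    return 1 <= len(name) <= MAX_NAME_CHARS

print(check_state_payload({"writeComplete": False}))
```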

Approach 1: Lambda Orchestration.

For the first approach, we wanted to test how Step Functions works. For this purpose, we defined only two steps, to see what we could examine using the Step Functions logs and graph.

The StateMachine JSON
{
  "StartAt": "FileIngestion",
  "States": {
    "FileIngestion": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaFileIngestion",
      "Next": "Paginator"
    },
    "Paginator": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPaginator",
      "End": true
    }
  }
}

The Graph

  1. As mentioned before, a state machine cannot be triggered by an S3 event or similar, so we used a Lambda that fires when an S3 object with the ".csv" extension is created in a certain bucket. This Lambda starts the state machine and passes the S3 object details as the input parameter.
  2. The FileIngestion step calls a Lambda that uses the information provided by the event trigger to locate and read the file created in S3, calculates the number of pages to create, and returns this number and the file locations as output.
  3. The Paginator step calls a Lambda that reads the lines of a single page, stores them in a variable, and then calls another Lambda asynchronously to write a file with the page content. This process is repeated until the original file has been read completely.
In this approach, the Lambdas have more flow control than the state machine, because one Lambda calls another and orchestrates the asynchronous executions. Also, if the Lambda that writes the pages fails, you cannot see it in the graph; you need to check the Lambda executions and manually identify which Lambda failed and why.
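The trigger Lambda from step 1 could look roughly like the sketch below. It is not the code we ran: the state machine ARN and the shape of the input document are placeholders, and the extension check is done in code here although it could equally be configured as a suffix filter on the S3 notification. The only AWS call used is the `start_execution` operation of the boto3 Step Functions client.

```python
import json
from urllib.parse import unquote_plus

# Placeholder ARN, to be replaced with the real state machine.
STATE_MACHINE_ARN = "arn:aws:states:REGION:ACCOUNT_ID:stateMachine:FilePaginator"

def csv_objects(event):
    """Extract bucket/key pairs for .csv objects from an S3 notification event."""
    found = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.endswith(".csv"):
            found.append({"bucket": bucket, "key": key})
    return found

def handler(event, context, sfn=None):
    """Start one state machine execution per .csv object in the event."""
    if sfn is None:
        import boto3  # imported lazily so the sketch can be exercised without AWS
        sfn = boto3.client("stepfunctions")
    started = []
    for obj in csv_objects(event):
        response = sfn.start_execution(
            stateMachineArn=STATE_MACHINE_ARN,
            input=json.dumps(obj),  # the execution input must be a JSON string
        )
        started.append(response["executionArn"])
    return started
```

Passing the client as an optional argument keeps the handler easy to test locally with a stand-in object.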

The Metrics. 
  • The total execution time averages 4 minutes to process 100K+ records.

Approach 2: Linear Processing.

Taking the previous implementation into account, we wanted to create a state machine with more authority over the control of the flow execution. As a first step, we decided to implement a linear execution with no parallelization.

The StateMachine JSON
{
  "StartAt": "FileAnalizer",
  "States": {
    "FileAnalizer": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaFileIngestion",
      "Next": "FileChecker"
    },
    "FileChecker":{
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.writeComplete",
          "BooleanEquals": false,
          "Next": "PageCreator"
        }
      ],
      "Default": "QueueChecker"
    },
    "ReadSQStoS3": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPageWriter",
      "Next": "QueueChecker"
    },
    "QueueChecker": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.emptyQueue",
          "BooleanEquals": true,
          "Next": "SuccessState"
        }
      ],
      "Default": "ReadSQStoS3"
    },
    "PageCreator": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPaginator",
      "Next": "FileChecker"
    },
    "SuccessState": {
      "Type": "Pass",
      "End": true
    }
  }
}

The Graph


  1. The FileAnalizer step calls a Lambda function that consumes the .csv file and creates a POJO with the start and end bytes of each page to be created. These bytes are calculated from the page size parameter specified in the Lambda. In other words, FileAnalizer creates an index of the start and end bytes of each page.
  2. FileChecker is a Choice step that checks a boolean variable to determine whether all the pages have been completed. This information is stored in an SQS queue.
  3. PageCreator calls a Lambda that reads the start and end bytes of each page from the received POJO, reads only that portion of the S3 file, and creates an SQS message with the page content.
  4. QueueChecker is similar to FileChecker, but in this case it waits until no messages are left in the SQS queue.
  5. ReadSQStoS3 is a Task step that calls a Lambda function, which reads the messages in the SQS queue, each representing a page of the .csv file, and stores them in an S3 folder.
  6. SuccessState ends the state machine execution.
For this approach, the message channel always contains the POJO with the start and end bytes of each page.
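To make the byte index more concrete, here is a small sketch of how such an index could be built. It is written in Python for brevity and is not the FileAnalizer code itself; it assumes every record is one line, ignores any header row, and uses made-up function names. The `range_header` helper formats the inclusive byte range in the form accepted by an HTTP Range header, which is how a ranged read of an S3 object is requested.

```python
def build_page_index(data: bytes, records_per_page: int):
    """Return inclusive (start, end) byte offsets for each page of whole lines."""
    index = []
    start = 0          # first byte of the page being built
    position = 0       # byte offset just past the last line scanned
    lines_in_page = 0
    for line in data.splitlines(keepends=True):
        position += len(line)
        lines_in_page += 1
        if lines_in_page == records_per_page:
            index.append((start, position - 1))
            start = position
            lines_in_page = 0
    if lines_in_page:
        # The last page may hold fewer records than the others.
        index.append((start, position - 1))
    return index

def range_header(start, end):
    """Format an inclusive byte range for a ranged read."""
    return f"bytes={start}-{end}"

sample = b"a,1\nb,2\nc,3\n"
index = build_page_index(sample, records_per_page=2)
print(index)
print([range_header(s, e) for s, e in index])
```

Because the offsets fall on line boundaries, each page can be read and written independently without splitting a record in two.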

The Metrics.
  • The total execution time averages 15 minutes to process 100K+ records.

Approach 3: Batch Writing.

We took the same state machine as in the linear processing approach, but modified the Lambda behind the ReadSQStoS3 step with the intention of reducing the execution time. We added long polling to the Lambda with a maximum of 10 messages: the Lambda waits for up to 10 messages to become available in the SQS queue (if 20 seconds pass and 10 messages are not visible, it takes whatever is available at that moment), retrieves them, and calls another Lambda asynchronously to write these messages.
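A long-polling read of this kind could be sketched as follows. This is an illustration in Python rather than our Lambda: `receive_batch` and `read_pages` are hypothetical names, the client is passed in by the caller, and the only AWS call used is the `receive_message` operation of the boto3 SQS client with its `MaxNumberOfMessages` and `WaitTimeSeconds` parameters. Note that a single call may return fewer than 10 messages even when more are in the queue.

```python
def receive_batch(sqs, queue_url, max_messages=10, wait_seconds=20):
    """Long-poll the queue once and return whatever messages came back."""
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS allows at most 10 per call
        WaitTimeSeconds=wait_seconds,      # long polling, at most 20 seconds
    )
    # The "Messages" key is absent when nothing was received.
    return response.get("Messages", [])

def read_pages(sqs, queue_url):
    """Build the state output: page bodies plus the flag QueueChecker inspects."""
    messages = receive_batch(sqs, queue_url)
    return {
        "pages": [m["Body"] for m in messages],
        "receiptHandles": [m["ReceiptHandle"] for m in messages],
        "emptyQueue": not messages,
    }
```

In a real Lambda the client would come from `boto3.client("sqs")`, and the messages would be deleted with their receipt handles once the pages are safely written.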

The Metrics. 
  • The total execution time averages 10 minutes to process 100K+ records.

Approach 4: Parallel Branches.

For this implementation, we added a series of 5 branches that, in parallel, read the start and end bytes of each page and send a message to SQS with the page content.

Here we faced two problems:
  1. We confirmed the Step Functions limitation that you cannot create a dynamic number of parallel branches. This means you have to define a fixed set of parallel jobs from the beginning.
  2. At the end of the parallel execution, meaning the 5 tasks depicted below, Step Functions aggregates the results of all tasks and passes on a single message containing all of them. This is a problem with big JSON structures, which combine into an even bigger JSON at the end of the parallel execution. If this JSON is longer than 32768 characters, an error is thrown and the execution fails.
The StateMachine JSON
{
  "StartAt": "FileAnalizer",
  "States": {
    "FileAnalizer": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaFileIngestion",
      "Next": "FileChecker"
    },
    "FileChecker":{
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.writeComplete",
          "BooleanEquals": false,
          "Next": "ParallelWritePage"
        }
      ],
      "Default": "QueueChecker"
    },
    "ParallelWritePage":{
      "Type": "Parallel",
      "Next": "DeleteRead",
      "Branches": [
        {
          "StartAt": "SetBatchIndex0",
          "States": {
            "SetBatchIndex0": {
              "Type": "Pass",
              "Result": 0,
              "Next": "PageCreator0"
            },
            "PageCreator0": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPaginator",
              "End": true
            }
          }
        },
        {
          "StartAt": "SetBatchIndex1",
          "States": {
            "SetBatchIndex1": {
              "Type": "Pass",
              "Result": 1,
              "Next": "PageCreator1"
            },
            "PageCreator1": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPaginator",
              "End": true
            }
          }
        },
        {
          "StartAt": "SetBatchIndex2",
          "States": {
            "SetBatchIndex2": {
              "Type": "Pass",
              "Result": 2,
              "Next": "PageCreator2"
            },
            "PageCreator2": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPaginator",
              "End": true
            }
          }
        },
        {
          "StartAt": "SetBatchIndex3",
          "States": {
            "SetBatchIndex3": {
              "Type": "Pass",
              "Result": 3,
              "Next": "PageCreator3"
            },
            "PageCreator3": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPaginator",
              "End": true
            }
          }
        },
        {
          "StartAt": "SetBatchIndex4",
          "States": {
            "SetBatchIndex4": {
              "Type": "Pass",
              "Result": 4,
              "Next": "PageCreator4"
            },
            "PageCreator4": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPaginator",
              "End": true
            }
          }
        }
      ]
    },
    "DeleteRead": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaGetLastIndex",
      "Next": "FileChecker"
    },
    "ReadSQStoS3": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:LambdaPageWriter",
      "Next": "QueueChecker"
    },
    "QueueChecker": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.emptyQueue",
          "BooleanEquals": true,
          "Next": "SuccessState"
        }
      ],
      "Default": "ReadSQStoS3"
    },    
    "SuccessState": {
      "Type": "Pass",
      "End": true
    }
  }
}


The Graph


  1. FileAnalizer is a Task step in which a Lambda is called to read the CSV file stored in AWS S3, create the index of the start and end bytes of each page, and store this index in an external service such as a database, file, or cache service.
  2. FileChecker is a Choice step. It reads a boolean variable that indicates whether all the pages have already been stored as SQS messages.
  3. SetBatchIndexX is a Pass step. It sets a variable used by "PageCreator" to know which index to read next.
  4. PageCreatorX uses the integer value passed by "SetBatchIndex" to extract the page's start and end bytes, reads only the portion of the CSV file defined by these bytes, and sends an SQS message with the page content. It returns the information it read so that the next step knows which pages are left.
  5. DeleteRead receives the output payload of the parallel process, determines which pages have already been written to SQS, and deletes the information related to these pages from the external service (step 1).
  6. QueueChecker, ReadSQStoS3 and SuccessState work the same as in the "Batch Writing" approach.
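The interplay between the fixed branches and DeleteRead can be sketched as below. This is an illustration only, with assumptions stated plainly: a Python dict stands in for the external index service, the `pageId` field and both function names are invented, and the real Lambdas may work differently. What the sketch does rely on is that the Parallel state hands the next state an array with one entry per branch.

```python
BRANCHES = 5  # fixed number of parallel branches in the state machine

def page_creator(batch_index, pending):
    """Pick the page for this branch, or report nothing to do when pages run out."""
    page_ids = sorted(pending)
    if batch_index >= len(page_ids):
        return {"pageId": None}  # fewer pages left than branches
    return {"pageId": page_ids[batch_index]}

def delete_read(branch_outputs, pending):
    """Remove the pages reported by the branches and tell FileChecker if we are done."""
    for output in branch_outputs:
        if output["pageId"] is not None:
            pending.pop(output["pageId"], None)
    return {"writeComplete": not pending}

# Seven pages pending: the second round has only two pages for five branches.
pending = {page: (page * 100, page * 100 + 99) for page in range(7)}
rounds = []
while True:
    outputs = [page_creator(i, pending) for i in range(BRANCHES)]
    state = delete_read(outputs, pending)
    rounds.append(state)
    if state["writeComplete"]:
        break
print(rounds)
```

Returning only a small identifier from each branch, rather than the page content, also keeps the aggregated output of the Parallel state well under the 32768 character limit.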
The Metrics. 
  • The total execution time averages 6 minutes to process 100K+ records.

Conclusion.

AWS Step Functions is a tool that allows you to create and manage orchestrated flows based on small units of work. The simplicity of the language makes it perfect for quick implementations, as long as you have already identified the units of work.

Unfortunately, as this is a fairly new service in the AWS ecosystem, its functionality is severely limited. Proof of this is the fact that you need to maintain a fixed number of parallel steps, and if you end up with less work than parallel steps you must add control logic to avoid unexpected errors.

Moreover, given the limits of AWS Lambda and Step Functions, processing heavy workloads can be very difficult if you do not put careful thought into how your design decomposes the processing. We highly recommend reading our blog post Microflows to understand what this means.

On the plus side, if you want to transport small portions of data or compute small processes in a serverless fashion, Step Functions is a good tool for it.
In the future, we will evaluate a combination of other new AWS services like AWS Glue and AWS Batch together with Step Functions to achieve outstanding big data processing and enterprise integration.

Thanks for taking the time to read this post. I hope it is helpful when you decide to use Step Functions. Do not hesitate to drop a comment if you have any questions.