While playing around with Terraform, I realized how hard it is to find a simple working example for spinning up a Lambda function triggered by SQS messages. As a way to consolidate my learnings, I decided to write this small document.
You can find the source code for this post here.
Note that the design is intentionally simple. It doesn’t cover important bits that you would want in a production context, like Terraform Cloud connected to a VCS; timeout and memory limits for the Lambda function; monitoring and alerting; or encryption of data at rest.
Terraform - AWS Lambda via SQS
Architecture
The architecture I want to create with IaC (Infrastructure as Code) is super simple: we have a Lambda function triggered by SQS (Simple Queue Service) messages. We log the messages to CloudWatch Logs, simply by writing their content to standard output.
Despite its simplicity, this kind of architecture is at the core of high-performance and large-scale systems. For instance, the amazing BinaryAlert released by Airbnb uses the same principle to scan every single object uploaded to S3 with YARA.
Why SQS?
One question I had when I started experimenting with serverless computing in AWS was why it is so common to see solutions using SQS instead of SNS (Simple Notification Service), or instead of connecting Lambda directly to other event sources (like events generated by S3).
The answer really depends on what you are trying to achieve.
For a scenario in which we want to minimize the probability of losing messages, one nice feature of SQS is persistence: in a well-designed system, this makes it easier to debug and to recover from outages and from bugs pushed to production.
Going into the details of these systems is beyond the scope of this post; several resources explain the pros and cons of SQS vs SNS in more depth.
AWS user and permissions
First off, we need to create a service user for Terraform.
I am not going to explain this step in detail, but one way of doing it is via the IAM service in the AWS web console.
For our scenario, this user should have the following permissions:
AmazonSQSFullAccess
IAMFullAccess
CloudWatchLogsFullAccess
AmazonSNSFullAccess
AWSLambda_FullAccess
Note that these permissions are broader than what we actually need. In a hypothetical production environment, we would want to be more restrictive and grant only the actions we actually need Terraform to perform; a rough sketch of what that could look like is shown below.
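Purely for illustration, here is a hypothetical, non-exhaustive sketch of such a narrower policy, expressed as a Terraform policy document only for readability (in practice you would attach something equivalent to the service user via the console or a separate bootstrap configuration, since this user has to exist before Terraform can run). The action list is my own guess at what this post’s plan performs and has not been verified:

# Hypothetical, non-exhaustive sketch of a narrower policy for the Terraform
# service user. The action names are assumptions; verify them against what
# your plan actually performs before relying on anything like this.
data "aws_iam_policy_document" "terraform_user_policy" {
  statement {
    sid = "ManageLambdaSqsStack"
    actions = [
      "lambda:CreateFunction",
      "lambda:DeleteFunction",
      "lambda:GetFunction",
      "lambda:UpdateFunctionCode",
      "lambda:UpdateFunctionConfiguration",
      "lambda:CreateEventSourceMapping",
      "lambda:DeleteEventSourceMapping",
      "lambda:GetEventSourceMapping",
      "sqs:CreateQueue",
      "sqs:DeleteQueue",
      "sqs:GetQueueAttributes",
      "sqs:SetQueueAttributes",
      "logs:CreateLogGroup",
      "logs:DeleteLogGroup",
      "logs:DescribeLogGroups",
      "logs:PutRetentionPolicy",
      "iam:CreateRole",
      "iam:DeleteRole",
      "iam:GetRole",
      "iam:PassRole",
      "iam:PutRolePolicy",
      "iam:GetRolePolicy",
      "iam:DeleteRolePolicy",
    ]
    resources = ["*"]
  }
}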
Terraform
With the following snippets, we build our infra piece by piece.
In the remainder of this post, I am going to explain how that code works.
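The snippets below reference an AWS provider and a handful of input variables (var.name_prefix, var.sqs_retention_seconds, and so on). The linked repository defines these properly; the following is just a minimal sketch, with a region and default values that are my own assumptions, so that the rest of the post is easier to follow on its own:

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.0"
    }
    archive = {
      source = "hashicorp/archive"
    }
  }
}

# The region is an assumption; use whatever you normally deploy to.
provider "aws" {
  region = "eu-west-1"
}

# Input variables used by the snippets below (the defaults are assumptions).
variable "name_prefix" {
  description = "Prefix used to name the Lambda function and related resources."
  type        = string
  default     = "lambda-sqs-example"
}

variable "sqs_retention_seconds" {
  description = "How long messages are kept in the queues, in seconds."
  type        = number
  default     = 86400
}

variable "sqs_max_retry_count" {
  description = "Receive attempts before a message is moved to the DLQ."
  type        = number
  default     = 3
}

variable "sqs_batch_size" {
  description = "Maximum number of messages delivered to the Lambda in one batch."
  type        = number
  default     = 10
}

variable "log_retention_days" {
  description = "Retention of the CloudWatch log group, in days."
  type        = number
  default     = 14
}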
Lambda function
The lambda function we use in this example is a toy function. It will receive messages from SQS and log their content in CloudWatch.
The only “interesting” bit is that we need to handle multiple records per event, as our queue could deliver multiple messages in a single batch.
For simplicity, I am going to use Python and create a single file named lambda.py with our handler function.
#!/usr/bin/env python3
import json

print('Loading function test')


def lambda_handler(event, context) -> None:
    print(f"Received event: {json.dumps(event, indent=2)}")
    print(f"Received {len(event['Records'])} record(s) in a single SQS event")
    for i, record in enumerate(event['Records']):
        print(f"{i:03} -> {record['body']}")
    return None
To build our infra, the AWS Lambda service expects us to deliver a zip file containing our source code. With Terraform, we can zip the content of a directory and use the resulting file to define our serverless function.
data "archive_file" "lambda_function_zip" {
  type        = "zip"
  source_dir  = "${path.module}/lambda-python/"
  output_path = "${var.name_prefix}.zip"
}

resource "aws_lambda_function" "lambda_function" {
  filename         = "${var.name_prefix}.zip"
  function_name    = var.name_prefix
  role             = aws_iam_role.iam_for_lambda.arn
  handler          = "lambda.lambda_handler"
  source_code_hash = data.archive_file.lambda_function_zip.output_base64sha256
  runtime          = "python3.9"
}
In the above snippet, there are two important things that took me longer than they should have to grasp from reading various blog posts on the Internet.
First, the value of handler must be <name of python file>.<name of function>, which in our source above is lambda.lambda_handler. Since we are using Python, this should have been obvious, but I found it confusing at first.
The second thing is source_code_hash. This is critical to make sure the zip file is deployed again every time our source code changes. For this, we can use the base64-encoded SHA-256 of the zip file, as generated by the archive_file data block above.
The value of the role attribute will be explained later. In short, it defines what this Lambda function is allowed to do (i.e., logging and receiving SQS messages).
SQS Queue(s)
We are going to create two queues: a “main” one and a dead-letter queue (DLQ). The latter is useful for debugging, as it will collect messages that the consumer could not successfully process (i.e., receive and then delete).
resource "aws_sqs_queue" "dead_letter_queue" {
  name                      = "lambda_sqs_dead_letter_queue"
  message_retention_seconds = var.sqs_retention_seconds
}

resource "aws_sqs_queue" "lambda_queue" {
  name                      = "lambda_queue"
  message_retention_seconds = var.sqs_retention_seconds

  # Messages that fail to be processed too many times are moved to the DLQ.
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dead_letter_queue.arn
    maxReceiveCount     = var.sqs_max_retry_count
  })
}

# Only the main queue is allowed to use the DLQ as its dead-letter queue.
# This is attached as a separate resource (rather than inline on the DLQ)
# to avoid a dependency cycle between the two queues.
resource "aws_sqs_queue_redrive_allow_policy" "dead_letter_queue_redrive_allow" {
  queue_url = aws_sqs_queue.dead_letter_queue.id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue",
    sourceQueueArns   = [aws_sqs_queue.lambda_queue.arn]
  })
}
Messages will end up in the DLQ after a consumer has received them from the queue var.sqs_max_retry_count times without deleting them. These messages will stay in the DLQ for var.sqs_retention_seconds seconds, allowing recovery and debugging.
More info about DLQs in SQS can be found here.
Log Group
To log messages from our function, we need to create a LogGroup.
With Terraform, we can do the following:
resource "aws_cloudwatch_log_group" "test_cloudwatch_log_group" {
  name              = "/aws/lambda/${var.name_prefix}"
  retention_in_days = var.log_retention_days
}
Connect SQS queue with lambda
Now we can connect the SQS queue (the event source) to our Lambda. In doing so, we can specify the maximum number of messages the function receives in a single batch.
resource "aws_lambda_event_source_mapping" "lambda_via_sqs" {
  batch_size       = var.sqs_batch_size
  event_source_arn = aws_sqs_queue.lambda_queue.arn
  function_name    = var.name_prefix

  depends_on = [
    aws_lambda_function.lambda_function
  ]
}
Roles and Policies
What we need to do now is create a role for our Lambda and attach policies to it that allow certain “actions”. Specifically, we have to authorize our function to log to CloudWatch and to receive/delete messages from our SQS queue.
In fact, this is the first thing that has to be defined while we build our infra; Terraform will automatically create the elements in the “right order” based on their dependencies.
One thing I found a bit confusing as a n00b was the AssumeRole action. In short, when we create a role, we need to specify which entities can assume that role: in our case, Lambda functions.
data "aws_iam_policy_document" "lambda_execution_policy_document" {
  statement {
    sid     = "AllowLambdaToAssumeRole"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

# Role for the lambda function
resource "aws_iam_role" "iam_for_lambda" {
  name = "${var.name_prefix}_role"

  # Grant lambda permission to assume the role "iam_for_lambda"
  assume_role_policy = data.aws_iam_policy_document.lambda_execution_policy_document.json
}
We can now attach policies to the role. This is what we need to allow logging:
data "aws_iam_policy_document" "lambda_logging_policy" {
  statement {
    sid = "EnableLogs"
    actions = [
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]
    resources = ["*"]
  }
}

resource "aws_iam_role_policy" "lambda_logs_policy" {
  name   = "lambda_logs_policy"
  role   = aws_iam_role.iam_for_lambda.name
  policy = data.aws_iam_policy_document.lambda_logging_policy.json
}
And this is needed to operate the SQS queue (receive):
data "aws_iam_policy_document" "lambda_sqs_policy_document" {
  statement {
    sid = "ProcessSQSMessages"
    actions = [
      "sqs:ChangeMessageVisibility",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes",
      "sqs:ReceiveMessage",
    ]
    resources = [aws_sqs_queue.lambda_queue.arn]
  }
}

resource "aws_iam_role_policy" "lambda_sqs_policy" {
  name   = "lambda_sqs_policy"
  role   = aws_iam_role.iam_for_lambda.name
  policy = data.aws_iam_policy_document.lambda_sqs_policy_document.json
}
Testing the system
If you just want to test the system with one message, you can run:

aws sqs send-message \
    --queue-url "$(terraform output --raw sqs_url)" \
    --message-body "$(date) - sqs test"
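The command above assumes that the configuration exposes the queue URL as a Terraform output named sqs_url (the repository defines its own outputs). A minimal sketch of such an output could look like this:

# Expose the main queue URL so it can be read with `terraform output --raw sqs_url`.
output "sqs_url" {
  description = "URL of the SQS queue that feeds the Lambda function."
  value       = aws_sqs_queue.lambda_queue.id # the id of an SQS queue is its URL
}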
To test the function with batch processing and concurrent execution (and also to give a sense of how to programmatically send messages to the queue), I created a super simple Python script. This script sends itself, line by line, to the SQS queue. It does that in batches of up to 10 lines.
#!/usr/bin/env python3
import logging
import sys

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
sqs = boto3.resource('sqs')


def send_messages(queue, messages):
    """
    Send a batch of messages in a single request to an SQS queue.

    This request may return overall success even when some messages were not
    sent. The caller must inspect the Successful and Failed lists in the
    response and resend any failed messages.

    :param queue: The queue to receive the messages.
    :param messages: The messages to send to the queue. These are simplified
                     to contain only the message body and attributes.
    :return: The response from SQS that contains the list of successful and
             failed messages.
    """
    try:
        entries = [{
            'Id': str(ind),
            'MessageBody': msg['body'],
            'MessageAttributes': msg['attributes']
        } for ind, msg in enumerate(messages)]
        response = queue.send_messages(Entries=entries)
        if 'Successful' in response:
            for msg_meta in response['Successful']:
                logger.info(
                    "Message sent: %s: %s",
                    msg_meta['MessageId'],
                    messages[int(msg_meta['Id'])]['body']
                )
        if 'Failed' in response:
            for msg_meta in response['Failed']:
                # Failed entries only carry the batch 'Id', not a MessageId.
                logger.warning(
                    "Failed to send: %s: %s",
                    msg_meta['Id'],
                    messages[int(msg_meta['Id'])]['body']
                )
    except ClientError as error:
        logger.exception("Send messages failed to queue: %s", queue)
        raise error
    else:
        return response


def get_queues(prefix=None):
    """
    Gets a list of SQS queues. When a prefix is specified, only queues with
    names that start with the prefix are returned.

    :param prefix: The prefix used to restrict the list of returned queues.
    :return: A list of Queue objects.
    """
    if prefix:
        queue_iter = sqs.queues.filter(QueueNamePrefix=prefix)
    else:
        queue_iter = sqs.queues.all()
    queues = list(queue_iter)
    if queues:
        logger.info("Got queues: %s", ', '.join([q.url for q in queues]))
    else:
        logger.warning("No queues found.")
    return queues


def test_sqs_lambda():
    """
    Reads the lines of this Python file and sends them as messages to the
    queue, in batches of 10. The Lambda function on the other side of the
    queue will log them to CloudWatch.
    """
    def pack_message(msg_path, msg_body, msg_line):
        return {
            'body': msg_body,
            'attributes': {
                'path': {'StringValue': msg_path, 'DataType': 'String'},
                'line': {'StringValue': str(msg_line), 'DataType': 'String'}
            }
        }

    queue = get_queues(prefix='lambda_queue')[0]
    with open(__file__) as file:
        lines = file.readlines()

    line = 0
    batch_size = 10
    print(f"Sending file lines in batches of {batch_size} as messages.")
    while line < len(lines):
        messages = [
            pack_message(__file__, lines[index], index)
            for index in range(line, min(line + batch_size, len(lines)))
        ]
        line = line + batch_size
        send_messages(queue, messages)
        print('.', end='')
        sys.stdout.flush()
    print(f"\nDone. Sent {len(lines)} messages.")


if __name__ == '__main__':
    test_sqs_lambda()
After running this script, you should be able to see the messages in the function’s CloudWatch logs.
That’s all
Thanks for reading