CloudFront Logs to OpenSearch via AWS Lambda


Overview

This guide walks you through setting up an AWS Lambda function that automatically sends CloudFront access logs (stored in S3) to an OpenSearch index, so you can query and analyze CDN traffic within minutes of the requests.

What You’ll Need

Before you begin, make sure you have:

An AWS account with:

AWS CLI installed

Basic familiarity with Lambda and IAM

High-Level Flow

  1. CloudFront sends logs to S3.
  2. An S3 event triggers an SNS notification.
  3. SNS invokes a Lambda function.
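  4. Lambda downloads the log file from S3, decompresses and parses it, and sends the entries to OpenSearch through the bulk API.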

Step-by-Step Implementation

1. Enable CloudFront Logging

Logs will be stored in GZIP format and will arrive a few minutes after requests.
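
To get a feel for what arrives in the bucket, here is a minimal sketch that decompresses one log file and reads the field names from its `#Fields:` header instead of hard-coding them. The function name `read_cloudfront_fields` and the four-field sample (including the edge location value) are made up for illustration; real files carry many more fields.

```python
import gzip


def read_cloudfront_fields(gz_bytes):
    """Return (field_names, rows) from one gzipped CloudFront standard log file."""
    text = gzip.decompress(gz_bytes).decode("utf-8")
    fields, rows = [], []
    for line in text.splitlines():
        if line.startswith("#Fields:"):
            # The header lists the field names separated by spaces.
            fields = line[len("#Fields:"):].split()
        elif line and not line.startswith("#"):
            # Data lines are tab-separated.
            rows.append(dict(zip(fields, line.split("\t"))))
    return fields, rows


# Tiny made-up sample with only four fields, for illustration.
sample = (
    "#Version: 1.0\n"
    "#Fields: date time x-edge-location sc-bytes\n"
    "2025-03-28\t13:09:25\tFRA56-P1\t1024\n"
)
fields, rows = read_cloudfront_fields(gzip.compress(sample.encode("utf-8")))
print(fields)
print(rows[0]["time"])
```

Reading the header this way keeps the parser working if the set of logged fields changes, at the cost of trusting the file's own header.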

2. Set Up the S3 Bucket

CloudFront Logs to OpenSearch via AWS Lambda — figure

3. Create an SNS Topic

CloudFront Logs to OpenSearch via AWS Lambda — figure
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "SNS:Publish",
            "Resource": "arn:aws:sns:[Your SNS ARN]"
        }
    ]
}
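
The policy above lets any S3 bucket publish to the topic. One common way to tighten it, sketched here as an assumption rather than as part of the original setup, is to add an `aws:SourceArn` condition that names your log bucket. The helper `build_sns_topic_policy` and the ARN and bucket values are placeholders.

```python
import json


def build_sns_topic_policy(topic_arn, bucket_name):
    """Build an SNS access policy that lets only one S3 bucket publish."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
                # Assumption: restrict publishing to notifications from this bucket.
                "Condition": {
                    "ArnLike": {"aws:SourceArn": f"arn:aws:s3:::{bucket_name}"}
                },
            }
        ],
    }


policy = build_sns_topic_policy(
    "arn:aws:sns:eu-central-1:123456789012:YourSNSTopic",  # placeholder ARN
    "your-bucket-name",  # placeholder bucket
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the topic's access policy editor once the placeholders are replaced.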

4. Create the Lambda Function

You’ll use the provided Python script (which handles decompression, parsing, and uploading to OpenSearch).

lambda_function.py :

import json
import gzip
import boto3
import base64
import logging
from datetime import datetime
from urllib.parse import unquote, unquote_plus
import urllib3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client('s3')

def lambda_handler(event, context):
    logger.info("Lambda function invoked")
    logger.info(f"Received event: {json.dumps(event)}")

    try:
        # The S3 event is delivered as a JSON string inside the SNS message.
        sns_message = event['Records'][0]['Sns']['Message']
        s3_event = json.loads(sns_message)

        bucket = s3_event['Records'][0]['s3']['bucket']['name']
        # Object keys in S3 notifications are URL-encoded, with spaces sent as '+'.
        key = unquote_plus(s3_event['Records'][0]['s3']['object']['key'])
        logger.info(f"Processing file: {key} from bucket: {bucket}")

        response = s3.get_object(Bucket=bucket, Key=key)
        logger.info("Successfully retrieved object from S3")

        gzipped_content = response['Body'].read()
        log_content = gzip.decompress(gzipped_content).decode('utf-8')
        logger.info("Log file decompressed and decoded")

        parsed_logs = parse_cloudfront_logs(log_content)
        logger.info(f"Parsed {len(parsed_logs)} log entries")

        send_to_opensearch(parsed_logs, key)
        logger.info("Logs sent to OpenSearch")

        return {
            'statusCode': 200,
            'body': json.dumps(f'Successfully processed {key}')
        }
    except Exception as e:
        logger.error(f"Error processing file: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error processing file: {str(e)}')
        }

def parse_cloudfront_logs(log_content):
    logger.info("Parsing CloudFront logs")
    log_lines = [line for line in log_content.split('\n') if line and not line.startswith('#')]
    headers = [
        'date', 'time', 'x-edge-location', 'sc-bytes', 'c-ip', 'cs-method',
        'cs(Host)', 'cs-uri-stem', 'sc-status', 'cs(Referer)', 'cs(User-Agent)',
        'cs-uri-query', 'cs(Cookie)', 'x-edge-result-type', 'x-edge-request-id',
        'x-host-header', 'cs-protocol', 'cs-bytes', 'time-taken', 'x-forwarded-for',
        'ssl-protocol', 'ssl-cipher', 'x-edge-response-result-type',
        'cs-protocol-version', 'fle-status', 'fle-encrypted-fields', 'c-port',
        'time-to-first-byte', 'x-edge-detailed-result-type', 'sc-content-type',
        'sc-content-len', 'sc-range-start', 'sc-range-end'
    ]
    parsed_logs = []
    for idx, line in enumerate(log_lines):
        fields = line.split('\t')
        log_dict = dict(zip(headers, fields))
        logger.debug(f"Parsing log line {idx+1}")
        nginx_log = convert_to_nginx_format(log_dict)
        parsed_logs.append(nginx_log)
    return parsed_logs

def convert_to_nginx_format(log_dict):
    # CloudFront logs the date and time (UTC) in separate fields; combine them
    # so that each entry keeps its real time of day.
    timestamp = f"{log_dict['date']}T{log_dict['time']}Z"
    # CloudFront URL-encodes the user agent string.
    user_agent = unquote(log_dict['cs(User-Agent)'])
    nginx_log = {
        '@timestamp': timestamp,
        'remote_addr': log_dict['c-ip'],
        'remote_user': '-',
        'request': f"{log_dict['cs-method']} {log_dict['cs-uri-stem']} {log_dict['cs-protocol-version']}",
        'status': log_dict['sc-status'],
        'body_bytes_sent': log_dict['sc-bytes'],
        'http_referer': log_dict['cs(Referer)'] if log_dict['cs(Referer)'] != '-' else '',
        'http_user_agent': user_agent,
        'host': log_dict['cs(Host)'],
        'request_time': float(log_dict['time-taken']),
        'upstream_response_time': '-',
        'upstream_status': '-'
    }
    logger.debug(f"Converted log to NGINX format: {nginx_log}")
    return nginx_log

def send_to_opensearch(parsed_logs, log_key):
    logger.info("Preparing to send logs to OpenSearch")
    opensearch_endpoint = '[YOUR VPC ENDPOINT]'

    if not parsed_logs:
        logger.info("No log entries to send")
        return

    current_date = datetime.now().strftime('%Y-%m-%d')
    index_name = f'cloudfront-logs-{current_date}'.lower()

    bulk_body = []
    for line_no, log in enumerate(parsed_logs):
        # Include the line number in the ID: entries that share a timestamp would
        # otherwise overwrite each other. Re-processing the same file still
        # produces the same IDs, so it updates documents instead of duplicating them.
        doc_id = base64.b64encode(f"{log_key}_{line_no}_{log['@timestamp']}".encode()).decode()
        bulk_body.append(json.dumps({
            'index': {
                '_index': index_name,
                '_id': doc_id
            }
        }))
        bulk_body.append(json.dumps(log))

    # The bulk API expects newline-delimited JSON that ends with a newline.
    bulk_payload = '\n'.join(bulk_body) + '\n'

    try:
        http = urllib3.PoolManager()
        response = http.request(
            'POST',
            f"{opensearch_endpoint}/_bulk",
            body=bulk_payload.encode('utf-8'),
            headers={'Content-Type': 'application/x-ndjson'}
        )
        logger.info(f"OpenSearch response status: {response.status}")
        logger.info(f"OpenSearch response data: {response.data}")
        if response.status != 200:
            logger.error(f"Error sending to OpenSearch: {response.data}")
    except Exception as e:
        logger.error(f"Exception sending to OpenSearch: {str(e)}")
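
One thing the status check in the script does not catch: the bulk API can answer with HTTP 200 even when individual documents were rejected, and reports this through an `errors` flag and per-item `error` objects in the response body. A minimal sketch of how the response could be inspected follows; `failed_bulk_items` is a hypothetical helper, and the sample response is hand-written in the shape the API returns.

```python
import json


def failed_bulk_items(response_body):
    """Return (doc_id, status, error) for every item the _bulk call rejected."""
    result = json.loads(response_body)
    if not result.get("errors"):
        return []
    failures = []
    for item in result.get("items", []):
        # Each item is keyed by its action type, e.g. {"index": {...}}.
        for action in item.values():
            if "error" in action:
                failures.append((action.get("_id"), action.get("status"), action["error"]))
    return failures


# Hand-written response in the shape the _bulk API returns.
sample = json.dumps({
    "took": 5,
    "errors": True,
    "items": [
        {"index": {"_id": "a", "status": 201}},
        {"index": {"_id": "b", "status": 400,
                   "error": {"type": "mapper_parsing_exception", "reason": "failed to parse"}}},
    ],
})
for doc_id, status, error in failed_bulk_items(sample):
    print(doc_id, status, error["type"])
```

Inside the Lambda, you could pass `response.data` to such a helper and log the failures, since `json.loads` accepts bytes as well as strings.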

Include Dependencies (urllib3)

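Bundle urllib3 with the function rather than relying on the runtime to provide it. Lambda imports modules from the root of the deployment package, so zip the contents of python/ rather than the directory itself:

mkdir python

pip install urllib3 -t python/
(cd python && zip -r9 ../function.zip .)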

Then add your lambda_function.py (your code file) to the zip:

zip -g function.zip lambda_function.py

Upload this ZIP when creating your Lambda function.
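
Lambda imports modules from the root of the deployment package, so both lambda_function.py and the urllib3/ directory need to sit at the top level of the archive. Before uploading, a quick check along these lines can catch a wrong layout. `check_package_layout` is a hypothetical helper, and the demo archive is built in memory purely for illustration.

```python
import io
import zipfile


def check_package_layout(zip_bytes):
    """Return a list of problems found in a Lambda deployment ZIP."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        names = zf.namelist()
    problems = []
    if "lambda_function.py" not in names:
        problems.append("lambda_function.py is not at the archive root")
    if not any(n.startswith("urllib3/") for n in names):
        problems.append("urllib3/ is not at the archive root")
    return problems


# Build a small in-memory archive to demonstrate the check.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("lambda_function.py", "def lambda_handler(event, context): pass\n")
    zf.writestr("urllib3/__init__.py", "")
print(check_package_layout(buf.getvalue()))
```

To check a real package, read the file with `open("function.zip", "rb").read()` and pass the bytes to the helper.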

After you add the Lambda function as a subscriber to the SNS topic, SNS should appear as a trigger in the function overview, as shown below:

CloudFront Logs to OpenSearch via AWS Lambda — figure

5. Set Environment and Permissions

IAM Role should include:

Limit the resources in each statement to what your setup actually requires.
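
Going by what the script does (it reads objects from S3, writes logs, and calls an endpoint inside a VPC), a policy along the following lines is a plausible starting point. The statements are an assumption and not the author's exact policy; `build_execution_policy` is a hypothetical helper and the bucket name is a placeholder. The unsigned HTTP call in the script uses no OpenSearch IAM permissions, so none are listed.

```python
import json


def build_execution_policy(bucket_name):
    """Build a policy document for the log-shipping function's execution role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Read the log objects that CloudFront writes.
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            },
            {   # Write the function's own logs to CloudWatch Logs.
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "arn:aws:logs:*:*:*",
            },
            {   # Manage network interfaces; only needed if the function runs in a VPC.
                "Effect": "Allow",
                "Action": [
                    "ec2:CreateNetworkInterface",
                    "ec2:DescribeNetworkInterfaces",
                    "ec2:DeleteNetworkInterface",
                ],
                "Resource": "*",
            },
        ],
    }


policy = build_execution_policy("your-bucket-name")  # placeholder bucket
print(json.dumps(policy, indent=2))
```

The CloudWatch Logs resource is deliberately broad here; narrowing it to the function's own log group is a sensible next step.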

7. Connect the SNS Topic to Lambda

Then confirm the subscription.
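
If you prefer to script this step instead of using the console, the two calls involved can be sketched with boto3 as follows. The function takes the clients as parameters so that it is easy to try out; `connect_topic_to_lambda` and the statement ID `AllowSNSInvoke` are names made up for this example.

```python
def connect_topic_to_lambda(sns_client, lambda_client, topic_arn, function_arn):
    """Allow the topic to invoke the function, then subscribe the function to it."""
    # Resource-based permission: without it, SNS is not allowed to invoke the function.
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId="AllowSNSInvoke",  # hypothetical statement ID
        Action="lambda:InvokeFunction",
        Principal="sns.amazonaws.com",
        SourceArn=topic_arn,
    )
    # Subscribe the function as a "lambda" protocol endpoint of the topic.
    response = sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="lambda",
        Endpoint=function_arn,
        ReturnSubscriptionArn=True,
    )
    return response["SubscriptionArn"]
```

With real credentials, you would call it as `connect_topic_to_lambda(boto3.client("sns"), boto3.client("lambda"), topic_arn, function_arn)`, using the ARNs of your own topic and function.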

8. Test the Flow

{
    "Records": [
        {
            "EventSource": "aws:sns",
            "EventVersion": "1.0",
            "EventSubscriptionArn": "arn:aws:sns:eu-central-1:123456789012:YourSNSTopic:unique-subscription-id",
            "Sns": {
                "Type": "Notification",
                "MessageId": "dummy-message-id-1234-abcd-5678-efgh-ijklmnopqrs",
                "TopicArn": "arn:aws:sns:eu-central-1:123456789012:YourSNSTopic",
                "Subject": "Amazon S3 Notification",
                "Message": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-central-1\",\"eventTime\":\"2025-03-28T13:09:25.023Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:YOUR-PRINCIPAL-ID\"},\"requestParameters\":{\"sourceIPAddress\":\"72.21.217.31\"},\"responseElements\":{\"x-amz-request-id\":\"dummy-request-id\",\"x-amz-id-2\":\"dummy-amz-id\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"S3ToSNS\",\"bucket\":{\"name\":\"your-bucket-name\",\"ownerIdentity\":{\"principalId\":\"dummy-owner-id\"},\"arn\":\"arn:aws:s3:::your-bucket-name\"},\"object\":{\"key\":\"your-log-file.gz\",\"size\":123456,\"eTag\":\"dummy-etag\",\"sequencer\":\"dummy-sequencer-id\"}}}]}",
                "Timestamp": "2025-03-28T13:09:25.574Z",
                "SignatureVersion": "1",
                "Signature": "dummy-signature-value",
                "SigningCertUrl": "https://sns.eu-central-1.amazonaws.com/SimpleNotificationService-dummy-cert.pem",
                "UnsubscribeUrl": "https://sns.eu-central-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-central-1:123456789012:YourSNSTopic:unique-subscription-id",
                "MessageAttributes": {}
            }
        }
    ]
}
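
The nesting in this event is easy to get wrong: the S3 notification is a JSON string inside the SNS record, so it has to be decoded a second time. The sketch below shows that unwrapping on a trimmed-down copy of the test event, so you can check it locally before invoking the function. `extract_s3_objects` is a hypothetical helper; unlike the script, it walks all records and tolerates messages without a `Records` list, such as the test notification S3 sends when event notifications are first configured.

```python
import json
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Yield (bucket, key) for every S3 record wrapped in an SNS-triggered event."""
    for record in event["Records"]:
        # The SNS message body is itself a JSON document.
        s3_event = json.loads(record["Sns"]["Message"])
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            # S3 URL-encodes object keys in notifications (spaces become "+").
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            yield bucket, key


# Trimmed-down version of the test event, keeping only the fields used here.
inner = {"Records": [{"s3": {"bucket": {"name": "your-bucket-name"},
                             "object": {"key": "your-log-file.gz"}}}]}
event = {"Records": [{"Sns": {"Message": json.dumps(inner)}}]}
print(list(extract_s3_objects(event)))
```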

Verifying OpenSearch Index

Each day’s logs will be indexed into:

cloudfront-logs-YYYY-MM-DD
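
Note that the script takes the date from the moment the Lambda runs, not from the log entries. If you would rather group entries by the day of the request itself, the index name could be derived from each entry's `@timestamp`, roughly like this. `index_name_for` is a hypothetical helper and assumes timestamps in the `YYYY-MM-DDTHH:MM:SSZ` form the script produces.

```python
from datetime import datetime


def index_name_for(entry, prefix="cloudfront-logs"):
    """Derive the daily index name from an entry's @timestamp (ISO 8601, UTC)."""
    ts = datetime.strptime(entry["@timestamp"], "%Y-%m-%dT%H:%M:%SZ")
    return f"{prefix}-{ts:%Y-%m-%d}"


print(index_name_for({"@timestamp": "2025-03-28T13:09:25Z"}))
```

With this variant, the `_index` value in each bulk action line would be computed per entry rather than once per file.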

You can verify via Kibana/OpenSearch Dashboards:

Your indices should appear in OpenSearch under Index Management → Indices.

You can then create an index pattern named cloudfront-logs-*

CloudFront Logs to OpenSearch via AWS Lambda — figure

After creating the index pattern, your logs should appear under Discover:

CloudFront Logs to OpenSearch via AWS Lambda — figure

Notes & Troubleshooting

Resources

Conclusion

With this setup, you’ve created a fully serverless pipeline that collects, parses, and indexes CloudFront logs into OpenSearch, giving you near-real-time insight into your CDN traffic. It’s scalable, cost-effective, and easy to build upon.