Automating Anomaly Detection in AWS CloudTrail Logs

· 7 min read
cloudtrail bedrock slack
CloudTrail anomaly detection pipeline

In this guide, we explore the automation of anomaly detection in AWS CloudTrail logs using an AWS Lambda function, Amazon EventBridge, and AWS Bedrock. This solution provides near real-time security monitoring by analysing CloudTrail logs and notifying a Slack channel if any potentially malicious activity is detected.

Problem Statement

Monitoring AWS CloudTrail logs manually for suspicious activities is inefficient and error-prone. Security teams must continuously analyse logs for unauthorised access, privilege escalations, or other security threats.

To address this, the implemented solution automates security monitoring by:

This automation enhances AWS security posture by providing quick insights into potential threats with minimal manual intervention.

Key Features

EventBridge Rule Configuration

Lambda Function for Security Analysis

AI-Powered Threat Analysis

Slack Integration for Alerts

The Lambda function posts analysis results to a Slack channel.

Notes About the Solution

Architecture & Execution

Prerequisites

Error Handling & Security

Limitations

Installation Steps

To deploy this solution, follow these steps:

1. Configure EventBridge Rule

{
"source": ["aws.iam", "aws.ec2", "aws.s3", "aws.rds", "aws.signin", "aws.kms", "aws.secretsmanager", "aws.config", "aws.cloudtrail", "aws.budgets"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventName": ["CreateUser", "DeleteUser", "AttachUserPolicy", "DetachUserPolicy", "CreateAccessKey", "DeleteAccessKey", "UpdateAccessKey", "CreateRole", "DeleteRole", "AttachRolePolicy", "DetachRolePolicy", "AuthorizeSecurityGroupIngress", "AuthorizeSecurityGroupEgress", "RunInstances", "TerminateInstances", "DeleteNetworkAcl", "PutBucketAcl", "DeleteBucketAcl", "PutBucketPolicy", "DeleteBucketPolicy", "DeleteObject", "DeleteDBInstance", "ModifyDBInstance", "RebootDBInstance", "DeleteDBSnapshot", "DeleteSubnet", "DeleteKey", "DeleteSecret", "DeleteConfigRule", "DeleteTrail", "StopLogging", "DeleteBudget", "ConsoleLogin", "CreateDBCluster", "DeleteDBCluster", "ModifyDBCluster", "FailoverDBCluster"]
}
}

2. Configure EventBridge Rule

import boto3
bedrock_client = boto3.client('bedrock-runtime', region_name='eu-west-2')

3. Deploy the Lambda Function

import json
import boto3
import urllib3
import logging
from datetime import datetime, timedelta

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
analysis_result= None
slack_bot_token = "[Enter Slack Bot Token Here]"
channel_id = "[Enter Slack Channel ID Here]"

logger.info(f"Received event: {json.dumps(event)}")

event_details_text = f"""
Received event: {json.dumps(event)}")
"""

logger.info(f"Event details: {event_details_text}")

bedrock_client = boto3.client('bedrock-runtime', region_name='[Enter Your Region Name Here]') # Amazon Bedrock endpoint

prompt = f"""
You are a security expert reviewing AWS CloudTrail logs. Please analyze the following CloudTrail event and determine if there is any suspicious activity or potential threat. I will give you the event details as JSON so please analyze according to that.

Event Details:
{event_details_text}

Please before making your analyze give me necessary informations from the json file.

Based on the event details, please analyze whether this action represents a normal activity or a potential security threat. Provide a clear response in plain text.

Please keep it very simple, short and easy to read.

In the end choose one of these according to your conclusion and finish your text with it. If the log seems dangerous to you choose dangerous. If you think it is safe choose Safe. If you are not sure choose I am not sure. "Type: Dangerous", "Type: Safe", "Type: I am not sure".
"""

messages = [
{
"role": "user",
"content": prompt
}
]

logger.info(f"Sending prompt to Claude: {prompt}")
# Enter the model ID that you like
try:
response = bedrock_client.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"messages": messages,
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2000
}),
contentType="application/json",
accept="application/json"
)

response_body = response['body'].read().decode('utf-8')
response_json = json.loads(response_body)

logger.info(f"Response from Bedrock: {json.dumps(response_json, indent=2)}")

analysis_result=response_json["content"][0]["text"]
if not analysis_result:
logger.warning("No analysis result found from Bedrock response.")
analysis_result = "No analysis result returned."

logger.info(f"Analysis result: {analysis_result}")

except Exception as e:
logger.error(f"Error invoking Bedrock or processing event: {e}")
return {
'statusCode': 500,
'body': json.dumps(f'Failed to process the event: {str(e)}')
}

color= None
if "Type: Dangerous" in analysis_result:
color="#c41616"
elif "Type: Safe" in analysis_result:
color="#25b04a"
elif "Type: I am not sure" in analysis_result:
color="#ede326"
else:
color="#000000"

message = {
"channel": channel_id,
"attachments": [
{
"text": f"""
Threat Analysis: {analysis_result}
""",
"color": color
}
]
}

logger.info(f"Sending message to Slack: {json.dumps(message, indent=2)}")

http = urllib3.PoolManager()

slack_api_url = "https://slack.com/api/chat.postMessage"

headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {slack_bot_token}"
}

try:
response = http.request(
"POST",
slack_api_url,
body=json.dumps(message),
headers=headers
)
# Log the response from Slack
logger.info(f"Slack response: {response.status}")
logger.info(f"Response body: {response.data.decode('utf-8')}")
except Exception as e:
logger.error(f"Error sending message to Slack: {e}")
return {
'statusCode': 500,
'body': json.dumps('Failed to send message to Slack.')
}

return {
'statusCode': 200,
'body': json.dumps('Message sent to Slack successfully!')
}

3. Set Up Slack Integration

4. Test and Monitor

{
"version": "0",
"id": "12345678-1234-1234-1234-123456789012",
"detail-type": "AWS API Call via CloudTrail",
"source": "aws.ec2",
"account": "123456789012",
"time": "2024-11-08T12:34:56Z",
"region": "us-east-1",
"resources": [],
"detail": {
"eventVersion": "1.05",
"userIdentity": {
"type": "IAMUser",
"principalId": "AIDAEXAMPLE",
"arn": "arn:aws:iam::123456789012:user/ExampleUser",
"accountId": "123456789012",
"accessKeyId": "AKIAEXAMPLE",
"userName": "ExampleUser"
},
"eventTime": "2024-11-08T12:34:56Z",
"eventSource": "ec2.amazonaws.com",
"eventName": "CreateKeyPair",
"awsRegion": "us-east-1",
"sourceIPAddress": "192.168.1.1",
"userAgent": "aws-sdk-java/1.11.1000 Linux/4.14.123-112.109.amzn2.x86_64",
"requestParameters": {
"keyName": "MyNewKeyPair"
},
"responseElements": {
"keyPair": {
"keyName": "MyNewKeyPair",
"keyFingerprint": "abcd1234"
}
},
"requestID": "1234abcd-1234-1234-1234-123456789012",
"eventID": "abcd1234-5678-1234-1234-123456789012",
"readOnly": false,
"resources": [],
"eventType": "AwsApiCall",
"managementEvent": true
}
}

5. Result

As a result you should see something like the screenshot below:

Automating Anomaly Detection in AWS CloudTrail Logs — figure

By implementing this solution, you have achieved near real-time, AI-powered security monitoring for AWS CloudTrail logs, enabling rapid threat detection. 🎉🎉🎉

Further Information

For more details, consult the following AWS documentation: