I had to figure out how to get logs from CloudWatch into S3. The task itself is pretty easy because AWS provides a very nice tutorial, Exporting log data to Amazon S3, that explains how to do this via either the Console or the CLI. My problem was that I needed to do this daily, so automating the task was my next struggle. The AWS tutorial covers setting up S3 and IAM for this solution, so I won't repeat that here. I also found a great article by Omar Dulaimi that was the basis for my code (why completely reinvent the wheel?). With both of these laying the groundwork, I got right to putting this together.
High Level Details
The code is set up to run daily, so I have it use the following flow:
- Get the current date and time to be used as the end time for our export task
- Get “n” number of days ago date and time as the start time for our export task
- Create the CloudWatch export to S3 task
- Check the status of the export (see the short sketch after this list)
  - If the task is still `RUNNING`, then sleep 1 second and check the status again
  - If the task is still `RUNNING`, then sleep 2x the previous delay and check the status again
  - Continue doubling the delay and re-checking until we get a `COMPLETED` status from the task
  - If we ever get a status other than `COMPLETED` or `RUNNING` (see the other possible status codes here), then raise an exception
- Once `COMPLETED`, return the export task id and the path in our S3 bucket
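To make the waiting step concrete, here is a minimal sketch of just that polling loop, using a hypothetical `get_status()` callable as a stand-in for the real status check (the full module in the next section does the actual work):

import time

def wait_for_completion(get_status, delay = 1):
    # get_status is a hypothetical callable that returns the export task's status code
    status = get_status()
    while status == "RUNNING":
        time.sleep(delay)      # wait before asking again
        delay = delay * 2      # double the delay each pass so we don't hammer the API
        status = get_status()
    if status != "COMPLETED":
        # anything other than RUNNING/COMPLETED is treated as a failure
        raise Exception("Export task ended with status: " + status)
    return status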
The Code
I decided to put this together as a module in case I needed it for anything in the future.
import boto3
import os
import datetime
import logging
import time
"""
This portion will receive the n_days value (the date/day of the log you want
to export) and calculate the start and end date of the logs you want to
export to S3. Today = 0; yesterday = 1; so on and so forth...
Ex: If today is April 13th and NDAYS = 0, April 13th logs will be exported.
Ex: If today is April 13th and NDAYS = 1, April 12th logs will be exported.
Ex: If today is April 13th and NDAYS = 2, April 11th logs will be exported.
"""
def generate_date_dict(n_days = 1):
    currentTime = datetime.datetime.now()
    date_dict = {
        "start_date" : currentTime - datetime.timedelta(days=n_days),
        "end_date" : currentTime - datetime.timedelta(days=n_days - 1),
    }
    return date_dict
"""
Convert the from & to Dates to milliseconds
"""
def convert_from_date(date_time = datetime.datetime.now()):
    return int(date_time.timestamp() * 1000)
"""
The following will create the subfolders' structure based on year, month, day
Ex: BucketNAME/LogGroupName/Year/Month/Day
"""
def generate_prefix(s3_bucket_prefix, start_date):
    return os.path.join(s3_bucket_prefix, start_date.strftime('%Y{0}%m{0}%d').format(os.path.sep))
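# For example, on Linux (where os.path.sep is "/"):
#   generate_prefix("server01", datetime.datetime(2022, 9, 25)) returns "server01/2022/09/25",
# which is the same shape as the s3_export_path shown in the sample run further down.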
"""
Based on the AWS boto3 documentation
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.create_export_task
"""
def create_export_task(group_name, from_date, to_date, s3_bucket_name, s3_bucket_prefix):
    logging.info("Creating CloudWatch Log Export Task to S3")
    try:
        client = boto3.client('logs')
        response = client.create_export_task(
            logGroupName=group_name,
            fromTime=from_date,
            to=to_date,
            destination=s3_bucket_name,
            destinationPrefix=s3_bucket_prefix
        )
        if 'taskId' not in response:
            logging.error("Unexpected createExportTask response")
            raise Exception("Unexpected createExportTask response")
        return response['taskId']
    except Exception as e:
        logging.error(e)
        raise Exception("Failed to Create Export Task")
"""
Based on the AWS boto3 documentation
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_export_tasks
"""
def export_status_call(task_id):
    try:
        client = boto3.client('logs')
        response = client.describe_export_tasks(
            taskId = task_id
        )
        if 'exportTasks' not in response:
            logging.error("Unexpected describeExportTasks response")
            raise Exception("Unexpected describeExportTasks response")
        if len(response['exportTasks']) != 1:
            logging.error("Wrong number of export tasks found")
            raise Exception("Wrong number of export tasks found")
        if "status" not in response['exportTasks'][0]:
            logging.error("No status found in describeExportTasks response")
            raise Exception("No status found in describeExportTasks response")
        if "code" not in response['exportTasks'][0]['status']:
            logging.error("No status code found in describeExportTasks response")
            raise Exception("No status code found in describeExportTasks response")
        if response['exportTasks'][0]['status']['code'] == 'RUNNING':
            return "RUNNING"
        if response['exportTasks'][0]['status']['code'] == 'COMPLETED':
            return "COMPLETED"
        # Otherwise we should be in a failed state
        logging.error("Task is in an unexpected state")
        raise Exception("Task is in an unexpected state")
    except Exception as e:
        logging.error(e)
        raise Exception("Status Call Failed")
def check_export_status(task_id, delay = 1):
    logging.info("Checking Status of Export Task")
    try:
        response = export_status_call(task_id = task_id)
        # We want to be sure we're not beating up the API, so we'll increase our delay until the task completes
        while(response == 'RUNNING'):
            logging.warning("We did not get a COMPLETED response so waiting " + str(delay) + " seconds until checking status again")
            time.sleep(delay)
            response = export_status_call(task_id = task_id)
            delay = delay * 2
        logging.info("Log Export Task has completed")
        return True
    except Exception as e:
        logging.error(e)
        raise Exception("Describe Check Failed")
"""
The main function in here
"""
def run(group_name, s3_bucket_name, s3_bucket_prefix, n_days):
    try:
        """
        Based upon what we've been provided for n_days, we'll generate the dates needed to run
        """
        date_dict = generate_date_dict(n_days = n_days)
        date_dict['start_date_ms'] = convert_from_date(date_time = date_dict['start_date'])
        date_dict['end_date_ms'] = convert_from_date(date_time = date_dict['end_date'])
        s3_bucket_prefix_with_date = generate_prefix(s3_bucket_prefix = s3_bucket_prefix, start_date = date_dict['start_date'])
        export_task_id = create_export_task(group_name = group_name, from_date = date_dict['start_date_ms'], to_date = date_dict['end_date_ms'], s3_bucket_name = s3_bucket_name, s3_bucket_prefix = s3_bucket_prefix_with_date)
        logging.debug("Export Task ID : " + export_task_id)
        check_export_status(task_id = export_task_id)
        return {
            "task_id" : export_task_id,
            "s3_export_path" : s3_bucket_prefix_with_date
        }
    except Exception as e:
        logging.error(e)
        raise e
Running the Code
With this module in place as `cloudwatch_to_s3.py` in a `modules` directory on my machine, I created a quick `main.py` that I can use to demonstrate the execution of the code.
from modules import cloudwatch_to_s3
import logging
import os
"""
This portion will obtain the Environment variables
"""
GROUP_NAME = os.environ['GROUP_NAME']
DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET']
PREFIX = os.environ['PREFIX']
NDAYS = os.environ['NDAYS']
n_days = int(NDAYS)
if __name__ == '__main__':
    try:
        response = cloudwatch_to_s3.run(group_name = GROUP_NAME, s3_bucket_name = DESTINATION_BUCKET, s3_bucket_prefix = PREFIX, n_days = n_days)
        print(response)
    except Exception as e:
        logging.error(e)
        print(e)
For the above `main.py` script, you'll want to set the following environment variables before running it: `GROUP_NAME` (the CloudWatch log group to export), `DESTINATION_BUCKET` (the destination S3 bucket), `PREFIX` (the folder prefix within the bucket), and `NDAYS` (how many days back to export, as described earlier). With those set, run the script and watch the status checks back off until the export completes:
# python main.py
WARNING:root:We did not get a COMPLETED response so waiting 1 seconds until checking status again
WARNING:root:We did not get a COMPLETED response so waiting 2 seconds until checking status again
WARNING:root:We did not get a COMPLETED response so waiting 4 seconds until checking status again
{'task_id': '75050e9d-99dc-487d-9233-93216dd993ae', 's3_export_path': 'server01/2022/09/25'}
You can now go to `s3://<BUCKET_NAME>/<PREFIX>/<YYYY>/<MM>/<DD>/<TASK_ID>/` (in my case this would be `s3://log_export_testing/server01/2022/09/25/75050e9d-99dc-487d-9233-93216dd993ae/`) and see the exported contents of the log group. If the log group has multiple streams, each stream will have its own directory here, with its logs gzipped inside.
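The last piece for me was actually running this daily. There are plenty of ways to schedule it; as one possible sketch (not the only option, and not something the steps above depend on), the same `run()` call can be wrapped in an AWS Lambda handler and triggered by a daily EventBridge schedule, reusing the same environment variables:

import logging
import os

from modules import cloudwatch_to_s3

def lambda_handler(event, context):
    # Pull the same settings main.py uses from the function's environment variables
    group_name = os.environ['GROUP_NAME']
    destination_bucket = os.environ['DESTINATION_BUCKET']
    prefix = os.environ['PREFIX']
    n_days = int(os.environ['NDAYS'])
    try:
        return cloudwatch_to_s3.run(
            group_name = group_name,
            s3_bucket_name = destination_bucket,
            s3_bucket_prefix = prefix,
            n_days = n_days
        )
    except Exception as e:
        logging.error(e)
        raise e

If you go that route, keep the Lambda timeout in mind, since the handler waits for the export task to complete before returning.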