Exporting CloudWatch Logs to S3

I had to figure out how to get logs from CloudWatch into S3. A one-off export is actually pretty easy because AWS provides a very nice tutorial, Exporting log data to Amazon S3, that explains how to do this via either the Console or the CLI. My problem was that I needed to do this daily, so automating the task was my next struggle. The AWS tutorial provides details on setting up S3 and IAM for this solution, so I won’t cover that here. I also found a great article by Omar Dulaimi that was the basis for my code (why completely reinvent the wheel?). With both of these laying the groundwork, I got right to putting this together.

High Level Details

The code is set up to run daily, so I have it use the following flow:

  1. Get the current date and time to be used as the end time for our export task
  2. Get “n” number of days ago date and time as the start time for our export task
  3. Create the CloudWatch export to S3 task
  4. Check the status of the export
    1. If the task is still RUNNING, then sleep 1 second and check the status again
    2. If the task is still RUNNING, then sleep 2x the previous delay and check the status again
    3. Continue doubling the delay and re-checking until we get a COMPLETED status from the task
    4. If we ever get a status other than COMPLETED or RUNNING (See the other possible status codes here), then raise an exception
  5. Once COMPLETED, return the export task id and the path in our S3 bucket.
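
The polling loop in step 4 can be sketched on its own, separate from any AWS calls. This is only a minimal sketch and not the module's code: wait_until_done, max_delay and timeout are hypothetical names, and the delay cap and overall timeout are additions that the flow above does not have. The status lookup is passed in as a function so the loop can be tried without touching CloudWatch.

```python
import time


def wait_until_done(get_status, initial_delay=1, max_delay=60, timeout=900,
                    sleep=time.sleep):
    """Poll get_status() until it returns 'COMPLETED', doubling the delay each time.

    Hypothetical helper: get_status is any zero-argument callable that returns
    a status string. max_delay caps the backoff and timeout bounds the total
    time spent sleeping (both in seconds).
    """
    delay = initial_delay
    waited = 0
    status = get_status()
    while status == "RUNNING":
        if waited + delay > timeout:
            raise TimeoutError(f"Task still RUNNING after {waited} seconds")
        sleep(delay)
        waited += delay
        # Double the delay for the next round, but never beyond max_delay
        delay = min(delay * 2, max_delay)
        status = get_status()
    if status != "COMPLETED":
        raise RuntimeError(f"Task ended in unexpected state: {status}")
    return waited
```

Capping the delay keeps a long export from ending up with multi-minute gaps between checks, and the timeout stops the script from waiting forever if a task never leaves RUNNING.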

The Code

I decided to put this together as a module in case I needed it for anything in the future.

import boto3
import os
import datetime
import logging
import time

"""
This portion will receive the n_days value (how many days back the export
window should start) and calculate the start and end date of logs you want to
export to S3. The window is 24 hours long and is measured back from the
current time, not from midnight.
Ex: If it is April 13th 09:00 and NDAYS = 1, logs from April 12th 09:00 to April 13th 09:00 will be exported.
Ex: If it is April 13th 09:00 and NDAYS = 2, logs from April 11th 09:00 to April 12th 09:00 will be exported.
Note: NDAYS = 0 gives a window that starts now, so it will not contain any logs yet.
"""
def generate_date_dict(n_days = 1):
    currentTime = datetime.datetime.now()
    date_dict = {
        "start_date" : currentTime - datetime.timedelta(days=n_days),
        "end_date" : currentTime - datetime.timedelta(days=n_days - 1),
    }
    return date_dict

"""
Convert the from & to Dates to milliseconds
"""
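def convert_from_date(date_time = None):
    # A default of datetime.datetime.now() would be evaluated only once, at import time
    if date_time is None:
        date_time = datetime.datetime.now()
    return int(date_time.timestamp() * 1000)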


"""
The following will create the subfolders' structure based on year, month, day
Ex: BucketNAME/LogGroupName/Year/Month/Day
"""
def generate_prefix(s3_bucket_prefix, start_date):
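    # S3 keys always use "/" as the separator, regardless of the local OS path separator
    date_path = start_date.strftime('%Y/%m/%d')
    return (s3_bucket_prefix.rstrip('/') + '/' + date_path) if s3_bucket_prefix else date_path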


"""
Based on the AWS boto3 documentation
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.create_export_task
"""
def create_export_task(group_name, from_date, to_date, s3_bucket_name, s3_bucket_prefix):
    logging.info("Creating CloudWatch Log Export Task to S3")
    try:
        client = boto3.client('logs')
        response = client.create_export_task(
             logGroupName=group_name,
             fromTime=from_date,
             to=to_date,
             destination=s3_bucket_name,
             destinationPrefix=s3_bucket_prefix
            )
        if 'taskId' not in response:
            logging.error("Unexpected createExportTask response")
            raise Exception("Unexpected createExportTask response")
        return response['taskId']
    except Exception as e:
        logging.error(e)
        raise Exception("Failed to Create Export Task")

"""
Based on the AWS boto3 documentation
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_export_tasks
"""
def export_status_call(task_id):
    try:
        client = boto3.client('logs')
        response = client.describe_export_tasks(
             taskId = task_id
            )

        if 'exportTasks' not in response:
            logging.error("Unexpected describeExportTasks response")
            raise Exception("Unexpected describeExportTasks response")

        if len(response['exportTasks']) != 1:
            logging.error("Wrong number of export tasks found")
            raise Exception("Wrong number of export tasks found")

        if "status" not in response['exportTasks'][0]:
            logging.error("No status found in describeExportTasks response")
            raise Exception("No status found in describeExportTasks response")

        if "code" not in response['exportTasks'][0]['status']:
            logging.error("No status code found in describeExportTasks response")
            raise Exception("No status code found in describeExportTasks response")

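        # PENDING means the task is queued but not started, so keep polling as we do for RUNNING
        if response['exportTasks'][0]['status']['code'] in ('PENDING', 'RUNNING'):
            return "RUNNING"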

        if response['exportTasks'][0]['status']['code'] == 'COMPLETED':
            return "COMPLETED"

        # Otherwise we should be in a failed state
        logging.error("Task is in an unexpected state")
        raise Exception("Task is in an unexpected state")

    except Exception as e:
        logging.error(e)
        raise Exception("Status Call Failed")

def check_export_status(task_id, delay = 1):
    logging.info("Checking Status of Export Task")
    try:
        response = export_status_call(task_id = task_id)

        # We want to be sure we're not beating up the API and we'll increase our delay until the task completes
        while(response == 'RUNNING'):
            logging.warning("We did not get a COMPLETED response so waiting " + str(delay) + " seconds until checking status again")
            time.sleep(delay)
            response = export_status_call(task_id = task_id)
            delay = delay * 2

        logging.info("Log Export Task has completed")
        return True

    except Exception as e:
        logging.error(e)
        raise Exception("Describe Check Failed")

"""
The main function in here
"""
def run(group_name, s3_bucket_name, s3_bucket_prefix, n_days):
    try:
        """
        Based upon what we've been provided for n_days, we'll generate the dates needed to run
        """
        date_dict = generate_date_dict(n_days = n_days)
        date_dict['start_date_ms'] = convert_from_date(date_time = date_dict['start_date'])
        date_dict['end_date_ms'] = convert_from_date(date_time = date_dict['end_date'])
        s3_bucket_prefix_with_date = generate_prefix(s3_bucket_prefix = s3_bucket_prefix, start_date = date_dict['start_date'])
        export_task_id = create_export_task(group_name = group_name, from_date = date_dict['start_date_ms'], to_date = date_dict['end_date_ms'], s3_bucket_name = s3_bucket_name, s3_bucket_prefix = s3_bucket_prefix_with_date)
        logging.debug("Export Task ID : " + export_task_id)
        check_export_status(task_id = export_task_id)
        return {
          "task_id" : export_task_id,
          "s3_export_path" : s3_bucket_prefix_with_date
        }
    except Exception as e:
        logging.error(e)
        raise e
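
As written, generate_date_dict measures its window back from the time of day the script runs. If you would rather export whole calendar days, one option is to snap the window to midnight. The following is a minimal sketch, assuming UTC day boundaries are acceptable for your logs; calendar_day_window_ms is a hypothetical helper and is not part of the module above.

```python
import datetime


def calendar_day_window_ms(n_days=1, now=None):
    """Return (from_ms, to_ms) covering the whole UTC calendar day n_days ago.

    Hypothetical helper. now can be injected for testing; it defaults to the
    current time in UTC.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    # Midnight (UTC) at the start of the target day
    day_start = (now - datetime.timedelta(days=n_days)).replace(
        hour=0, minute=0, second=0, microsecond=0)
    day_end = day_start + datetime.timedelta(days=1)
    # The export API takes its time range as milliseconds since the Unix epoch
    return int(day_start.timestamp() * 1000), int(day_end.timestamp() * 1000)
```

The two values could be passed as from_date and to_date to create_export_task in place of the ones built from generate_date_dict.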

Running the Code

With this module in place in a modules directory on my machine, I created a quick main.py that I can use to demonstrate the execution of the code.

from modules import cloudwatch_to_s3
import logging
import os

"""
This portion will obtain the Environment variables
"""
GROUP_NAME = os.environ['GROUP_NAME']
DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET']
PREFIX = os.environ['PREFIX']
NDAYS = os.environ['NDAYS']
n_days = int(NDAYS)

if __name__ == '__main__':
    try:
        response = cloudwatch_to_s3.run(group_name = GROUP_NAME, s3_bucket_name = DESTINATION_BUCKET, s3_bucket_prefix = PREFIX, n_days = n_days)
        print(response)
    except Exception as e:
        logging.error(e)
        print(e)
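
The module-level lookups in main.py above fail with a bare KeyError if a variable is missing, and int() fails if NDAYS is not a number. One way to get clearer errors is to load everything in a single function. This is a sketch only: load_settings is a hypothetical helper, and defaulting NDAYS to 1 is my assumption rather than something the script above does.

```python
import os


def load_settings(env=None):
    """Read the export settings from environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    required = ("GROUP_NAME", "DESTINATION_BUCKET", "PREFIX")
    missing = [name for name in required if name not in env]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))
    try:
        # Assumption: default to 1 when NDAYS is unset
        n_days = int(env.get("NDAYS", "1"))
    except ValueError:
        raise ValueError("NDAYS must be an integer") from None
    if n_days < 0:
        raise ValueError("NDAYS must not be negative")
    # Keys match the parameter names of run() in the module above
    return {
        "group_name": env["GROUP_NAME"],
        "s3_bucket_name": env["DESTINATION_BUCKET"],
        "s3_bucket_prefix": env["PREFIX"],
        "n_days": n_days,
    }
```

Because the dictionary keys match the parameters of run, the call in main.py could become cloudwatch_to_s3.run(**load_settings()).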

In the above main.py script, you’ll want to set the following environment variables:

GROUP_NAME: The name of the CloudWatch Log Group you are going to export from.
DESTINATION_BUCKET: The name of the S3 bucket that you plan to export your logs into.
PREFIX: A directory name within the S3 bucket that you’d like to export into. (This could be useful if you intend to export multiple systems into the S3 bucket. Just remember that AWS limits you to only one export task in a RUNNING state at a time.)
NDAYS: Set this to how many days in the past you’d like to export for.
AWS_ACCESS_KEY_ID: AWS access key associated with the IAM user created in the Exporting log data to Amazon S3 tutorial referenced above. (Not needed if a User or Role is already assigned to the environment where your script is executing, such as Lambda or EC2.)
AWS_SECRET_ACCESS_KEY: AWS secret key associated with the access key for the IAM user created in the Exporting log data to Amazon S3 tutorial referenced above. (Not needed if a User or Role is already assigned to the environment where your script is executing, such as Lambda or EC2.)
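
Because of the one-export-at-a-time limit mentioned under PREFIX, it can help to check for an in-flight task before starting another. The following is a hedged sketch: has_active_export is a hypothetical helper, it relies on the statusCode filter of describe_export_tasks, and it assumes that PENDING tasks count against the limit as well as RUNNING ones. The client is passed in so the function can be exercised with a stand-in object.

```python
def has_active_export(logs_client):
    """Return True if any export task is PENDING or RUNNING (hypothetical helper).

    logs_client is expected to behave like boto3.client('logs').
    """
    for status in ("PENDING", "RUNNING"):
        # Ask only for tasks in this state; one result is enough to decide
        response = logs_client.describe_export_tasks(statusCode=status, limit=1)
        if response.get("exportTasks"):
            return True
    return False
```

A scheduled job could call has_active_export(boto3.client('logs')) first and wait or skip the run when it returns True.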

Running the main.py results in the following output

# python main.py
WARNING:root:We did not get a COMPLETED response so waiting 1 seconds until checking status again
WARNING:root:We did not get a COMPLETED response so waiting 2 seconds until checking status again
WARNING:root:We did not get a COMPLETED response so waiting 4 seconds until checking status again
{'task_id': '75050e9d-99dc-487d-9233-93216dd993ae', 's3_export_path': 'server01/2022/09/25'}

You can now go to s3://<BUCKET_NAME>/<PREFIX>/<YYYY>/<MM>/<DD>/<TASK_ID>/ (In my case this would be s3://log_export_testing/server01/2022/09/25/75050e9d-99dc-487d-9233-93216dd993ae/) and see the exported contents of the log group. If the log group had multiple streams, each stream will have its own directory here with the logs gzipped under the directory.
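
To confirm the export from code rather than the console, the objects under the returned path can be listed. This is a minimal sketch, assuming the S3 client is created elsewhere with boto3.client('s3'); list_export_keys is a hypothetical helper and not part of the module above.

```python
def list_export_keys(s3_client, bucket_name, export_path, task_id):
    """Return the object keys found under <export_path>/<task_id>/ in the bucket.

    Hypothetical helper. s3_client is expected to behave like boto3.client('s3').
    """
    prefix = f"{export_path.rstrip('/')}/{task_id}/"
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # "Contents" is absent from a page when nothing matches the prefix
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

The export_path and task_id arguments correspond to the s3_export_path and task_id values in the dictionary that run returns.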