"""Report EC2Launch v2 version compliance for SSM-managed Windows instances.

Runs the ``Abrigo-EC2LaunchV2Info`` SSM document on every Windows instance
returned by Systems Manager, checks whether the expected version string from
the ``ec2_launch_v2`` environment variable appears in the command output, and
publishes the per-instance results to the SNS topic named by the
``ec2_launch_sns_topic`` environment variable.
"""
import boto3
import json
import time
import os

# Expected EC2Launch v2 version text; an instance is reported compliant when
# this text appears in the SSM command's standard output.
latest_version = os.environ["ec2_launch_v2"]
# ARN of the SNS topic that receives the report.
ec2_launch_sns_topic = os.environ["ec2_launch_sns_topic"]

# Invocation statuses that mean the command has not finished yet.
_IN_FLIGHT_STATUSES = frozenset({"Pending", "InProgress", "Delayed"})


def get_ec2_instances(instance_id, command_id):
    """Check every SSM-managed Windows instance and publish the results.

    :param instance_id: Unused; kept so existing callers keep working.
    :param command_id: Unused; kept so existing callers keep working.
    :return: Tuple ``(results, message_id)``. ``results`` maps each Windows
        instance ID to the text returned by ``get_command_invocation``
        (``None`` when the result could not be retrieved); ``message_id`` is
        the ID of the published SNS message.
    """
    print("entered into get ec2 instances")
    ssm_client = boto3.client('ssm')

    # describe_instance_information is paginated: a single call returns only
    # the first page, so walk every page to avoid silently skipping instances.
    instances = []
    paginator = ssm_client.get_paginator('describe_instance_information')
    for page in paginator.paginate():
        instances.extend(page.get('InstanceInformationList', []))
    print("list of instances", instances)

    # Entries without a PlatformType are ignored, as are non-Windows ones.
    windows_instance_ids = [
        item['InstanceId']
        for item in instances
        if item.get('PlatformType') == 'Windows'
    ]

    windows_instance_dict = {}
    if not windows_instance_ids:
        print("No Windows instances found.")
    else:
        print("windows instance id's", windows_instance_ids)
        document_name = 'Abrigo-EC2LaunchV2Info'

        # Send the command to every instance first so the invocations run
        # concurrently, then collect the results one by one.
        sent_commands = {
            windows_id: run_ssm_command(windows_id, document_name)
            for windows_id in windows_instance_ids
        }
        for windows_id, sent_command_id in sent_commands.items():
            final_res = get_command_invocation(windows_id, sent_command_id)
            windows_instance_dict[windows_id] = final_res
            print("final_res EC2LaunchV2Info installation", final_res)

    print("windows instance id's and command id's", windows_instance_dict)

    # AWS SNS configurations
    sns_topic_arn = ec2_launch_sns_topic
    email_subject = 'EC2LaunchV2Info installation details'
    email_body = "Please find the below details for EC2LaunchV2Info installation on EC2 instances.\n\n"
    if windows_instance_dict:
        email_body += f"Windows Instances List :\n{json.dumps(windows_instance_dict, indent=2)}\n\n"
    else:
        email_body += "No Instances found with SSM enabled"

    # Publish the email message to the SNS topic
    sns_client = boto3.client('sns', region_name='us-east-1')
    response = sns_client.publish(
        TopicArn=sns_topic_arn,
        Subject=email_subject,
        Message=email_body
    )
    print("Message published successfully. MessageId:", response['MessageId'])

    return windows_instance_dict, response['MessageId']


def run_ssm_command(instance_id, document_name, parameters=None):
    """
    Run an SSM command on a specific EC2 instance.

    :param instance_id: The ID of the target EC2 instance.
    :param document_name: The name of the SSM document to execute.
    :param parameters: Optional parameters for the SSM command.
    :return: The command execution ID.
    """
    print("entered into function run ssm command")
    ssm_client = boto3.client('ssm')

    # Build the command parameters
    command_params = {
        'DocumentName': document_name,
        'Targets': [
            {
                'Key': 'instanceids',
                'Values': [instance_id]
            }
        ]
    }
    if parameters:
        print("inside parameters")
        command_params['Parameters'] = parameters

    # Execute the SSM command
    print("entering into ssm send command")
    response = ssm_client.send_command(**command_params)
    print("printing ssm-run-response", response)

    # Get the command execution ID
    command_id = response['Command']['CommandId']
    print(f"SSM Command executed successfully. Command ID: {command_id}")
    return command_id


def get_command_invocation(instance_id, command_id, timeout=120, poll_interval=2):
    """Wait for an SSM command to finish on an instance and assess its output.

    send_command returns before the command has run, so the invocation is
    polled until it leaves the pending/in-progress states instead of being
    read once (which can raise InvocationDoesNotExist or return empty output).

    :param instance_id: The ID of the instance the command was sent to.
    :param command_id: The command ID returned by ``run_ssm_command``.
    :param timeout: Maximum number of seconds to wait for the command to
        reach a finished status.
    :param poll_interval: Seconds to sleep between polls.
    :return: A compliance message string, or ``None`` when the invocation
        could not be retrieved or did not finish within ``timeout``.
    """
    print("entered into get command invocation function")
    print("command id", command_id)
    ssm_client = boto3.client('ssm')
    deadline = time.monotonic() + timeout

    # Best-effort by design: any AWS error is logged and reported as None so
    # one failing instance does not abort the whole report.
    try:
        while True:
            try:
                response = ssm_client.get_command_invocation(
                    InstanceId=instance_id,
                    CommandId=command_id
                )
            except ssm_client.exceptions.InvocationDoesNotExist:
                # The invocation may not be registered yet right after
                # send_command; treat it as still pending.
                response = None

            if response is not None and response.get('Status') not in _IN_FLIGHT_STATUSES:
                break
            if time.monotonic() >= deadline:
                print(f"Error retrieving command invocation details: command {command_id} did not finish on {instance_id} within {timeout} seconds")
                return None
            time.sleep(poll_interval)
    except Exception as e:
        print(f"Error retrieving command invocation details: {e}")
        return None

    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!response!!!!!!!!!!!!!!!!!!!!!!!!", response)
    standard_output_content = response.get('StandardOutputContent', '')

    # Check conditions based on StandardOutputContent
    if latest_version in standard_output_content:
        compliance_type = 'COMPLIANT'
        print(f"EC2Launch V2 is installed and with version {latest_version} and compliance status: {compliance_type}")
        return f"EC2LaunchV2 is installed with version {latest_version} and compliance status: {compliance_type}"

    compliance_type = 'NON_COMPLIANT'
    print(f"$$$EC2Launch V2 is not installed. compliance status is: {compliance_type} ; current verison : {standard_output_content} Expected version: {latest_version} $$$")
    return f"EC2Launch V2 is not installed. compliance status is: {compliance_type} {standard_output_content} Expected version: {latest_version} "


if __name__ == "__main__":
    get_ec2_instances(" ", " ")
Standard input is empty
"""Report EC2Launch v2 version compliance for SSM-managed Windows instances.

Runs the ``Abrigo-EC2LaunchV2Info`` SSM document on every Windows instance
returned by Systems Manager, checks whether the expected version string from
the ``ec2_launch_v2`` environment variable appears in the command output, and
publishes the per-instance results to the SNS topic named by the
``ec2_launch_sns_topic`` environment variable.
"""
import boto3
import json
import time
import os

# Expected EC2Launch v2 version text; an instance is reported compliant when
# this text appears in the SSM command's standard output.
latest_version = os.environ["ec2_launch_v2"]
# ARN of the SNS topic that receives the report.
ec2_launch_sns_topic = os.environ["ec2_launch_sns_topic"]

# Invocation statuses that mean the command has not finished yet.
_IN_FLIGHT_STATUSES = frozenset({"Pending", "InProgress", "Delayed"})


def get_ec2_instances(instance_id, command_id):
    """Check every SSM-managed Windows instance and publish the results.

    :param instance_id: Unused; kept so existing callers keep working.
    :param command_id: Unused; kept so existing callers keep working.
    :return: Tuple ``(results, message_id)``. ``results`` maps each Windows
        instance ID to the text returned by ``get_command_invocation``
        (``None`` when the result could not be retrieved); ``message_id`` is
        the ID of the published SNS message.
    """
    print("entered into get ec2 instances")
    ssm_client = boto3.client('ssm')

    # describe_instance_information is paginated: a single call returns only
    # the first page, so walk every page to avoid silently skipping instances.
    instances = []
    paginator = ssm_client.get_paginator('describe_instance_information')
    for page in paginator.paginate():
        instances.extend(page.get('InstanceInformationList', []))
    print("list of instances", instances)

    # Entries without a PlatformType are ignored, as are non-Windows ones.
    windows_instance_ids = [
        item['InstanceId']
        for item in instances
        if item.get('PlatformType') == 'Windows'
    ]

    windows_instance_dict = {}
    if not windows_instance_ids:
        print("No Windows instances found.")
    else:
        print("windows instance id's", windows_instance_ids)
        document_name = 'Abrigo-EC2LaunchV2Info'

        # Send the command to every instance first so the invocations run
        # concurrently, then collect the results one by one. Completion is
        # detected by polling rather than a fixed sleep per instance.
        sent_commands = {
            windows_id: run_ssm_command(windows_id, document_name)
            for windows_id in windows_instance_ids
        }
        for windows_id, sent_command_id in sent_commands.items():
            final_res = get_command_invocation(windows_id, sent_command_id)
            windows_instance_dict[windows_id] = final_res
            print("final_res EC2LaunchV2Info installation", final_res)

    print("windows instance id's and command id's", windows_instance_dict)

    # AWS SNS configurations
    sns_topic_arn = ec2_launch_sns_topic
    email_subject = 'EC2LaunchV2Info installation details'
    email_body = "Please find the below details for EC2LaunchV2Info installation on EC2 instances.\n\n"
    if windows_instance_dict:
        email_body += f"Windows Instances List :\n{json.dumps(windows_instance_dict, indent=2)}\n\n"
    else:
        email_body += "No Instances found with SSM enabled"

    # Publish the email message to the SNS topic
    sns_client = boto3.client('sns', region_name='us-east-1')
    response = sns_client.publish(
        TopicArn=sns_topic_arn,
        Subject=email_subject,
        Message=email_body
    )
    print("Message published successfully. MessageId:", response['MessageId'])

    return windows_instance_dict, response['MessageId']


def run_ssm_command(instance_id, document_name, parameters=None):
    """
    Run an SSM command on a specific EC2 instance.

    :param instance_id: The ID of the target EC2 instance.
    :param document_name: The name of the SSM document to execute.
    :param parameters: Optional parameters for the SSM command.
    :return: The command execution ID.
    """
    print("entered into function run ssm command")
    ssm_client = boto3.client('ssm')

    # Build the command parameters
    command_params = {
        'DocumentName': document_name,
        'Targets': [
            {
                'Key': 'instanceids',
                'Values': [instance_id]
            }
        ]
    }
    if parameters:
        print("inside parameters")
        command_params['Parameters'] = parameters

    # Execute the SSM command
    print("entering into ssm send command")
    response = ssm_client.send_command(**command_params)
    print("printing ssm-run-response", response)

    # Get the command execution ID
    command_id = response['Command']['CommandId']
    print(f"SSM Command executed successfully. Command ID: {command_id}")
    return command_id


def get_command_invocation(instance_id, command_id, timeout=120, poll_interval=2):
    """Wait for an SSM command to finish on an instance and assess its output.

    send_command returns before the command has run, so the invocation is
    polled until it leaves the pending/in-progress states. A fixed sleep is
    either too short for slow commands or wasted time for fast ones.

    :param instance_id: The ID of the instance the command was sent to.
    :param command_id: The command ID returned by ``run_ssm_command``.
    :param timeout: Maximum number of seconds to wait for the command to
        reach a finished status.
    :param poll_interval: Seconds to sleep between polls.
    :return: A compliance message string, or ``None`` when the invocation
        could not be retrieved or did not finish within ``timeout``.
    """
    print("entered into get command invocation function")
    print("command id", command_id)
    ssm_client = boto3.client('ssm')
    deadline = time.monotonic() + timeout

    # Best-effort by design: any AWS error is logged and reported as None so
    # one failing instance does not abort the whole report.
    try:
        while True:
            try:
                response = ssm_client.get_command_invocation(
                    InstanceId=instance_id,
                    CommandId=command_id
                )
            except ssm_client.exceptions.InvocationDoesNotExist:
                # The invocation may not be registered yet right after
                # send_command; treat it as still pending.
                response = None

            if response is not None and response.get('Status') not in _IN_FLIGHT_STATUSES:
                break
            if time.monotonic() >= deadline:
                print(f"Error retrieving command invocation details: command {command_id} did not finish on {instance_id} within {timeout} seconds")
                return None
            time.sleep(poll_interval)
    except Exception as e:
        print(f"Error retrieving command invocation details: {e}")
        return None

    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!response!!!!!!!!!!!!!!!!!!!!!!!!", response)
    standard_output_content = response.get('StandardOutputContent', '')

    # Check conditions based on StandardOutputContent
    if latest_version in standard_output_content:
        compliance_type = 'COMPLIANT'
        print(f"EC2Launch V2 is installed and with version {latest_version} and compliance status: {compliance_type}")
        return f"EC2LaunchV2 is installed with version {latest_version} and compliance status: {compliance_type}"

    compliance_type = 'NON_COMPLIANT'
    print(f"$$$EC2Launch V2 is not installed. compliance status is: {compliance_type} ; current verison : {standard_output_content} Expected version: {latest_version} $$$")
    return f"EC2Launch V2 is not installed. compliance status is: {compliance_type} {standard_output_content} Expected version: {latest_version} "


if __name__ == "__main__":
    get_ec2_instances(" ", " ")