fork download
  1. import boto3
  2. import json
  3. import time
  4. import os
  5.  
  6. latest_version = os.environ["ec2_launch_v2"]
  7. ec2_launch_sns_topic = os.environ["ec2_launch_sns_topic"]
  8.  
  9. def get_ec2_instances(instance_id, command_id):
  10. command_list = []
  11. final_res_list = []
  12. result_list = []
  13. print("entered into get ec2 instances")
  14. ssm_client = boto3.client('ssm')
  15. response = ssm_client.describe_instance_information()
  16. print("complete ec2 instance response", response)
  17. instances = response.get('InstanceInformationList', [])
  18. platformtype = [item.get('PlatformType') for item in instances]
  19. print("platformtype", platformtype)
  20. instance_ids = [item.get('InstanceId') for item in instances]
  21. print("Instance id's list", instance_ids)
  22. print("list of instances", instances)
  23. for item in instances:
  24. if 'PlatformType' in item:
  25. print("Platform Type:", item['PlatformType'])
  26. print("Instance id's list", instance_ids)
  27. result_item = {'InstanceId': item['InstanceId'], 'PlatformType': item['PlatformType']}
  28. result_list.append(result_item)
  29. print("ec2 instance id and platform type", result_list)
  30. else:
  31. print("No Platform Type found for this item")
  32. print("ec2 instance id and platform type", result_list)
  33.  
  34. windows_instances = [instance for instance in result_list if instance['PlatformType'] == 'Windows']
  35.  
  36. print("windows instances", windows_instances)
  37. windows_instance_dict = {}
  38. if not windows_instances:
  39. print("No Windows instances found.")
  40. else:
  41. print("Windows Instances:")
  42. print(windows_instances)
  43. windows_instance_ids = [instance['InstanceId'] for instance in windows_instances]
  44. print("windows instance id's", windows_instance_ids)
  45.  
  46. for instance_id in windows_instance_ids:
  47. print("windows instances in loop", instance_id)
  48. document_name = 'Abrigo-EC2LaunchV2Info'
  49. response = run_ssm_command(instance_id, document_name)
  50. command_list.append(response)
  51. time.sleep(10)
  52. final_res = get_command_invocation(instance_id, response)
  53. # final_res_list.append(final_res)
  54. windows_instance_dict[instance_id] = final_res
  55. print("final_res EC2LaunchV2Info installation", final_res)
  56. print("&&&&&&&", command_list)
  57. print("windows instance id's and command id's", windows_instance_dict)
  58.  
  59. # AWS SNS configurations
  60. sns_topic_arn = os.environ["ec2_launch_sns_topic"]
  61. email_subject = 'EC2LaunchV2Info installation details'
  62. email_body = f"Please find the below details for EC2LaunchV2Info installation on EC2 instances.\n\n"
  63. if len(windows_instance_dict) > 0:
  64. email_body += f"Windows Instances List :\n{json.dumps(windows_instance_dict, indent=2)}\n\n"
  65.  
  66. if len(windows_instance_dict) == 0:
  67. email_body += f"No Instances found with SSM enabled"
  68. print("email_body type", type(email_body))
  69.  
  70. # Create an SNS client
  71. sns_client = boto3.client('sns', region_name='us-east-1')
  72.  
  73. # Publish the email message to the SNS topic
  74. response = sns_client.publish(
  75. TopicArn=sns_topic_arn,
  76. Subject=email_subject,
  77. Message=email_body
  78. )
  79.  
  80. print("Message published successfully. MessageId:", response['MessageId'])
  81. return windows_instance_dict, response['MessageId']
  82.  
  83.  
  84. def run_ssm_command(instance_id, document_name, parameters=None):
  85. print("entered into function run ssm command")
  86. """
  87. Run an SSM command on a specific EC2 instance.
  88.  
  89. :param instance_id: The ID of the target EC2 instance.
  90. :param document_name: The name of the SSM document to execute.
  91. :param parameters: Optional parameters for the SSM command.
  92. :return: The command execution ID.
  93. """
  94. ssm_client = boto3.client('ssm')
  95. print("entered into boto3 ssm command")
  96. # Build the command parameters
  97. command_params = {
  98. 'DocumentName': document_name,
  99. 'Targets': [
  100. {
  101. 'Key': 'instanceids',
  102. 'Values': [instance_id]
  103. }
  104. ]
  105. }
  106. print("entered into command_params ssm command")
  107. if parameters:
  108. print("inside parameters")
  109. command_params['Parameters'] = parameters
  110.  
  111. # Execute the SSM command
  112. print("entering into ssm send command")
  113. response = ssm_client.send_command(**command_params)
  114. print("printing ssm-run-response", response)
  115.  
  116. # Get the command execution ID
  117. command_id = response['Command']['CommandId']
  118.  
  119. # instance_id = instance_id # Replace with your instance ID
  120. document_name = 'ssmdocument' # Replace with your SSM document name
  121. # parameters = {'param_name': ['param_value']} # Replace with your SSM command parameters if any
  122.  
  123. # command_id = run_ssm_command(instance_id, document_name)
  124.  
  125. print(f"SSM Command executed successfully. Command ID: {command_id}")
  126. return command_id
  127.  
  128.  
  129. def get_command_invocation(instance_id, command_id):
  130. print("entered into get command invocation function")
  131. print("command id", command_id)
  132. ssm_client = boto3.client('ssm')
  133.  
  134. try:
  135. response = ssm_client.get_command_invocation(
  136. InstanceId=instance_id,
  137. CommandId=command_id
  138. )
  139.  
  140. print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!response!!!!!!!!!!!!!!!!!!!!!!!!", response)
  141. final_response = response
  142. ec2_instance_id = instance_id
  143. standard_output_content = final_response.get('StandardOutputContent', '')
  144.  
  145. # Check conditions based on StandardOutputContent
  146. if latest_version in standard_output_content:
  147. compliance_type = 'COMPLIANT'
  148. print(f"EC2Launch V2 is installed and with version {latest_version} and compliance status: {compliance_type}")
  149. return f"EC2LaunchV2 is installed with version {latest_version} and compliance status: {compliance_type}"
  150. else:
  151. compliance_type = 'NON_COMPLIANT'
  152. print(f"$$$EC2Launch V2 is not installed. compliance status is: {compliance_type} ; current verison : {standard_output_content} Expected version: {latest_version} $$$")
  153. return f"EC2Launch V2 is not installed. compliance status is: {compliance_type} {standard_output_content} Expected version: {latest_version} "
  154.  
  155. except Exception as e:
  156. print(f"Error retrieving command invocation details: {e}")
  157. return None
  158.  
  159. if __name__ == "__main__":
  160. get_ec2_instances(" ", " ")
Success #stdin #stdout 0.03s 25784KB
stdin
Standard input is empty
stdout
import boto3
import json
import time
import os

# Expected EC2Launch v2 version string, taken from the "ec2_launch_v2"
# environment variable. Both lookups below use os.environ[...], so a missing
# variable raises KeyError as soon as this module is imported.
latest_version = os.environ["ec2_launch_v2"]
# Value of the "ec2_launch_sns_topic" environment variable
# (presumably an SNS topic ARN -- TODO confirm against the deployment config).
ec2_launch_sns_topic = os.environ["ec2_launch_sns_topic"]

def _list_managed_instances(ssm_client):
    """Return every InstanceInformationList entry SSM reports, across all pages."""
    paginator = ssm_client.get_paginator('describe_instance_information')
    instances = []
    for page in paginator.paginate():
        instances.extend(page.get('InstanceInformationList', []))
    return instances


def get_ec2_instances(instance_id, command_id):
    """
    Check EC2LaunchV2 on every Windows instance known to SSM and email the result.

    Steps:
      1. List all instances registered with SSM (all result pages, not just
         the first one).
      2. Send the 'Abrigo-EC2LaunchV2Info' document to each instance whose
         PlatformType is 'Windows'.
      3. Wait once for the commands to run, then collect each invocation result.
      4. Publish a summary to the SNS topic named by the
         "ec2_launch_sns_topic" environment variable.

    :param instance_id: Unused; kept so existing callers keep working.
    :param command_id: Unused; kept so existing callers keep working.
    :return: Tuple ``(results, message_id)`` where ``results`` maps each
        Windows instance ID to the value returned by get_command_invocation
        (a status string, or None on error) and ``message_id`` is the
        MessageId returned by SNS publish.
    :raises KeyError: If "ec2_launch_sns_topic" is not set in the environment.
    """
    print("entered into get ec2 instances")
    ssm_client = boto3.client('ssm')
    instances = _list_managed_instances(ssm_client)
    print("list of instances", instances)

    # Entries without a PlatformType are skipped, as are non-Windows platforms.
    windows_instance_ids = [
        item['InstanceId']
        for item in instances
        if item.get('PlatformType') == 'Windows'
    ]
    if not windows_instance_ids:
        print("No Windows instances found.")
    else:
        print("windows instance id's", windows_instance_ids)

    # Dispatch every command first so that the wait below is paid once
    # instead of once per instance.
    document_name = 'Abrigo-EC2LaunchV2Info'
    command_ids = {}
    for windows_instance_id in windows_instance_ids:
        print("windows instances in loop", windows_instance_id)
        command_ids[windows_instance_id] = run_ssm_command(windows_instance_id, document_name)
    print("&&&&&&&", list(command_ids.values()))

    windows_instance_dict = {}
    if command_ids:
        # Give the commands time to execute; the most recently sent command
        # still gets the full 10 seconds before its result is fetched.
        time.sleep(10)
        for windows_instance_id, sent_command_id in command_ids.items():
            final_res = get_command_invocation(windows_instance_id, sent_command_id)
            windows_instance_dict[windows_instance_id] = final_res
            print("final_res EC2LaunchV2Info installation", final_res)
        print("windows instance id's and command id's", windows_instance_dict)

    # AWS SNS configurations
    sns_topic_arn = os.environ["ec2_launch_sns_topic"]
    email_subject = 'EC2LaunchV2Info installation details'
    email_body = "Please find the below details for EC2LaunchV2Info installation on EC2 instances.\n\n"
    if windows_instance_dict:
        email_body += f"Windows Instances List :\n{json.dumps(windows_instance_dict, indent=2)}\n\n"
    else:
        # Only Windows instances are inspected, so say so explicitly.
        email_body += "No Windows instances found with SSM enabled"

    # NOTE(review): the SNS client region is pinned to us-east-1 while the SSM
    # client uses the default region -- confirm the topic lives in us-east-1.
    sns_client = boto3.client('sns', region_name='us-east-1')

    # Publish the email message to the SNS topic
    response = sns_client.publish(
        TopicArn=sns_topic_arn,
        Subject=email_subject,
        Message=email_body
    )

    print("Message published successfully. MessageId:", response['MessageId'])
    return windows_instance_dict, response['MessageId']


def run_ssm_command(instance_id, document_name, parameters=None):
    """
    Run an SSM command on a specific EC2 instance.

    Any exception raised by the underlying ``send_command`` call propagates
    to the caller; nothing is caught here.

    :param instance_id: The ID of the target EC2 instance.
    :param document_name: The name of the SSM document to execute.
    :param parameters: Optional parameters for the SSM command; omitted from
        the request when falsy.
    :return: The command execution ID.
    """
    print("entered into function run ssm command")
    ssm_client = boto3.client('ssm')
    print("entered into boto3 ssm command")

    # Build the command parameters
    command_params = {
        'DocumentName': document_name,
        'Targets': [
            {
                'Key': 'instanceids',
                'Values': [instance_id]
            }
        ]
    }
    print("entered into command_params ssm command")
    if parameters:
        print("inside parameters")
        command_params['Parameters'] = parameters

    # Execute the SSM command
    print("entering into ssm send command")
    response = ssm_client.send_command(**command_params)
    print("printing ssm-run-response", response)

    # Get the command execution ID
    command_id = response['Command']['CommandId']

    print(f"SSM Command executed successfully. Command ID: {command_id}")
    return command_id


def get_command_invocation(instance_id, command_id, max_attempts=5, poll_delay=3):
    """
    Fetch the result of an SSM command and turn it into a compliance message.

    The invocation is re-fetched while its Status is still 'Pending',
    'InProgress' or 'Delayed', so that an unfinished command (whose standard
    output is still empty) is not reported as NON_COMPLIANT.

    :param instance_id: The ID of the instance the command was sent to.
    :param command_id: The command ID returned when the command was sent.
    :param max_attempts: Maximum number of times the invocation is fetched
        (at least one fetch is always made).
    :param poll_delay: Seconds to sleep between fetches while the command
        has not finished.
    :return: A human-readable status string, or None if the invocation could
        not be retrieved.
    """
    print("entered into get command invocation function")
    print("command id", command_id)
    ssm_client = boto3.client('ssm')
    unfinished_statuses = {'Pending', 'InProgress', 'Delayed'}

    try:
        attempts_left = max(1, max_attempts)
        while True:
            response = ssm_client.get_command_invocation(
                InstanceId=instance_id,
                CommandId=command_id
            )
            status = response.get('Status')
            attempts_left -= 1
            if status not in unfinished_statuses or attempts_left == 0:
                break
            time.sleep(poll_delay)

        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!response!!!!!!!!!!!!!!!!!!!!!!!!", response)

        if status in unfinished_statuses:
            # No output to judge yet; say so instead of guessing a verdict.
            message = (
                f"EC2LaunchV2Info command has not finished (status: {status}); "
                "compliance status could not be determined"
            )
            print(message)
            return message

        standard_output_content = response.get('StandardOutputContent', '')

        # Check conditions based on StandardOutputContent. An empty expected
        # version must not match: '' is a substring of every string.
        if latest_version and latest_version in standard_output_content:
            compliance_type = 'COMPLIANT'
            print(f"EC2Launch V2 is installed and with version {latest_version} and compliance status: {compliance_type}")
            return f"EC2LaunchV2 is installed with version {latest_version} and  compliance status: {compliance_type}"
        else:
            compliance_type = 'NON_COMPLIANT'
            print(f"$$$EC2Launch V2 is not installed.  compliance status is: {compliance_type} ; current verison : {standard_output_content} Expected version: {latest_version} $$$")
            return f"EC2Launch V2 is not installed. compliance status is: {compliance_type} {standard_output_content} Expected version: {latest_version} "

    except Exception as e:
        # Deliberately best-effort: a failure for one instance is logged and
        # reported as None so the caller can still process the others.
        print(f"Error retrieving command invocation details: {e}")
        return None

# Script entry point: run the full check-and-notify flow when executed directly.
# The two " " arguments are placeholders; get_ec2_instances does not read the
# values passed for its parameters.
if __name__ == "__main__":
    get_ec2_instances(" ", " ")