import boto3 import json import time import os latest_version = os.environ["ec2_launch_v2"] ec2_launch_sns_topic = os.environ["ec2_launch_sns_topic"] def get_ec2_instances(instance_id, command_id): command_list = [] final_res_list = [] result_list = [] print("entered into get ec2 instances") ssm_client = boto3.client('ssm') response = ssm_client.describe_instance_information() print("complete ec2 instance response", response) instances = response.get('InstanceInformationList', []) platformtype = [item.get('PlatformType') for item in instances] print("platformtype", platformtype) instance_ids = [item.get('InstanceId') for item in instances] print("Instance id's list", instance_ids) print("list of instances", instances) for item in instances: if 'PlatformType' in item: print("Platform Type:", item['PlatformType']) print("Instance id's list", instance_ids) result_item = {'InstanceId': item['InstanceId'], 'PlatformType': item['PlatformType']} result_list.append(result_item) print("ec2 instance id and platform type", result_list) else: print("No Platform Type found for this item") print("ec2 instance id and platform type", result_list) windows_instances = [instance for instance in result_list if instance['PlatformType'] == 'Windows'] print("windows instances", windows_instances) windows_instance_dict = {} if not windows_instances: print("No Windows instances found.") else: print("Windows Instances:") print(windows_instances) windows_instance_ids = [instance['InstanceId'] for instance in windows_instances] print("windows instance id's", windows_instance_ids) for instance_id in windows_instance_ids: print("windows instances in loop", instance_id) document_name = 'Abrigo-EC2LaunchV2Info' response = run_ssm_command(instance_id, document_name) command_list.append(response) final_res = get_command_invocation(instance_id, response) # final_res_list.append(final_res) windows_instance_dict[instance_id] = final_res print("final_res EC2LaunchV2Info installation", final_res) print("&&&&&&&", command_list) print("windows instance id's and command id's", windows_instance_dict) # AWS SNS configurations sns_topic_arn = os.environ["ec2_launch_sns_topic"] email_subject = 'EC2LaunchV2Info installation details' email_body = f"Please find the below details for EC2LaunchV2Info installation on EC2 instances.\n\n" if len(windows_instance_dict) > 0: email_body += f"Windows Instances List :\n{json.dumps(windows_instance_dict, indent=2)}\n\n" if len(windows_instance_dict) == 0: email_body += f"No Instances found with SSM enabled" print("email_body type", type(email_body)) # Create an SNS client sns_client = boto3.client('sns', region_name='us-east-1') # Publish the email message to the SNS topic response = sns_client.publish( TopicArn=sns_topic_arn, Subject=email_subject, Message=email_body ) print("Message published successfully. MessageId:", response['MessageId']) return windows_instance_dict, response['MessageId'] def run_ssm_command(instance_id, document_name, parameters=None): print("entered into function run ssm command") """ Run an SSM command on a specific EC2 instance. :param instance_id: The ID of the target EC2 instance. :param document_name: The name of the SSM document to execute. :param parameters: Optional parameters for the SSM command. :return: The command execution ID. """ ssm_client = boto3.client('ssm') print("entered into boto3 ssm command") # Build the command parameters command_params = { 'DocumentName': document_name, 'Targets': [ { 'Key': 'instanceids', 'Values': [instance_id] } ] } print("entered into command_params ssm command") if parameters: print("inside parameters") command_params['Parameters'] = parameters # Execute the SSM command print("entering into ssm send command") response = ssm_client.send_command(**command_params) print("printing ssm-run-response", response) # Get the command execution ID command_id = response['Command']['CommandId'] # instance_id = instance_id # Replace with your instance ID document_name = 'ssmdocument' # Replace with your SSM document name # parameters = {'param_name': ['param_value']} # Replace with your SSM command parameters if any # command_id = run_ssm_command(instance_id, document_name) print(f"SSM Command executed successfully. Command ID: {command_id}") return command_id def get_command_invocation(instance_id, command_id): print("entered into get command invocation function") print("command id", command_id) ssm_client = boto3.client('ssm') try: response = ssm_client.get_command_invocation( InstanceId=instance_id, CommandId=command_id ) print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!response!!!!!!!!!!!!!!!!!!!!!!!!", response) final_response = response ec2_instance_id = instance_id standard_output_content = final_response.get('StandardOutputContent', '') # Check conditions based on StandardOutputContent if latest_version in standard_output_content: compliance_type = 'COMPLIANT' print(f"EC2Launch V2 is installed and with version {latest_version} and compliance status: {compliance_type}") return f"EC2LaunchV2 is installed with version {latest_version} and compliance status: {compliance_type}" else: compliance_type = 'NON_COMPLIANT' print(f"$$$EC2Launch V2 is not installed. compliance status is: {compliance_type} ; current verison : {standard_output_content} Expected version: {latest_version} $$$") return f"EC2Launch V2 is not installed. compliance status is: {compliance_type} {standard_output_content} Expected version: {latest_version} " except Exception as e: print(f"Error retrieving command invocation details: {e}") return None if __name__ == "__main__": get_ec2_instances(" ", " ")
Standard input is empty
import boto3
import json
import time
import os
latest_version = os.environ["ec2_launch_v2"]
ec2_launch_sns_topic = os.environ["ec2_launch_sns_topic"]
def get_ec2_instances(instance_id, command_id):
command_list = []
final_res_list = []
result_list = []
print("entered into get ec2 instances")
ssm_client = boto3.client('ssm')
response = ssm_client.describe_instance_information()
print("complete ec2 instance response", response)
instances = response.get('InstanceInformationList', [])
platformtype = [item.get('PlatformType') for item in instances]
print("platformtype", platformtype)
instance_ids = [item.get('InstanceId') for item in instances]
print("Instance id's list", instance_ids)
print("list of instances", instances)
for item in instances:
if 'PlatformType' in item:
print("Platform Type:", item['PlatformType'])
print("Instance id's list", instance_ids)
result_item = {'InstanceId': item['InstanceId'], 'PlatformType': item['PlatformType']}
result_list.append(result_item)
print("ec2 instance id and platform type", result_list)
else:
print("No Platform Type found for this item")
print("ec2 instance id and platform type", result_list)
windows_instances = [instance for instance in result_list if instance['PlatformType'] == 'Windows']
print("windows instances", windows_instances)
windows_instance_dict = {}
if not windows_instances:
print("No Windows instances found.")
else:
print("Windows Instances:")
print(windows_instances)
windows_instance_ids = [instance['InstanceId'] for instance in windows_instances]
print("windows instance id's", windows_instance_ids)
for instance_id in windows_instance_ids:
print("windows instances in loop", instance_id)
document_name = 'Abrigo-EC2LaunchV2Info'
response = run_ssm_command(instance_id, document_name)
command_list.append(response)
time.sleep(10)
final_res = get_command_invocation(instance_id, response)
# final_res_list.append(final_res)
windows_instance_dict[instance_id] = final_res
print("final_res EC2LaunchV2Info installation", final_res)
print("&&&&&&&", command_list)
print("windows instance id's and command id's", windows_instance_dict)
# AWS SNS configurations
sns_topic_arn = os.environ["ec2_launch_sns_topic"]
email_subject = 'EC2LaunchV2Info installation details'
email_body = f"Please find the below details for EC2LaunchV2Info installation on EC2 instances.\n\n"
if len(windows_instance_dict) > 0:
email_body += f"Windows Instances List :\n{json.dumps(windows_instance_dict, indent=2)}\n\n"
if len(windows_instance_dict) == 0:
email_body += f"No Instances found with SSM enabled"
print("email_body type", type(email_body))
# Create an SNS client
sns_client = boto3.client('sns', region_name='us-east-1')
# Publish the email message to the SNS topic
response = sns_client.publish(
TopicArn=sns_topic_arn,
Subject=email_subject,
Message=email_body
)
print("Message published successfully. MessageId:", response['MessageId'])
return windows_instance_dict, response['MessageId']
def run_ssm_command(instance_id, document_name, parameters=None):
print("entered into function run ssm command")
"""
Run an SSM command on a specific EC2 instance.
:param instance_id: The ID of the target EC2 instance.
:param document_name: The name of the SSM document to execute.
:param parameters: Optional parameters for the SSM command.
:return: The command execution ID.
"""
ssm_client = boto3.client('ssm')
print("entered into boto3 ssm command")
# Build the command parameters
command_params = {
'DocumentName': document_name,
'Targets': [
{
'Key': 'instanceids',
'Values': [instance_id]
}
]
}
print("entered into command_params ssm command")
if parameters:
print("inside parameters")
command_params['Parameters'] = parameters
# Execute the SSM command
print("entering into ssm send command")
response = ssm_client.send_command(**command_params)
print("printing ssm-run-response", response)
# Get the command execution ID
command_id = response['Command']['CommandId']
# instance_id = instance_id # Replace with your instance ID
document_name = 'ssmdocument' # Replace with your SSM document name
# parameters = {'param_name': ['param_value']} # Replace with your SSM command parameters if any
# command_id = run_ssm_command(instance_id, document_name)
print(f"SSM Command executed successfully. Command ID: {command_id}")
return command_id
def get_command_invocation(instance_id, command_id):
print("entered into get command invocation function")
print("command id", command_id)
ssm_client = boto3.client('ssm')
try:
response = ssm_client.get_command_invocation(
InstanceId=instance_id,
CommandId=command_id
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!response!!!!!!!!!!!!!!!!!!!!!!!!", response)
final_response = response
ec2_instance_id = instance_id
standard_output_content = final_response.get('StandardOutputContent', '')
# Check conditions based on StandardOutputContent
if latest_version in standard_output_content:
compliance_type = 'COMPLIANT'
print(f"EC2Launch V2 is installed and with version {latest_version} and compliance status: {compliance_type}")
return f"EC2LaunchV2 is installed with version {latest_version} and compliance status: {compliance_type}"
else:
compliance_type = 'NON_COMPLIANT'
print(f"$$$EC2Launch V2 is not installed. compliance status is: {compliance_type} ; current verison : {standard_output_content} Expected version: {latest_version} $$$")
return f"EC2Launch V2 is not installed. compliance status is: {compliance_type} {standard_output_content} Expected version: {latest_version} "
except Exception as e:
print(f"Error retrieving command invocation details: {e}")
return None
if __name__ == "__main__":
get_ec2_instances(" ", " ")