How to use describe_security_group_rules method in localstack

Best Python code snippet using localstack_python

AR_1001.py

Source:AR_1001.py Github


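# ... listing truncated above: these lines are the tail of vpc_security_group_list,
# whose header is not shown on this page. They are left as-is.
    vpc_list = []
    if rds_instance.get('VpcSecurityGroups'):
        for sec_group in rds_instance['VpcSecurityGroups']:
            if sec_group['Status'] == 'active':
                sec_group_rule = ec2.describe_security_group_rules(Filters=[
                    {
                        'Name': 'group-id',
                        'Values': [
                            sec_group['VpcSecurityGroupId']
                        ]
                    },
                ], MaxResults=512)
                if rds_sec_group_allowed(sec_group_rule, rds_instance['DbInstancePort']):
                    vpc_list.append(sec_group['VpcSecurityGroupId'])
    return vpc_list


def _describe_all_security_group_rules(group_id):
    """Return every rule of *group_id* as ``{'SecurityGroupRules': [...]}``.

    ``describe_security_group_rules`` is paginated: one call with
    ``MaxResults`` returns only the first page, so rules past it -- and any
    world-open rule among them -- were never inspected or remediated by the
    single-call pattern used throughout this file. This follows ``NextToken``
    until the API stops returning one and merges the pages into the same
    response shape the single call produced, so the existing consumers
    (``rds_sec_group_allowed``, presumably reading ``'SecurityGroupRules'``,
    and ``rectify_open_sec_group_assigned``) need no change. The truncated
    ``lambda_handler`` at the end of this listing still makes the
    single-page call for EC2 instances.

    Args:
        group_id (str): EC2 security group id to list rules for.

    Returns:
        dict: ``{'SecurityGroupRules': list}`` with all pages merged.
    """
    params = {
        'Filters': [{'Name': 'group-id', 'Values': [group_id]}],
        'MaxResults': 512,
    }
    rules = []
    while True:
        response = ec2.describe_security_group_rules(**params)
        rules.extend(response.get('SecurityGroupRules', []))
        next_token = response.get('NextToken')
        if not next_token:
            return {'SecurityGroupRules': rules}
        params['NextToken'] = next_token


def db_sec_group_list(rds_instance):
    """Return ids of active EC2-Classic security groups, reached through the
    instance's DB security groups, whose rules allow the DB port.

    ``EC2SecurityGroups`` in a ``describe_db_security_groups`` response is a
    list of group entries (``check_and_rectify_db_sec_group`` below iterates
    it the same way); the previous ``['Status']`` index on that list raised
    ``TypeError`` and the EC2-Classic path was never evaluated.

    Args:
        rds_instance (dict): one entry of ``describe_db_instances()['DBInstances']``.

    Returns:
        list[str]: ``EC2SecurityGroupId`` values for which
        ``rds_sec_group_allowed`` reports ``DbInstancePort`` as allowed.
    """
    db_list = []
    for sec_group in rds_instance.get('DBSecurityGroups') or []:
        security_groups = rds.describe_db_security_groups(
            DBSecurityGroupName=sec_group['DBSecurityGroupName'])
        for security_group in security_groups.get('DBSecurityGroups') or []:
            for ec2_sec_group in security_group.get('EC2SecurityGroups') or []:
                if ec2_sec_group.get('Status') != 'active':
                    continue
                group_id = ec2_sec_group['EC2SecurityGroupId']
                sec_group_rule = _describe_all_security_group_rules(group_id)
                if rds_sec_group_allowed(sec_group_rule, rds_instance['DbInstancePort']):
                    db_list.append(group_id)
    return db_list


def is_cid_rip(rds_instance):
    """Return False if any DB security group of *rds_instance* has an IP range
    matching ``IPV4_HOLDER`` or ``IPV6_HOLDER``, True otherwise.

    Note the inverted sense: True means "no open CIDR found". The match is a
    substring test on the ``CIDRIP`` string, as in the original.

    Args:
        rds_instance (dict): one entry of ``describe_db_instances()['DBInstances']``.
    """
    for sec_group in rds_instance.get('DBSecurityGroups') or []:
        security_groups = rds.describe_db_security_groups(
            DBSecurityGroupName=sec_group['DBSecurityGroupName'])
        for security_group in security_groups.get('DBSecurityGroups') or []:
            for ip_range in security_group.get('IPRanges') or []:
                cidr = ip_range.get('CIDRIP')
                if cidr and (IPV4_HOLDER in cidr or IPV6_HOLDER in cidr):
                    return False
    return True


def is_true(rds_instance):
    """Return True when *rds_instance* is publicly accessible AND at least one
    security group path admits the DB port from an open CIDR.

    The three paths are checked in order -- VPC security groups, EC2-Classic
    groups behind DB security groups, raw CIDR entries of DB security groups --
    and evaluation stops at the first open one, which also saves API calls.
    The result is the same as the original's combination of the three flags.

    Args:
        rds_instance (dict): one entry of ``describe_db_instances()['DBInstances']``.
    """
    if not rds_instance['PubliclyAccessible']:
        return False
    if vpc_security_group_list(rds_instance):
        return True
    if db_sec_group_list(rds_instance):
        return True
    return not is_cid_rip(rds_instance)


def is_network_interface_public(network_interfaces):
    """Return True if any interface carries an association with a public IP.

    ``PublicIp`` is read with ``.get`` so an ``Association`` entry without it
    does not raise; a missing or empty value counts as not public.

    Args:
        network_interfaces (list[dict] | None): ``NetworkInterfaces`` of an instance.
    """
    for network_interface in network_interfaces or []:
        association = network_interface.get('Association') or {}
        if association.get('PublicIp'):
            return True
    return False


def rectify_open_sec_group_assigned(sec_group_rules, group_id):
    """Revoke (or delete, see ``modify_or_delete``) every rule of *group_id*
    that is open to the world on the full port range.

    A rule qualifies when its protocol is present and not ICMP, its CIDR
    equals ``IPV4_HOLDER`` / ``IPV6_HOLDER``, and it spans all ports
    (``FromPort == -1`` or 0-65535). World-open rules on a single port, rules
    whose source is another security group, and ICMP rules are deliberately
    left alone, as in the original. The rule's ``IsEgress`` flag is passed on
    so the right revoke call is made instead of guessing the direction.

    Args:
        sec_group_rules (dict): a ``describe_security_group_rules`` response.
        group_id (str): security group the rules belong to.
    """
    for sec_group_rule in sec_group_rules.get('SecurityGroupRules') or []:
        protocol = sec_group_rule.get('IpProtocol')
        if not protocol or protocol.upper() == 'ICMP':
            continue
        world_open = (
            sec_group_rule.get('CidrIpv4') == IPV4_HOLDER
            or sec_group_rule.get('CidrIpv6') == IPV6_HOLDER
        )
        if not world_open:
            continue
        from_port = sec_group_rule.get('FromPort')
        to_port = sec_group_rule.get('ToPort')
        if from_port == -1 or (from_port == 0 and to_port == 65535):
            modify_or_delete(group_id, sec_group_rule['SecurityGroupRuleId'],
                             is_egress=sec_group_rule.get('IsEgress'))


def modify_or_delete(group_id, sec_group_rule_id, is_egress=None):
    """Delete the whole security group, or revoke a single rule from it.

    Which one happens is decided by the module-level flag
    ``want_to_delete_the_whole_security_group``. Deleting the group is the
    far heavier action: it drops every rule, not only the open one, and the
    API refuses it while the group is still attached to a resource.

    Args:
        group_id (str): security group to act on.
        sec_group_rule_id (str): rule to revoke when the group is kept.
        is_egress (bool | None): direction of the rule; None keeps the
            try-ingress-then-egress fallback of ``modify_security_group_rules``.
    """
    if want_to_delete_the_whole_security_group:
        ec2.delete_security_group(GroupId=group_id)
    else:
        modify_security_group_rules(group_id, sec_group_rule_id, is_egress)


def modify_security_group_rules(group_id, sec_group_rule_id, is_egress=None):
    """Revoke one rule, by id, from *group_id*.

    When *is_egress* is known (the rule's ``IsEgress`` field) only the
    matching revoke call is made. When it is None the original behaviour is
    kept: ingress first and, if that produced no response, egress. Failures
    are printed and swallowed on purpose so one bad rule does not abort
    remediation of the remaining resources; the final print records the
    outcome either way.

    Args:
        group_id (str): security group owning the rule.
        sec_group_rule_id (str): ``SecurityGroupRuleId`` to revoke.
        is_egress (bool | None): rule direction, or None if unknown.
    """
    if is_egress is None:
        revoke_calls = [ec2.revoke_security_group_ingress, ec2.revoke_security_group_egress]
    elif is_egress:
        revoke_calls = [ec2.revoke_security_group_egress]
    else:
        revoke_calls = [ec2.revoke_security_group_ingress]
    response = None
    for revoke in revoke_calls:
        try:
            response = revoke(GroupId=group_id, SecurityGroupRuleIds=[sec_group_rule_id])
        except Exception as e:  # broad on purpose: best-effort remediation
            print(e)
        if response:
            break
    print("Response from revoke security group: " + str(response))


def check_and_rectify_db_sec_group(db_security_groups):
    """Remediate the EC2-Classic groups behind each RDS DB security group.

    Args:
        db_security_groups (list[dict] | None): ``DBSecurityGroups`` of a DB instance.
    """
    for db_security_group in db_security_groups or []:
        security_groups = rds.describe_db_security_groups(
            DBSecurityGroupName=db_security_group['DBSecurityGroupName'])
        for security_group in security_groups.get('DBSecurityGroups') or []:
            check_and_rectify_rds_security_group(
                security_group.get('EC2SecurityGroups'), 'EC2SecurityGroupId')


def check_and_rectify_rds_security_group(rds_security_groups, group_id_var):
    """Remediate every security group referenced in *rds_security_groups*.

    Args:
        rds_security_groups (list[dict] | None): group entries of a DB instance.
        group_id_var (str): key holding the group id in each entry,
            e.g. ``'EC2SecurityGroupId'``.
    """
    for rds_security_group in rds_security_groups or []:
        group_id = rds_security_group[group_id_var]
        rectify_open_sec_group_assigned(_describe_all_security_group_rules(group_id), group_id)


def _check_and_rectify_group_ids(group_ids):
    """Shared body of the ELB and ES checks: fetch and remediate each id."""
    for group_id in group_ids or []:
        rectify_open_sec_group_assigned(_describe_all_security_group_rules(group_id), group_id)


def check_and_rectify_elb_security_group(security_groups):
    """Remediate the security groups of a load balancer.

    Args:
        security_groups (list[str] | None): security group ids.
    """
    _check_and_rectify_group_ids(security_groups)


def check_and_rectify_es_security_group(security_group_ids):
    """Remediate the security groups of an Elasticsearch domain.

    Args:
        security_group_ids (list[str] | None): security group ids.
    """
    _check_and_rectify_group_ids(security_group_ids)


def lambda_handler(event, context, ec2_regions):
    """ This single rule can be caused at 4 different resources, namely
    EC2, RDS, ELB and ES...
    Args:
        event (dict):
            This is the payload with is sent via Optix, whenever an alert is generated.
        context (dict):
            This is the AWS lambda context.
        ec2_regions (list):
            List of all available regions
    Returns:
        str: Always returns "Remediation successful". Everything else is logged.
    """
    payload_data = event['payloadData']
    rule_id = payload_data['ruleNumber']
    if event['eventType'] == 'ALERT' and rule_id == rule:
        affected_resources = payload_data['affectedResources']
        for reg in ec2_regions:
            global ec2
            ec2 = boto3.client('ec2', region_name=reg)
            global rds
            rds = boto3.client('rds', region_name=reg)
            global elb_v2
            elb_v2 = boto3.client('elbv2', region_name=reg)
            global es
            es = boto3.client('es', region_name=reg)
            for affected_resource in affected_resources:
                if affected_resource['state'] == "OPEN":
                    try:
                        # EC2 instance alert check
                        ec2_instances = ec2.describe_instances()
                        if ec2_instances and ec2_instances.get('Reservations'):
                            for reservation in ec2_instances['Reservations']:
                                if reservation.get('Instances'):
                                    for instance in reservation['Instances']:
                                        if instance['InstanceId'] in affected_resource['resourceInfo']:
                                            is_public = is_network_interface_public(instance.get('NetworkInterfaces'))
                                            if instance.get('SecurityGroups'):
                                                for security_group in instance['SecurityGroups']:
                                                    sec_group_rules = ec2.describe_security_group_rules(Filters=[
                                                        {
                                                            'Name': 'group-id',
                                                            'Values': [
                                                                security_group['GroupId']
                                                            ]
                                                        },
                                                    ], MaxResults=512)
                                                    if is_public:
                                                        rectify_open_sec_group_assigned(sec_group_rules,
                                                                                        security_group['GroupId'])
                        # rds instance alert check
                        db_instances = rds.describe_db_instances()
                        if db_instances and db_instances.get('DBInstances'):
                            for db_instance in db_instances['DBInstances']:
# ... listing truncated here: the rest of lambda_handler is not shown on this page.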


main.py

Source:main.py Github


...24 first = 'd14aec63'25 next_token = first26 while next_token:27 if next_token == first:28 response = ec2_client.describe_security_group_rules(29 Filters=[30 {31 'Name': 'group-id',32 'Values': [group]33 }34 ]35 )36 else:37 response = ec2_client.describe_security_group_rules(38 NextToken=next_token,39 Filters=[40 {41 'Name': 'group-id',42 'Values': [group]43 }44 ]45 )46 next_token = response.get('NextToken')47 for rule in response.get('SecurityGroupRules', []):48 rule_id = rule.get('SecurityGroupRuleId')49 if rule.get('IsEgress'):50 egress_rules.append(rule_id)51 else:...


get_sg_rule.py

Source:get_sg_rule.py Github


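# ... listing truncated above: these lines are the tail of a method of EC2SecurityGroup
# whose header is not shown on this page. They are left as-is.
        for sg in instance.get('SecurityGroups', []):
            self.sg_with_ec2[sg['GroupId']].append({"ec2": instance['InstanceId'],
                                                    "interface_id": [x['NetworkInterfaceId'] for x in instance['NetworkInterfaces']]})
        self.all_ec2[instance['InstanceId']] = instance

    def describe_security_group_rules(self, group_ids):
        """Load every rule of *group_ids* into ``self.sgr_list``.

        ``describe_security_group_rules`` is paginated; the previous single
        call kept only the first page, so rules past it were silently missing
        from the inventory. This follows ``NextToken`` until the API stops
        returning one and stores the merged list.

        Args:
            group_ids (list[str]): security group ids to inspect.
        """
        filters = [{'Name': 'group-id', 'Values': group_ids}]
        rules = []
        next_token = None
        while True:
            params = {'Filters': filters}
            if next_token:
                params['NextToken'] = next_token
            response = self.ec2_client.describe_security_group_rules(**params)
            rules.extend(response.get('SecurityGroupRules', []))
            next_token = response.get('NextToken')
            if not next_token:
                break
        self.sgr_list = rules


if __name__ == '__main__':
    # describe security group rules
    ec2sg = EC2SecurityGroup()
    ec2sg.get_security_resources()
    group_id_list = [x for x in ec2sg.sg_with_ec2 if ec2sg.sg_with_ec2[x]]  # security groups actually in use
    ec2sg.describe_security_group_rules(group_id_list)
    #for x in ec2sg.sgr_list:
    #    print("######### sgr: " + json.dumps(x, indent=4))
    for x in ec2sg.security_groups:
        print("######### sg: " + json.dumps(x, indent=4))
    print("######### sg with ec2: " + json.dumps(ec2sg.sg_with_ec2, indent=4))
    for x in ec2sg.all_ec2:
# ... listing truncated here: the rest of the __main__ block is not shown on this page.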



