Best Python code snippet using localstack_python
create_mirror_session.py
Source:create_mirror_session.py
...120 'private_dns': private_dns,121 'NetworkInterfaceId': NetworkInterfaceId122 }123 return(host_info)124def create_traffic_mirror_target(zeek_nic_id):125 myfigs = parse_configs()126 Owner_Name = myfigs['Owner_Name']127 Owner_Email = myfigs['Owner_Email']128 demo_name = myfigs['demo_name']129 ec2 = boto3.client('ec2')130 response = ec2.create_traffic_mirror_target(131 NetworkInterfaceId = zeek_nic_id,132 Description = 'bhayes traffic mirror target for winlogbeat demo',133 TagSpecifications=[134 {135 'ResourceType': 'traffic-mirror-target',136 'Tags': make_tags('winlogbeat traffic mirror')137 }138 ],139 #DryRun = True140 )141 #print("Create Traffic Mirror Target response is ")142 debugprint(response)143 return(response)144 145def create_traffic_mirror_filter():146 ec2 = boto3.client('ec2')147 response = ec2.create_traffic_mirror_filter(148 Description = 'bhayes-winlogbeat-zeek-mirror-filter',149 TagSpecifications=[150 {151 'ResourceType': 'traffic-mirror-filter',152 'Tags': make_tags('winlogbeat traffic mirror filter')153 }154 ] 155 )156 debugprint(response)157 TrafficMirrorFilter = response['TrafficMirrorFilter']158 filter_id = TrafficMirrorFilter['TrafficMirrorFilterId']159 print("filter ID is " + filter_id)160 return(filter_id)161 162 163def create_traffic_mirror_filter_rule(filter_id):164 ec2 = boto3.client('ec2')165 ingress_response = ec2.create_traffic_mirror_filter_rule(166 TrafficMirrorFilterId = filter_id,167 TrafficDirection = 'ingress',168 RuleNumber = 100,169 RuleAction = 'accept',170 SourceCidrBlock = '0.0.0.0/0',171 DestinationCidrBlock = '10.100.0.0/24',172 Description = 'Default Allow All'173 )174 print("create ingress rule")175 debugprint(json.dumps(ingress_response))176 ec2 = boto3.client('ec2')177 egress_response = ec2.create_traffic_mirror_filter_rule(178 TrafficMirrorFilterId = filter_id,179 TrafficDirection = 'egress',180 RuleNumber = 100,181 RuleAction = 'accept',182 SourceCidrBlock = '10.100.0.0/24',183 DestinationCidrBlock = '0.0.0.0/0',184 Description = 'Default Allow All' 185 )186 print("create egress rule")187 debugprint(json.dumps(egress_response))188 189 190def create_traffic_mirror_session(windows_nic_id,TargetId,filter_id):191 myfigs = parse_configs()192 Owner_Name = myfigs['Owner_Name']193 Owner_Email = myfigs['Owner_Email']194 demo_name = myfigs['demo_name'] 195 196 ec2 = boto3.client('ec2')197 response = ec2.create_traffic_mirror_session(198 NetworkInterfaceId = windows_nic_id,199 TrafficMirrorTargetId = TargetId,200 TrafficMirrorFilterId = filter_id,201 SessionNumber = 1,202 Description = 'bhayes winlogbeat demo mirror session',203 TagSpecifications=[204 {205 'ResourceType': 'traffic-mirror-session',206 'Tags': make_tags('winlogbeat traffic mirror session')207 }208 ] 209 )210 debugprint(response)211 return(response)212 213if args.debug:214 def debugprint(*args):215 for arg in args:216 print (arg),217 print218 print219else:220 debugprint = lambda *a: None221zeek_instance_info = get_running_instance_info('zeek')222debugprint(zeek_instance_info)223zeek_host_info = parse_instance_info(zeek_instance_info)224debugprint(json.dumps(zeek_host_info))225zeek_nic_id = zeek_host_info['NetworkInterfaceId']226windows_instance_info = get_running_instance_info('windows')227windows_host_info = parse_instance_info(windows_instance_info)228windows_nic_id = windows_host_info['NetworkInterfaceId']229debugprint(json.dumps(windows_host_info))230filter_id = create_traffic_mirror_filter()231create_traffic_mirror_filter_rule(filter_id)232mirror_target_response = create_traffic_mirror_target(zeek_nic_id)233TrafficMirrorTarget = mirror_target_response['TrafficMirrorTarget']234TargetId = TrafficMirrorTarget['TrafficMirrorTargetId']235print("Created Traffic Mirror Target ID " + TargetId)236mirror_session = create_traffic_mirror_session(windows_nic_id,TargetId,filter_id)237if mirror_session['TrafficMirrorSession']:238 TrafficMirrorSession = mirror_session['TrafficMirrorSession']239 TrafficMirrorSessionId = TrafficMirrorSession['TrafficMirrorSessionId']240 print("traffic mirror session " + TrafficMirrorSessionId + " created")241 242else:...
test_create_traffic_mirroring.py
Source:test_create_traffic_mirroring.py
1import json2from unittest import TestCase3from unittest.mock import Mock4from uuid import uuid45from jsonschema import validate6from cloudshell.cp.core.models import (7 CreateTrafficMirroring,8 CreateTrafficMirroringParams,9)10from cloudshell.cp.aws.domain.common.cancellation_service import (11 CommandCancellationService,12)13from cloudshell.cp.aws.domain.conncetivity.operations.traffic_mirroring_operation import ( # noqa14 CREATE_SCHEMA,15 REMOVE_SCHEMA,16 TrafficMirrorOperation,17)18from cloudshell.cp.aws.domain.services.cloudshell.traffic_mirror_pool_services import (19 SessionNumberService,20)21from cloudshell.cp.aws.domain.services.ec2.mirroring import TrafficMirrorService22from cloudshell.cp.aws.models.reservation_model import ReservationModel23class TestCreateTrafficMirroring(TestCase):24 def test_valid_create_returns_success_actions(self):25 session_number_service = SessionNumberService()26 traffic_mirror_service = TrafficMirrorService()27 cancellation_service = CommandCancellationService()28 reservation_context = Mock()29 reservation_context.reservation_id = str(uuid4())30 reservation = ReservationModel(reservation_context)31 reservation.blueprint = "lalala"32 reservation.owner = "admin"33 reservation.domain = "global"34 describe_mirror_targets_result = {35 "TrafficMirrorTargets": [36 {"NetworkInterfaceId": "bbbb", "TrafficMirrorTargetId": "cccc"}37 ]38 }39 create_traffic_mirror_target_result = {40 "TrafficMirrorTarget": {"TrafficMirrorTargetId": "tmt-5050"}41 }42 create_filter_result = {43 "TrafficMirrorFilter": {"TrafficMirrorFilterId": "tmf-5050"}44 }45 create_traffic_mirror_session_result = {46 "TrafficMirrorSession": {"TrafficMirrorSessionId": "tms-5050"}47 }48 ec2_client = Mock()49 ec2_client.describe_traffic_mirror_targets = Mock(50 return_value=describe_mirror_targets_result51 )52 ec2_client.create_traffic_mirror_target = Mock(53 return_value=create_traffic_mirror_target_result54 )55 ec2_client.create_traffic_mirror_filter = Mock(56 return_value=create_filter_result57 )58 ec2_client.create_traffic_mirror_session = Mock(59 return_value=create_traffic_mirror_session_result60 )61 cancellation_context = Mock()62 cancellation_context.is_cancelled = False63 logger = Mock()64 cloudshell = Mock()65 checkout_result = Mock()66 checkout_result.Items = [5]67 cloudshell.CheckoutFromPool = Mock(return_value=checkout_result)68 action = CreateTrafficMirroring()69 action.actionId = str(uuid4())70 action.actionParams = CreateTrafficMirroringParams()71 action.actionParams.sessionNumber = "5"72 action.actionParams.sourceNicId = "a"73 action.actionParams.targetNicId = "b"74 actions = [action]75 op = TrafficMirrorOperation(76 session_number_service,77 traffic_mirror_service,78 cancellation_service,79 )80 results = op.create(81 ec2_client=ec2_client,82 reservation=reservation,83 actions=actions,84 cancellation_context=cancellation_context,85 logger=logger,86 cloudshell=cloudshell,87 )88 self.assertTrue([x for x in results if x.success])89 def test_json_validate(self):90 request = """91 {92 "driverRequest": {93 "actions": [94 {95 "actionId": "a156d3db-78fe-4c19-9039-a225d0360119",96 "type": "RemoveTrafficMirroring",97 "sessionId": "tms-020e45731259d882d",98 "targetNicId": ""99 }100 ]101 }102 }103 """104 result = json.loads(request)105 for a in result["driverRequest"]["actions"]:106 validate(a, REMOVE_SCHEMA)107 create_request = """108 {109 "driverRequest": {110 "actions": [111 {112 "actionId": "a156d3db-78fe-4c19-9039-a225d0360119",113 "type": "CreateTrafficMirroring",114 "actionParams": {"type": "CreateTrafficMirroringParams",115 "sourceNicId": "eni-0bf9b403bd8d36a79",116 "targetNicId": "eni-060613fdccd935b67",117 "sessionNumber": "",118 "filterRules": [119 {120 "type": "TrafficFilterRule",121 "direction": "ingress",122 "sourcePortRange": {123 "type": "PortRange",124 "fromPort": "123",125 "toPort": "123"126 },127 "protocol": "udp"128 }129 ]130 }131 }132 ]133 }134 }135 """ # noqa136 create_actions2 = json.loads(create_request)137 for a in create_actions2["driverRequest"]["actions"]:...
nlb_target_factory.py
Source:nlb_target_factory.py
...37 if "elasticloadbalancing" in nlb_or_eni_arn:38 kwargs["NetworkLoadBalancerArn"] = nlb_or_eni_arn39 else:40 kwargs["NetworkInterfaceId"] = nlb_or_eni_arn41 response = self.ec2_client.create_traffic_mirror_target(**kwargs)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!