Best Python code snippet using localstack_python
views.py
Source:views.py
...315 316 mk.delete_classroom_firewall_rules(classroom_)317 318 if internet_id == "OFF":319 #mk.delete_firewall_rule("POST-LLISTA-"+classroom_id)320 #mk.delete_firewall_rule("LLISTA-"+classroom_id)321 #If we filter classroom by mac because doesn't follow any IP range knowed 322 if mac_filter:323 mk.block_classroom_by_mac(classroom_)324 else: 325 mk.add_firewall_rule(firewall_rule.comment,str(firewall_rule.src_address),str(firewall_rule.src_netmask))326 else:327 #mk.delete_firewall_rule(firewall_rule)328 if mac_filter:329 mk.delete_classroom_firewall_rules(classroom_)330 else:331 mk.delete_firewall_rule(firewall_rule.comment)332 333 # Esborrem residus de regles parcials quan internet On or OFF334 mk.delete_firewall_rule("POST-LLISTA-"+classroom_id)335 mk.delete_firewall_rule("LLISTA-"+classroom_id)336 337 except Exception, e:338 response.write("ERROR: "+str(e))339 340 341 342 return response343 #return direct_to_template(request, 'aules/aules.html')344def add_list_network_device(request,classroom_id,list_id):345 response = HttpResponse() 346 url_list = UrlList.objects.get(id=list_id)347 url_list_items = url_list.url_list_items.all()348 classroom_=Classroom.objects.get(id=classroom_id)349 network_type=classroom_.network_device.network_type 350 firewall_rule=classroom_.firewall_rule351 352 if network_type=="Mikrotik":353 try:354 mk =MikrotikRouter()355 mk.set_networkdevice(classroom_.network_device)356 mk.add_firewall_list(url_list_items,url_list.name)357 358 ''' Add drop firewall rule'''359 mk.delete_firewall_rule("POST-LLISTA-"+classroom_id)360 #If we don't have other choice and we use the less preferable way to block a classroom. Blocking using every single MAC #from the PCs of the Classroom361 if classroom_.mac_filter:362 mk.block_classroom_by_mac(classroom_,"POST-LLISTA-"+classroom_id)363 else:364 mk.add_firewall_rule("POST-LLISTA-"+classroom_id,str(firewall_rule.src_address),str(firewall_rule.src_netmask))365 366 ''' Add accept firewall rule to destination list'''367 mk.delete_firewall_rule("LLISTA-"+classroom_id)368 mk.add_firewall_rule("LLISTA-"+classroom_id,"","",url_list.name,"accept")369 370 except Exception, e:371 response.write("ERROR: "+str(e))372 373 return response 374def remove_list_network_device(request,classroom_id):375 response = HttpResponse() 376 classroom_=Classroom.objects.get(id=classroom_id)377 network_type=classroom_.network_device.network_type 378 if network_type=="Mikrotik":379 try:380 mk =MikrotikRouter()381 mk.set_networkdevice(classroom_.network_device) 382 mk.delete_firewall_rule("POST-LLISTA-"+classroom_id)383 mk.delete_firewall_rule("LLISTA-"+classroom_id)384 385 except Exception, e:386 response.write("ERROR: "+str(e))387 388 return response389''' No estem fent cas a classroom_id HARDWIRED '''390def set_barra_lliure(request,classroom_id,barralliure_id):391 response = HttpResponse()392 classroom_=Classroom.objects.get(id=classroom_id)393 network_type=classroom_.network_device.network_type394 #response.write("set barra lliure") 395 if network_type=="Mikrotik":396 try:397 mk =MikrotikRouter()...
networkoperations.py
Source:networkoperations.py
1from util.core.app.audit_log_transaction import insert_audit_log2from util.core.app.constants import TASK_STATUS3from util.core.app.terraform_resource import TerraformClass4from google.cloud import compute_v15from resource_adapters.utils.gcp_clients import *6from util.core.app.logger import get_logger_func7LOG = get_logger_func(__file__)8def add_audit_log(session, task_id, source, event, trace, status):9 payload = {10 "task_id": task_id,11 "source": source,12 "event": event,13 "trace": trace,14 "status": status15 }16 LOG.debug("Insert audit logs:%s" % payload, {'task_id': task_id})17 insert_audit_log(payload, session_factory=session._session_factory)18def delete_public_ip(parameters, session, **kwargs):19 # Delete public ip20 task_id = parameters.get('task_id')21 LOG.debug("delete_public_ip:%s" % parameters,22 {'task_id': task_id})23 address_client = get_gcp_address_client(parameters)24 op_client = get_gcp_op_client(parameters)25 add_audit_log(session, task_id, "NetworkOperations", "delete_public_ip",26 "started", TASK_STATUS.COMPLETED)27 try:28 ip_config_info = parameters.get('ip_config_info')29 except Exception as ex:30 raise Exception("Ip configuration is missing:%s" % str(ex))31 ips_deleted = []32 ips_failed = []33 if ip_config_info:34 for ip_name in ip_config_info:35 try:36 LOG.debug("IP Name is:%s" % ip_name, {'task_id': task_id})37 add_audit_log(session, task_id, "NetworkOperations",38 "delete_public_ip:%s" % ip_name,39 "started", TASK_STATUS.COMPLETED)40 op = address_client.delete(project=parameters.get('gcp_project'),41 region=parameters.get('location'),42 address=ip_name)43 while op.status != compute_v1.Operation.Status.DONE:44 op = op_client.wait(45 operation=op.name, zone=parameters.get('zone'),46 project=parameters.get('gcp_project'))47 add_audit_log(session, task_id, "NetworkOperations",48 "delete_public_ip:%s" % ip_name,49 "Success", TASK_STATUS.COMPLETED)50 ips_deleted.append(ip_name)51 except Exception as ex:52 ips_failed.append(ip_name)53 add_audit_log(session, task_id, "NetworkOperations",54 "delete_public_ip:%s" % ip_name,55 "%s" % str(ex), TASK_STATUS.FAILED)56 add_audit_log(session, task_id, "NetworkOperations",57 "delete_public_ip Ips deleted:%s, Ips failed:%s" %58 (ips_deleted, ips_failed), "Success", TASK_STATUS.COMPLETED)59 if ips_failed:60 add_audit_log(session, task_id, "NetworkOperations", "delete_public_ip",61 "Failed", TASK_STATUS.FAILED)62 raise Exception('Following ops failed {}'.format(ips_failed))63 else:64 add_audit_log(session, task_id, "NetworkOperations", "delete_public_ip",65 "No IP information in the request", TASK_STATUS.COMPLETED)66def delete_vnet(parameters, session, **kwargs):67 # Delete vnet68 task_id = parameters.get('task_id')69 LOG.debug("delete_vnet:%s" % parameters, {'task_id': task_id})70 network_client = get_gcp_network_client(parameters)71 wait_client = get_gcp_wait_client_global(parameters)72 add_audit_log(session, task_id, "NetworkOperations", "delete_vnet",73 "started", TASK_STATUS.COMPLETED)74 try:75 vnet_config_info = parameters.get('vnet_config_info')76 except Exception as ex:77 raise Exception("Vnet configuration is missing:%s" % str(ex))78 ips_deleted = []79 ips_failed = []80 if vnet_config_info:81 for vnet_name in vnet_config_info:82 try:83 LOG.debug("IP Name is:%s" % vnet_name, {'task_id': task_id})84 add_audit_log(session, task_id, "NetworkOperations",85 "delete_vnet:%s" % vnet_name,86 "started", TASK_STATUS.COMPLETED)87 op = network_client.delete(project=parameters.get('gcp_project'),88 network=vnet_name)89 while op.status != compute_v1.Operation.Status.DONE:90 op = wait_client.wait(operation=op.name,91 project=parameters.get('gcp_project'))92 add_audit_log(session, task_id, "NetworkOperations",93 "delete_vnet:%s" % vnet_name,94 "Success", TASK_STATUS.COMPLETED)95 ips_deleted.append(vnet_name)96 except Exception as ex:97 ips_failed.append(vnet_name)98 add_audit_log(session, task_id, "NetworkOperations",99 "delete_vnet:%s" % vnet_name,100 "%s" % str(ex), TASK_STATUS.FAILED)101 add_audit_log(session, task_id, "NetworkOperations",102 "delete_vnet Vnet deleted:%s, Vnet failed:%s" %103 (ips_deleted, ips_failed), "Success", TASK_STATUS.COMPLETED)104 if ips_failed:105 add_audit_log(session, task_id, "NetworkOperations", "delete_vnet",106 "Failed", TASK_STATUS.FAILED)107 raise Exception('Following ops failed {}'.format(ips_failed))108 else:109 add_audit_log(session, task_id, "NetworkOperations", "delete_public_ip",110 "No Vnet information in the request", TASK_STATUS.COMPLETED)111def delete_subnet(parameters, session, **kwargs):112 # Delete SubNet113 task_id = parameters.get('task_id')114 LOG.debug("delete_subnet:%s" % parameters,115 {'task_id': task_id})116 subnet_client = get_gcp_subnet_client(parameters)117 wait_client_reg = get_gcp_wait_client_regional(parameters)118 add_audit_log(session, task_id, "NetworkOperations", "delete_subnet",119 "started", TASK_STATUS.COMPLETED)120 try:121 subnet_config_info = parameters.get('subnet_config_info')122 except Exception as ex:123 raise Exception("SubNet configuration is missing:%s" % str(ex))124 ips_deleted = []125 ips_failed = []126 if subnet_config_info:127 for subnet_name in subnet_config_info:128 try:129 LOG.debug("IP Name is:%s" % subnet_name, {'task_id': task_id})130 add_audit_log(session, task_id, "NetworkOperations",131 "delete_subnet:%s" % subnet_name,132 "started", TASK_STATUS.COMPLETED)133 op = subnet_client.delete(project=parameters.get('gcp_project'),134 region=parameters.get('location'),135 subnetwork=subnet_name)136 while op.status != compute_v1.Operation.Status.DONE:137 op = wait_client_reg.wait(operation=op.name,138 project=parameters.get('gcp_project'),139 region=parameters.get('location'))140 add_audit_log(session, task_id, "NetworkOperations",141 "delete_subnet:%s" % subnet_name,142 "Success", TASK_STATUS.COMPLETED)143 ips_deleted.append(subnet_name)144 except Exception as ex:145 ips_failed.append(subnet_name)146 add_audit_log(session, task_id, "NetworkOperations",147 "delete_subnet:%s" % subnet_name,148 "%s" % str(ex), TASK_STATUS.FAILED)149 add_audit_log(session, task_id, "NetworkOperations",150 "delete_subnet SubNet deleted:%s, SubNet failed:%s" %151 (ips_deleted, ips_failed),152 "Success", TASK_STATUS.COMPLETED)153 if ips_failed:154 add_audit_log(session, task_id, "NetworkOperations", "delete_subnet",155 "Failed", TASK_STATUS.FAILED)156 raise Exception('Following ops failed {}'.format(ips_failed))157 else:158 add_audit_log(session, task_id, "NetworkOperations", "delete_public_ip",159 "No Subnet information in the request",160 TASK_STATUS.COMPLETED)161def delete_firewall_rules(parameters, session, **kwargs):162 # Delete public ip163 task_id = parameters.get('task_id')164 LOG.debug("delete_firewall_rule:%s" % parameters,165 {'task_id': task_id})166 firewall_client = get_gcp_firewall_client(parameters)167 op_client = get_gcp_op_client(parameters)168 add_audit_log(session, task_id, "NetworkOperations", "delete_firewall_rule",169 "started", TASK_STATUS.COMPLETED)170 try:171 firewall_config_info = parameters.get('firewall_config_info')172 except Exception as ex:173 raise Exception("FW configuration is missing:%s" % str(ex))174 rules_deleted = []175 rules_failed = []176 if firewall_config_info:177 for fw_name in firewall_config_info:178 try:179 LOG.debug("IP Name is:%s" % fw_name, {'task_id': task_id})180 add_audit_log(session, task_id, "NetworkOperations",181 "delete_firewall_rule:%s" % fw_name,182 "started", TASK_STATUS.COMPLETED)183 op = firewall_client.delete(project=parameters.get('gcp_project'),184 firewall=fw_name)185 while op.status != compute_v1.Operation.Status.DONE:186 op = op_client.wait(187 operation=op.name, zone=parameters.get('zone'),188 project=parameters.get('gcp_project'))189 add_audit_log(session, task_id, "NetworkOperations",190 "delete_firewall_rule:%s" % fw_name,191 "Success", TASK_STATUS.COMPLETED)192 rules_deleted.append(fw_name)193 except Exception as ex:194 rules_failed.append(fw_name)195 add_audit_log(session, task_id, "NetworkOperations",196 "delete_firewall_rule:%s" % fw_name,197 "%s" % str(ex), TASK_STATUS.FAILED)198 add_audit_log(session, task_id, "NetworkOperations",199 "delete_firewall_rule Rules deleted:%s, Rules failed:%s" %200 (rules_deleted, rules_failed), "Success", TASK_STATUS.COMPLETED)201 if rules_failed:202 add_audit_log(session, task_id, "NetworkOperations", "delete_firewall_rule",203 "Failed", TASK_STATUS.FAILED)204 raise Exception('Following ops failed {}'.format(rules_failed))205 else:206 add_audit_log(session, task_id, "NetworkOperations", "delete_firewall_rule",...
signals.py
Source:signals.py
...12 # if the user's group has changed then the each client must have their rule13 # deleted and re-created in order to have the new group rules applied14 if instance.has_changed("group_id"):15 for client in models.Client.objects.by_user(instance).all():16 client.delete_firewall_rule()17 client.create_firewall_rule()18@receiver(signals.post_delete, sender=models.Device)19def device_post_delete(instance, **kwargs):20 """21 Handles post-delete actions for the given device.22 :return: None23 """24 if instance.serial:25 models.RevokedDevice.objects.create(serial=instance.serial)26@receiver(signals.post_save, sender=models.Group)27def group_post_save(instance, **kwargs):28 """29 Handles post-save actions for the given group.30 :return: None31 """32 if instance.is_enabled:33 instance.create_firewall_chain()34 else:35 instance.clients.delete()36 instance.delete_firewall_chain()37@receiver(signals.post_delete, sender=models.Group)38def group_post_delete(instance, **kwargs):39 """40 Handles post-delete actions for the given group.41 :return: None42 """43 instance.delete_firewall_chain()44@receiver(signals.post_delete, sender=models.Client)45def client_post_delete(instance, **kwargs):46 """47 Handles post-delete actions for the given client.48 :return: None49 """50 instance.delete_firewall_rule()51 tasks.disconnect_openvpn_client(instance.remote_ip)52@receiver(signals.post_save, sender=models.Client)53def client_post_save(instance, **kwargs):54 """55 Handles post-save actions for the given client.56 :return: None57 """58 instance.create_firewall_rule()59@receiver(signals.post_save, sender=models.FirewallRule)60def firewall_rule_post_save(instance, **kwargs):61 """62 Handles post-save action for the given firewall rule.63 :return: None64 """65 instance.group.create_firewall_chain()66@receiver(signals.post_delete, sender=models.FirewallRule)67def firewall_rule_post_delete(instance, **kwargs):68 """69 Handles post-delete actions for the given firewall rule.70 :return: None71 """...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!