Best Python code snippet using localstack_python
upload_server_certificate.py
Source:upload_server_certificate.py
# ... (excerpt begins mid-file: the imports, TRY_TIME, ACS_CLIENT, create_load_balancer
# and the head of the preceding function are not visible here. The visible tail of
# that preceding function is kept below for reference only.)
#             if ExceptionHandler.client_exception(e):
#                 return e
#         finally:
#             counter += 1


def _send_with_retry(build_request, max_attempts):
    """Build and send an SDK request, retrying on SDK exceptions.

    Args:
        build_request: zero-argument callable returning the request object.
            It is invoked inside the ``try`` on every attempt, exactly as the
            request used to be rebuilt on every loop iteration.
        max_attempts: upper bound on the number of attempts.

    Returns:
        The JSON-decoded response of the first successful attempt, or ``None``
        when every attempt raised ``ServerException``/``ClientException``
        (or when ``max_attempts`` is not positive).

    NOTE(review): the tail of the preceding function in this file stops
    retrying with ``if ExceptionHandler.client_exception(e): return e``.
    The functions below never looked at the handler's return value, and that
    behaviour is preserved here - confirm which variant is intended.
    """
    attempt = 0
    while attempt < max_attempts:
        try:
            request = build_request()
            # Send the request and decode the response body.
            response = ACS_CLIENT.do_action_with_exception(request)
            return json.loads(response)
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)
        finally:
            attempt += 1
    return None


def _require_field(result, key, step):
    """Return ``result[key]``; raise RuntimeError naming ``step`` if it is unavailable."""
    if not isinstance(result, dict) or key not in result:
        raise RuntimeError("%s did not return %r: %r" % (step, key, result))
    return result[key]


def upload_server_certificate(params):
    '''
    upload_server_certificate: upload a server certificate.
    Official API reference: https://help.aliyun.com/document_detail/34181.html

    Reads params["server_certificate"] and params["private_key"].
    Returns the decoded JSON response, or None if all TRY_TIME attempts failed.
    '''
    def build_request():
        request = UploadServerCertificateRequest.UploadServerCertificateRequest()
        # Public-key certificate to upload.
        request.set_ServerCertificate(params["server_certificate"])
        # Private key to upload.
        request.set_PrivateKey(params["private_key"])
        return request

    return _send_with_retry(build_request, TRY_TIME)


def create_https_listener(params):
    '''
    create_https_listener: create an HTTPS listener.
    Official API reference: https://help.aliyun.com/document_detail/27593.html

    Reads load_balancer_id, bandwidth, listener_port, health_check,
    sticky_session, backend_server_port and server_certificate_id from params.
    Returns the decoded JSON response, or None if all TRY_TIME attempts failed.
    '''
    def build_request():
        request = CreateLoadBalancerHTTPSListenerRequest.CreateLoadBalancerHTTPSListenerRequest()
        # ID of the load balancer instance.
        request.set_LoadBalancerId(params["load_balancer_id"])
        # Peak bandwidth of the listener.
        request.set_Bandwidth(params["bandwidth"])
        # Front-end port used by the load balancer instance.
        request.set_ListenerPort(params["listener_port"])
        # Whether health checking is enabled.
        request.set_HealthCheck(params["health_check"])
        # Whether session persistence is enabled.
        request.set_StickySession(params["sticky_session"])
        # Back-end port used by the load balancer instance.
        request.set_BackendServerPort(params["backend_server_port"])
        # ID of the server certificate.
        request.set_ServerCertificateId(params["server_certificate_id"])
        return request

    return _send_with_retry(build_request, TRY_TIME)


def set_https_listener_attribute(params):
    '''
    set_https_listener_attribute: modify the configuration of an HTTPS listener.
    Official API reference: https://help.aliyun.com/document_detail/27603.html

    Reads load_balancer_id, bandwidth, listener_port, health_check,
    sticky_session and server_certificate_id from params.
    Returns the decoded JSON response, or None if all TRY_TIME attempts failed.
    '''
    def build_request():
        request = SetLoadBalancerHTTPSListenerAttributeRequest.SetLoadBalancerHTTPSListenerAttributeRequest()
        # ID of the load balancer instance.
        request.set_LoadBalancerId(params["load_balancer_id"])
        # Peak bandwidth of the listener.
        request.set_Bandwidth(params["bandwidth"])
        # Front-end port used by the load balancer instance.
        request.set_ListenerPort(params["listener_port"])
        # Whether health checking is enabled.
        request.set_HealthCheck(params["health_check"])
        # Whether session persistence is enabled.
        request.set_StickySession(params["sticky_session"])
        # ID of the server certificate.
        request.set_ServerCertificateId(params["server_certificate_id"])
        return request

    return _send_with_retry(build_request, TRY_TIME)


def delete_load_balancer(load_balancer_id):
    '''
    delete_load_balancer: delete an SLB instance.
    Official API reference: https://help.aliyun.com/document_detail/27579.html

    Makes a single attempt (no retry). Returns the decoded JSON response,
    or None if the call raised ServerException/ClientException.
    '''
    def build_request():
        # Request parameters for deleting the SLB instance.
        request = DeleteLoadBalancerRequest.DeleteLoadBalancerRequest()
        # ID of the load balancer instance.
        request.set_LoadBalancerId(load_balancer_id)
        return request

    return _send_with_retry(build_request, 1)


def main():
    """Demo flow: create an SLB, attach an HTTPS listener, rotate its certificate, delete the SLB.

    Raises:
        RuntimeError: when creating the SLB or uploading a certificate does
            not yield the ID the following steps need (previously this
            surfaced as a TypeError on ``None[...]``).
    """
    params = {}
    # Parameters for creating the SLB instance.
    # Primary zone of the new SLB instance.
    params["master_zone_id"] = "cn-zhangjiakou-a"
    # Backup zone of the new SLB instance.
    params["slave_zone_id"] = "cn-zhangjiakou-b"
    # Name of the new SLB instance.
    params["load_balancer_name"] = "SLB1"
    # Billing type of the new SLB instance: pay-as-you-go.
    params["pay_balancer_type"] = "PayOnDemand"
    # ECS instance IDs and weights to add to the default server group.
    params["backend_servers"] = [{"ServerId": "i-8vbe8yixxxxxxxxxxxxxw", "Weight": "100"},
                                 {"ServerId": "i-8vbe8yxxxxxxxxxxxxxj9v", "Weight": "100"}]
    # Parameters for creating the HTTPS listener.
    # Health check disabled.
    params["health_check"] = "off"
    # Peak bandwidth of the listener.
    params["bandwidth"] = 6
    # Front-end port used by the load balancer instance.
    params["listener_port"] = 80
    # Back-end port used by the load balancer instance.
    params["backend_server_port"] = 443
    # Session persistence disabled.
    params["sticky_session"] = "off"

    # Create the SLB instance and log the JSON result under the label "create_load_balancer".
    load_balancer_json = create_load_balancer(params)
    CommonUtil.log("create_load_balancer", load_balancer_json)
    load_balancer_id = _require_field(load_balancer_json, "LoadBalancerId", "create_load_balancer")

    # From here on the instance exists, so always delete it, even when a later step fails.
    try:
        # Upload the server certificate (public-key certificate and private key).
        params["server_certificate"] = "-----BEGIN CERTIFICATE-----xxxxxxx-----END CERTIFICATE-----"
        params["private_key"] = "-----BEGIN RSA PRIVATE KEY-----xxxxxxxxxxx-----END RSA PRIVATE KEY-----"
        result_json = upload_server_certificate(params)
        CommonUtil.log("upload_server_certificate", result_json)
        params["server_certificate_id"] = _require_field(
            result_json, "ServerCertificateId", "upload_server_certificate")
        # ID of the SLB instance.
        params["load_balancer_id"] = load_balancer_id

        # Create the HTTPS listener.
        result_json = create_https_listener(params)
        CommonUtil.log("create_https_listener", result_json)

        # Upload the new server certificate (new public-key certificate and private key).
        params["server_certificate"] = "-----BEGIN CERTIFICATE-----xxxxxxx-----END CERTIFICATE-----"
        params["private_key"] = "-----BEGIN RSA PRIVATE KEY-----xxxxxxxxxxx-----END RSA PRIVATE KEY-----"
        result_json = upload_server_certificate(params)
        CommonUtil.log("upload_server_certificate", result_json)
        params["server_certificate_id"] = _require_field(
            result_json, "ServerCertificateId", "upload_server_certificate")

        # Switch the HTTPS listener to the new certificate.
        result_json = set_https_listener_attribute(params)
        CommonUtil.log("set_https_listener_attribute", result_json)
    finally:
        # Delete the SLB instance identified by the returned LoadBalancerId and log the result.
        load_balancer_json = delete_load_balancer(load_balancer_id)
        CommonUtil.log("delete_load_balancer", load_balancer_json)


if __name__ == "__main__":
    ...  # body truncated in this excerpt; presumably calls main() - TODO confirm
pipelines.py
Source:pipelines.py
import logging

from broker.tasks import alb, cloudfront, update_operations, iam, letsencrypt, route53
from broker.tasks.huey import huey

logger = logging.getLogger(__name__)


def _require_ids(operation_id, correlation_id):
    """Raise RuntimeError when either identifier is None (correlation_id is checked first)."""
    if correlation_id is None:
        raise RuntimeError("correlation_id must be set")
    if operation_id is None:
        raise RuntimeError("operation_id must be set")


def _chain(operation_id, correlation, first_task, *next_tasks):
    """Build a pipeline in which every task is called with (operation_id, **correlation).

    ``first_task`` is started with ``.s(...)`` and each following task is
    appended with ``.then(...)``, in the order given.
    """
    pipeline = first_task.s(operation_id, **correlation)
    for task in next_tasks:
        pipeline = pipeline.then(task, operation_id, **correlation)
    return pipeline


def queue_all_alb_provision_tasks_for_operation(operation_id: int, correlation_id: str):
    """Enqueue the ALB provisioning pipeline for ``operation_id``.

    Raises RuntimeError if ``correlation_id`` or ``operation_id`` is None.
    """
    _require_ids(operation_id, correlation_id)
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(
        operation_id,
        ctx,
        letsencrypt.create_user,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        alb.select_alb,
        alb.add_certificate_to_alb,
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        update_operations.provision,
    )
    huey.enqueue(pipeline)


def queue_all_alb_deprovision_tasks_for_operation(
    operation_id: int, correlation_id: str
):
    """Enqueue the ALB deprovisioning pipeline for ``operation_id``.

    Raises RuntimeError if ``correlation_id`` or ``operation_id`` is None.
    """
    _require_ids(operation_id, correlation_id)
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(
        operation_id,
        ctx,
        update_operations.cancel_pending_provisioning,
        route53.remove_ALIAS_records,
        route53.remove_TXT_records,
        alb.remove_certificate_from_alb,
        iam.delete_server_certificate,
        update_operations.deprovision,
    )
    huey.enqueue(pipeline)


def queue_all_migration_deprovision_tasks_for_operation(
    operation_id: int, correlation_id: str
):
    """Enqueue the single-step deprovision pipeline used for migration instances.

    Raises RuntimeError if ``correlation_id`` or ``operation_id`` is None.
    """
    _require_ids(operation_id, correlation_id)
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(operation_id, ctx, update_operations.deprovision)
    huey.enqueue(pipeline)


def queue_all_cdn_provision_tasks_for_operation(operation_id: int, correlation_id: str):
    """Enqueue the CDN (CloudFront) provisioning pipeline for ``operation_id``.

    Raises RuntimeError if ``correlation_id`` or ``operation_id`` is None.
    """
    _require_ids(operation_id, correlation_id)
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(
        operation_id,
        ctx,
        letsencrypt.create_user,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        cloudfront.create_distribution,
        cloudfront.wait_for_distribution,
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        update_operations.provision,
    )
    huey.enqueue(pipeline)


def queue_all_cdn_deprovision_tasks_for_operation(
    operation_id: int, correlation_id: str
):
    """Enqueue the CDN (CloudFront) deprovisioning pipeline for ``operation_id``.

    Raises RuntimeError if ``correlation_id`` or ``operation_id`` is None.
    """
    _require_ids(operation_id, correlation_id)
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(
        operation_id,
        ctx,
        update_operations.cancel_pending_provisioning,
        route53.remove_ALIAS_records,
        route53.remove_TXT_records,
        cloudfront.disable_distribution,
        cloudfront.wait_for_distribution_disabled,
    )
    # delete_distribution is the one step that receives operation_id by keyword.
    pipeline = pipeline.then(
        cloudfront.delete_distribution, operation_id=operation_id, **ctx
    )
    pipeline = pipeline.then(iam.delete_server_certificate, operation_id, **ctx)
    pipeline = pipeline.then(update_operations.deprovision, operation_id, **ctx)
    huey.enqueue(pipeline)


def queue_all_alb_renewal_tasks_for_operation(operation_id, **kwargs):
    """Enqueue the ALB certificate-renewal pipeline.

    Extra keyword arguments are accepted and ignored; the correlation id is
    always the literal "Renewal".
    """
    ctx = {"correlation_id": "Renewal"}
    pipeline = _chain(
        operation_id,
        ctx,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        alb.select_alb,
        alb.add_certificate_to_alb,
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        alb.remove_certificate_from_previous_alb,
        iam.delete_previous_server_certificate,
        update_operations.provision,
    )
    huey.enqueue(pipeline)


def queue_all_cdn_renewal_tasks_for_operation(operation_id, **kwargs):
    """Enqueue the CDN certificate-renewal pipeline.

    Extra keyword arguments are accepted and ignored; the correlation id is
    always the literal "Renewal".
    """
    ctx = {"correlation_id": "Renewal"}
    pipeline = _chain(
        operation_id,
        ctx,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        cloudfront.update_certificate,
        iam.delete_previous_server_certificate,
        update_operations.provision,
    )
    huey.enqueue(pipeline)


def queue_all_cdn_update_tasks_for_operation(operation_id, correlation_id):
    """Enqueue the CDN update pipeline (no None-checks are performed on the ids)."""
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(
        operation_id,
        ctx,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        cloudfront.update_distribution,
        cloudfront.wait_for_distribution,
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        iam.delete_previous_server_certificate,
        update_operations.update_complete,
    )
    huey.enqueue(pipeline)


def queue_all_alb_update_tasks_for_operation(operation_id, correlation_id):
    """Enqueue the ALB update pipeline (no None-checks are performed on the ids)."""
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(
        operation_id,
        ctx,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        alb.select_alb,
        alb.add_certificate_to_alb,
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        alb.remove_certificate_from_previous_alb,
        iam.delete_previous_server_certificate,
        update_operations.provision,
    )
    huey.enqueue(pipeline)


def queue_all_cdn_broker_migration_tasks_for_operation(operation_id, correlation_id):
    """Enqueue the pipeline that migrates a cdn-broker instance (no None-checks on the ids)."""
    ctx = {"correlation_id": correlation_id}
    pipeline = _chain(
        operation_id,
        ctx,
        cloudfront.remove_s3_bucket_from_cdn_broker_instance,
        cloudfront.add_logging_to_bucket,
        letsencrypt.create_user,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        cloudfront.update_certificate,
        iam.delete_previous_server_certificate,
        update_operations.provision,
    )
    huey.enqueue(pipeline)


def queue_all_domain_broker_migration_tasks_for_operation(operation_id, correlation_id):
    """Build the pipeline that migrates a domain-broker instance (no None-checks on the ids)."""
    ctx = {"correlation_id": correlation_id}
    task_pipeline = _chain(
        operation_id,
        ctx,
        letsencrypt.create_user,
        letsencrypt.generate_private_key,
        letsencrypt.initiate_challenges,
        # create alias records here is probably not necessary, but belt + suspenders
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        route53.create_TXT_records,
        route53.wait_for_changes,
        letsencrypt.answer_challenges,
        letsencrypt.retrieve_certificate,
        iam.upload_server_certificate,
        alb.select_alb,
        alb.add_certificate_to_alb,
        route53.create_ALIAS_records,
        route53.wait_for_changes,
        alb.remove_certificate_from_previous_alb,
        iam.delete_previous_server_certificate,
        update_operations.provision,
    )
    ...  # excerpt is cut off here; the sibling functions end with huey.enqueue(...) - confirm against the full file
test_iam_idempotent.py
Source:test_iam_idempotent.py
...76 service_instance = CDNServiceInstance.query.get("1234")77 certificate = service_instance.new_certificate78 today = date.today().isoformat()79 assert today == simple_regex(r"^\d\d\d\d-\d\d-\d\d$")80 iam_commercial.expect_upload_server_certificate(81 name=f"{service_instance.id}-{today}-{certificate.id}",82 cert=certificate.leaf_pem,83 private_key=certificate.private_key_pem,84 chain=certificate.fullchain_pem,85 path="/cloudfront/external-domains-test/",86 )87 upload_server_certificate.call_local("4321")88 db.session.expunge_all()89 service_instance = CDNServiceInstance.query.get("1234")90 certificate = service_instance.new_certificate91 operation = Operation.query.get("4321")92 updated_at = operation.updated_at.timestamp()93 # unstubbed, so an error should be raised if we do try94 upload_server_certificate.call_local("4321")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!