Best Python code snippet using localstack_python
applications.py
Source:applications.py
...219 220 if client_name != 'q':221 res = input('\nAre you sure you want to delete '+client_name+' \'s certificate ? y/n\n')222 if res =='y':223 task = loop.create_task(connector_remote_tool.delete_client_certificate(client_id=client_name, 224 remove_only_symlink=False))225 loop.run_until_complete(task)226 print(task.result().decode()) 227 task = loop.create_task(connector_remote_tool.disconnect_client(client_id=client_name))228 loop.run_until_complete(task)229 print(task.result().decode()) 230 else:231 res = input('\nAre you sure you want to delete '+client_name+' \'s certificate ? y/n\n')232 if res =='y': 233 task = loop.create_task(connector_remote_tool.delete_client_certificate())234 loop.run_until_complete(task)235 print(task.result().decode())236 elif the_cmd == 'delete_client_token':237 if is_server:238 peers_dict = show_connected_peers(return_peers=running_with_tab_completion)239 if running_with_tab_completion:240 def complete(text,state):241 results = [peer for peer in peers_dict if peer.startswith(text)] + [None]242 return results[state]243 readline.set_completer(complete) 244 245 client_name = input('\nPlease type the client name whose token you would '246 'like to delete, or q to quit\n')247 if running_with_tab_completion:...
aioconnectors_test.py
Source:aioconnectors_test.py
...181 uds_path_send_preserve_socket=UDS_PATH_SEND_PRESERVE_SOCKET)#, uds_path_receive_preserve_socket=UDS_PATH_RECEIVE_PRESERVE_SOCKET)182 loop = asyncio.get_event_loop()183 184 if TEST_COMMANDER_CLIENT:185 loop.create_task(connector_api.delete_client_certificate())186 187 async def client_cb_type1(logger, transport_json , data, binary):188 peer_id = transport_json['source_id'] 189 increment_result(own_source_id, peer_id, 'type1', 'recv')190 191 async def client_cb_type2(logger, transport_json , data, binary):192 peer_id = transport_json['source_id'] 193 increment_result(own_source_id, peer_id, 'type2', 'recv')194 195 loop.create_task(print_results())196 #wait for messages from server (call once only) 197 if True: #TEST_PERSISTENCE_CLIENT or TEST_PERSISTENCE_SERVER or TEST_SERVER_AWAITS_REPLY or TEST_CLIENT_AWAITS_REPLY or TEST_UPLOAD_FILE:198 loop.create_task(connector_api.start_waiting_for_messages(message_type='type2', message_received_cb=client_cb_type2))199 loop.create_task(connector_api.start_waiting_for_messages(message_type='type1', message_received_cb=client_cb_type1))200 201 async def send_stress(message_type, peer_id, delay):202 await asyncio.sleep(delay) 203 index = 0 204 await_response = False 205 with_file_template = False206 with_file = None207 208 if TEST_PERSISTENCE_CLIENT: 209 duration_test = 20 #seconds210 messages_per_second = 1000 211 if TEST_WITH_ACK: 212 messages_per_second = 10213 elif (TEST_SERVER_AWAITS_REPLY or TEST_CLIENT_AWAITS_REPLY):214 duration_test = 10 #seconds215 messages_per_second = 10216 if TEST_CLIENT_AWAITS_REPLY:217 await_response = True218 elif TEST_UPLOAD_FILE or TEST_UPLOAD_FILE_WITH_PERSISTENCE:219 duration_test = 10 #seconds220 if TEST_UPLOAD_FILE_WITH_PERSISTENCE:221 duration_test = 20222 messages_per_second = 1223 with_file_template={'src_path':FILE_SRC_PATH,'dst_type':'file1', 'dst_name':os.path.basename(FILE_SRC_PATH)+'_from_client_'+own_source_id+'_index_{}', 'delete':False} #default is delete=True224 elif TEST_TRAFFIC_CLIENT:225 duration_test = 15 #seconds226 messages_per_second = 1000 #10000227 if TEST_WITH_ACK:228 messages_per_second = 10229 elif TEST_PERSISTENCE_CLIENT_AWAIT_REPLY:230 duration_test = 20 #seconds231 messages_per_second = 2232 await_response = True233 234 max_index = duration_test * messages_per_second235 sleep_time = 1/messages_per_second236 request_id = response_id = None237 while index < max_index:238 index += 1239 data = '×ס×(% ;)'+str(index)*200 240 #increment_result(own_source_id, peer_id, message_type, 'send')241 #while results[own_source_id][message_type][peer_id]['recv'] != index:242 # await asyncio.sleep(0.1)243 if TEST_SERVER_AWAITS_REPLY:244 response_id = index245 else:246 request_id = index247 if with_file_template:248 with_file = deepcopy(with_file_template)249 with_file['dst_name'] = with_file['dst_name'].format(index)250 response = await connector_api.send_message(data=data, data_is_json=False, 251 message_type=message_type, await_response=await_response, response_id=response_id, request_id=request_id,252 binary=b'\x01\x02\x03\x04\x05', with_file=with_file, wait_for_ack=TEST_WITH_ACK)253 increment_result(own_source_id, peer_id, message_type, 'send') 254 #await asyncio.sleep(sleep_time)255 256 if TEST_CLIENT_AWAITS_REPLY or TEST_PERSISTENCE_CLIENT_AWAIT_REPLY:257 increment_result(own_source_id, peer_id, message_type, 'recv')258 else: 259 await asyncio.sleep(sleep_time) 260 261 print('Finished')262 263 264 if TEST_PERSISTENCE_CLIENT or TEST_PERSISTENCE_CLIENT_AWAIT_REPLY: 265 loop.create_task(send_stress(message_type='type1', peer_id=str(SERVER_SOCKADDR), delay=2))266 loop.create_task(send_stress(message_type='type2', peer_id=str(SERVER_SOCKADDR), delay=2)) 267 elif TEST_SERVER_AWAITS_REPLY: 268 loop.create_task(send_stress(message_type='type2', peer_id=str(SERVER_SOCKADDR), delay=7))269 elif TEST_CLIENT_AWAITS_REPLY:270 loop.create_task(send_stress(message_type='type2', peer_id=str(SERVER_SOCKADDR), delay=3))271 elif TEST_UPLOAD_FILE or TEST_UPLOAD_FILE_WITH_PERSISTENCE:272 loop.create_task(send_stress(message_type='type1', peer_id=str(SERVER_SOCKADDR), delay=3))273 elif TEST_TRAFFIC_CLIENT:274 loop.create_task(send_stress(message_type='type1', peer_id=str(SERVER_SOCKADDR), delay=2))275 loop.create_task(send_stress(message_type='type2', peer_id=str(SERVER_SOCKADDR), delay=2))276 277 try:278 loop.run_forever()279 except:280 print('send2client stopped !')281 connector_api.stop_waiting_for_messages(message_type='type2')282 connector_api.stop_waiting_for_messages(message_type='type1') 283 #for task in tasks:284 # task.cancel()285 286 elif sys.argv[1] == 'send2server':287 print('Started send2server') 288 own_source_id = str(SERVER_SOCKADDR) 289 #name = local_name or str(SERVER_SOCKADDR)290 #server_source_id = str(SERVER_SOCKADDR)291 292 connector_api = aioconnectors.ConnectorAPI(default_logger_log_level=DEFAULT_LOGGER_LOG_LEVEL, is_server=True, server_sockaddr=SERVER_SOCKADDR,293 send_message_types=SERVER_MESSAGE_TYPES, recv_message_types=CLIENT_MESSAGE_TYPES,294 uds_path_send_preserve_socket=UDS_PATH_SEND_PRESERVE_SOCKET)#, uds_path_receive_preserve_socket=UDS_PATH_RECEIVE_PRESERVE_SOCKET)295# connector_api = aioconnectors.ConnectorAPI(is_server=True, server_sockaddr=own_source_id, use_ssl=TEST_WITH_SSL, certificates_directory_path=CERTIFICATES_DIRECTORY_PATH,296 297 loop = asyncio.get_event_loop()298 if TEST_COMMANDER_SERVER:299 loop.create_task(connector_api.delete_client_certificate(client_id='client2', remove_only_symlink=False))300 ''' 301 async def print_queues(period):302 while True:303 res = await connector_api.peek_queues()304 await asyncio.sleep(period)305 print(res) 306 loop.create_task(print_queues(3))307 '''308 309 async def server_cb_type1(logger, transport_json , data, binary):310 peer_id = transport_json['source_id'] 311 increment_result(own_source_id, peer_id, 'type1', 'recv')312 313 async def server_cb_type2(logger, transport_json , data, binary):...
test_generator.py
Source:test_generator.py
1from pyboto3 import interface_generator2import json3def test_get_services_dir():4 assert interface_generator.get_services_dir()5 service_json_path_dict = dict(interface_generator.iter_services_json_paths())6 for service_name, json_path in service_json_path_dict.iteritems():7 print service_name, json_path8 apigateway_methods = {'can_paginate', 'create_api_key', 'create_authorizer', 'create_base_path_mapping',9 'create_deployment',10 'create_domain_name', 'create_model', 'create_resource', 'create_rest_api', 'create_stage',11 'delete_api_key', 'delete_authorizer', 'delete_base_path_mapping',12 'delete_client_certificate',13 'delete_deployment', 'delete_domain_name', 'delete_integration',14 'delete_integration_response',15 'delete_method', 'delete_method_response', 'delete_model', 'delete_resource',16 'delete_rest_api',17 'delete_stage', 'flush_stage_authorizers_cache', 'flush_stage_cache',18 'generate_client_certificate',19 'generate_presigned_url', 'get_account', 'get_api_key', 'get_api_keys', 'get_authorizer',20 'get_authorizers', 'get_base_path_mapping', 'get_base_path_mappings',21 'get_client_certificate',22 'get_client_certificates', 'get_deployment', 'get_deployments', 'get_domain_name',23 'get_domain_names',24 'get_export', 'get_integration', 'get_integration_response', 'get_method',25 'get_method_response',26 'get_model', 'get_model_template', 'get_models', 'get_paginator', 'get_resource',27 'get_resources',28 'get_rest_api', 'get_rest_apis', 'get_sdk', 'get_stage', 'get_stages', 'get_waiter',29 'import_rest_api',30 'put_integration', 'put_integration_response', 'put_method', 'put_method_response',31 'put_rest_api',32 'test_invoke_authorizer', 'test_invoke_method', 'update_account', 'update_api_key',33 'update_authorizer',34 'update_base_path_mapping', 'update_client_certificate', 'update_deployment',35 'update_domain_name',36 'update_integration', 'update_integration_response', 'update_method',37 'update_method_response',38 'update_model', 'update_resource', 'update_rest_api', 'update_stage'}39 apigateway_dict = json.load(open(service_json_path_dict.get('apigateway')))40 assert set(interface_generator.iter_method_names(apigateway_dict)) == apigateway_methods41 s3_methods = {'abort_multipart_upload', 'can_paginate', 'complete_multipart_upload', 'copy_object',42 'create_bucket',43 'create_multipart_upload', 'delete_bucket', 'delete_bucket_cors', 'delete_bucket_lifecycle',44 'delete_bucket_policy', 'delete_bucket_replication', 'delete_bucket_tagging',45 'delete_bucket_website',46 'delete_object', 'delete_objects', 'download_file', 'generate_presigned_post',47 'generate_presigned_url', 'get_bucket_accelerate_configuration', 'get_bucket_acl',48 'get_bucket_cors',49 'get_bucket_lifecycle', 'get_bucket_lifecycle_configuration', 'get_bucket_location',50 'get_bucket_logging', 'get_bucket_notification', 'get_bucket_notification_configuration',51 'get_bucket_policy', 'get_bucket_replication', 'get_bucket_request_payment', 'get_bucket_tagging',52 'get_bucket_versioning', 'get_bucket_website', 'get_object', 'get_object_acl',53 'get_object_torrent',54 'get_paginator', 'get_waiter', 'head_bucket', 'head_object', 'list_buckets',55 'list_multipart_uploads',56 'list_object_versions', 'list_objects', 'list_objects_v2', 'list_parts',57 'put_bucket_accelerate_configuration', 'put_bucket_acl', 'put_bucket_cors',58 'put_bucket_lifecycle',59 'put_bucket_lifecycle_configuration', 'put_bucket_logging', 'put_bucket_notification',60 'put_bucket_notification_configuration', 'put_bucket_policy', 'put_bucket_replication',61 'put_bucket_request_payment', 'put_bucket_tagging', 'put_bucket_versioning', 'put_bucket_website',62 'put_object', 'put_object_acl', 'restore_object', 'upload_file', 'upload_part',63 'upload_part_copy'}64 s3_dict = json.load(open(service_json_path_dict.get('s3')))...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!