Best Python code snippet using localstack_python
test_dynamodb.py
Source:test_dynamodb.py
...143 resources = p.run()144 self.assertEqual(len(resources), 1)145 client = session_factory().client('dynamodb')146 arn = resources[0]['c7n:BackupArn']147 table = client.describe_backup(148 BackupArn=arn)149 self.assertEqual(table['BackupDescription']['BackupDetails']['BackupName'],150 'Backup-c7n-dynamodb-backup-%s' % (suffix))151 def test_dynamodb_create_prefixed_backup(self):152 dt = datetime.datetime.now().replace(153 year=2018, month=1, day=22, hour=13, minute=42)154 suffix = dt.strftime('%Y-%m-%d-%H-%M')155 session_factory = self.replay_flight_data(156 'test_dynamodb_create_prefixed_backup')157 p = self.load_policy({158 'name': 'c7n-dynamodb-create-prefixed-backup',159 'resource': 'dynamodb-table',160 'filters': [{'TableName': 'c7n-dynamodb-backup'}],161 'actions': [{162 'type': 'backup',163 'prefix': 'custom'}]164 },165 session_factory=session_factory)166 resources = p.run()167 self.assertEqual(len(resources), 1)168 client = session_factory().client('dynamodb')169 arn = resources[0]['c7n:BackupArn']170 table = client.describe_backup(171 BackupArn=arn)172 self.assertEqual(table['BackupDescription']['BackupDetails']['BackupName'],173 'custom-c7n-dynamodb-backup-%s' % (suffix))174 def test_dynamodb_delete_backup(self):175 factory = self.replay_flight_data('test_dynamodb_delete_backup')176 p = self.load_policy({177 'name': 'c7n-dynamodb-delete-backup',178 'resource': 'dynamodb-backup',179 'filters': [{'TableName': 'c7n-dynamodb-backup'}],180 'actions': ['delete']},181 session_factory=factory)182 resources = p.run()183 self.assertEqual(len(resources), 1)184 def test_dynamodb_enable_stream(self):...
redis_backup.py
Source:redis_backup.py
...3233pct = ProcessControlThread(download_file_ex, config.backup_process_num, config.backup_thread_num, arg_type="dict", is_join=False, is_retry=True )343536def describe_backup(backup_date, instance, filename, max_try=5):37 38 i=039 while i<max_try:40 i += 141 try:42 all_instances = do.get_all_instances_info(project_ids=[]) 43 break44 except:45 #è·åå®ä¾ä¿¡æ¯é误ï¼ä¼ç ä¸æ®µæ¶é´åå次å°è¯46 logger_err.warning(format_exc())47 48 if i >= max_try:49 logger_err.error("get instances info failed")50 exit(1)51 else:52 time.sleep(60)53 54 for region, region_instance_list in all_instances: 55 for l in region_instance_list: 56 if instance and instance not in [l.InstanceId, l.InstanceName]:57 continue58 59 instance_id = l.InstanceId60 instance_name = l.InstanceName61 backup_name = None62 63 if instance_id in config.backup_exclude_instance:64 continue65 66 try:67 backup_info = do.describe_backups(region, instance_id, backup_date)68 backup_id = backup_info.BackupId69 backup_time = backup_info.StartTime70 if backup_id: 71 _filename = filename if filename else "%s_%s.tar" % (instance_name, backup_time.replace("-","").replace(" ","").replace(":","") )72 download_url = None73 timeout = 60074 init_info = {"url":download_url, "path":config.redis_backup_dir, "filename":_filename, "download_timeout":timeout,\75 "region":region, "instance_id":instance_id, "backup_id":backup_id}76 retry_queue.put(init_info)77 78 else:79 raise Exception("null for backup_id") 80 81 except:82 logger_err.error(format_exc())83 logger_err.error("can not get backup name for %s %s %s" % (region, instance_id, backup_date))848586def set_download_queue(timeout = 120):87 """88 设置ä¸è½½éå89 """90 while True:91 try:92 retry_info = retry_queue.get(block=True, timeout=timeout)93 region = retry_info["region"]94 instance_id = retry_info["instance_id"]95 backup_id = retry_info["backup_id"]96 97 # é²æ¢å 积å¨ä¸è½½éå导è´urlè¿æ98 while download_url_queue.qsize():99 time.sleep(0.1)100 101 try:102 download_url = 
do.describe_backup_url(region, instance_id, backup_id)103 except:104 logger_err.error(format_exc())105 download_url = None106 107 if not download_url:108 retry_queue.put(retry_info)109 continue110 111 retry_info["url"] = download_url112 download_url_queue.put(retry_info)113 logger.debug("put download info %s" % str(retry_info))114 except Empty:115 if pct.tsize():116 # åå°å¹¶åè¿åå¨ï¼å æ¤ç»§ç»117 timeout = 120118 continue119 else:120 # éå为空ï¼è®¤ä¸ºæ²¡æéè¯çä»»å¡ï¼å æ¤ç´æ¥ç»æå³å¯121 logger.debug("get retry queue timeout, seems good to end")122 break 123 124 except:125 logger_err.error(format_exc())126 break127 128 # 设置ç»ææ è¯ç¬¦129 for i in range(config.backup_process_num): 130 download_url_queue.put("EOF") 131 132133def main(backup_date, instance, filename):134 135 describe_backup(backup_date, instance, filename) 136 137 #设置ä¸è½½éåï¼éè¦å¦å¤å¼ä¸ä¸ªè¿ç¨æ§è¡138 t = Thread(target = set_download_queue, args = ())139 t.start() 140 141 pct.start()142 143 t.join()144 pct.join()145146147def arg_parse():148 """149 å½ä»¤è¡åæ°è§£æ
...
app_management.py
Source:app_management.py
1# Copyright 2013 Mirantis Inc.2# All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License"); you may5# not use this file except in compliance with the License. You may obtain6# a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13# License for the specific language governing permissions and limitations14# under the License.15import routes16from magnetodb.api import with_global_env17from magnetodb.common import wsgi18from magnetodb.api.openstack.v1 import create_resource19from magnetodb.api.openstack.v1.management import create_backup20from magnetodb.api.openstack.v1.management import delete_backup21from magnetodb.api.openstack.v1.management import describe_backup22from magnetodb.api.openstack.v1.management import list_backups23from magnetodb.api.openstack.v1.management import create_restore_job24from magnetodb.api.openstack.v1.management import describe_restore_job25from magnetodb.api.openstack.v1.management import list_restore_jobs26class ManagementApplication(wsgi.Router):27 """API"""28 def __init__(self):29 mapper = routes.Mapper()30 super(ManagementApplication, self).__init__(mapper)31 mapper.connect(32 "create_backup",33 "/{project_id}/{table_name}/backups",34 conditions={'method': 'POST'},35 controller=create_resource(36 create_backup.CreateBackupController()),37 action="process_request"38 )39 mapper.connect(40 "list_backups",41 "/{project_id}/{table_name}/backups",42 conditions={'method': 'GET'},43 controller=create_resource(44 list_backups.ListBackupsController()),45 action="process_request"46 )47 mapper.connect(48 "describe_backup",49 "/{project_id}/{table_name}/backups/{backup_id}",50 conditions={'method': 'GET'},51 controller=create_resource(52 
describe_backup.DescribeBackupController()),53 action="process_request"54 )55 mapper.connect(56 "delete_backup",57 "/{project_id}/{table_name}/backups/{backup_id}",58 conditions={'method': 'DELETE'},59 controller=create_resource(60 delete_backup.DeleteBackupController()),61 action="process_request"62 )63 mapper.connect(64 "create_restore_job",65 "/{project_id}/{table_name}/restores",66 conditions={'method': 'POST'},67 controller=create_resource(68 create_restore_job.CreateRestoreJobController()),69 action="process_request"70 )71 mapper.connect(72 "list_restore_jobs",73 "/{project_id}/{table_name}/restores",74 conditions={'method': 'GET'},75 controller=create_resource(76 list_restore_jobs.ListRestoreJobsController()),77 action="process_request"78 )79 mapper.connect(80 "describe_restore_job",81 "/{project_id}/{table_name}/restores/{restore_job_id}",82 conditions={'method': 'GET'},83 controller=create_resource(84 describe_restore_job.DescribeRestoreJobController()),85 action="process_request"86 )87@with_global_env(default_program='management-api')88def app_factory(global_conf, **local_conf):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!