How to use _update_status method in avocado

Best Python code snippet using avocado_python

test_agent_driver_base.py

Source:test_agent_driver_base.py Github

copy

Full Screen

...101 self.mock_get_driver = mock.patch.object(self.plugin_instance,102 '_get_driver')103 self.mock_get_driver.return_value = (104 agent_driver_base.AgentDriverBase(self.plugin_instance))105 def _update_status(self, model, status, id):106 ctx = context.get_admin_context()107 self.plugin_instance.db.update_status(108 ctx,109 model,110 id,111 provisioning_status=status112 )113 def test_create_loadbalancer(self):114 with self.loadbalancer(no_delete=True) as loadbalancer:115 calls = self.mock_api.create_loadbalancer.call_args_list116 self.assertEqual(1, len(calls))117 _, called_lb, _, device_driver = calls[0][0]118 self.assertEqual(loadbalancer['loadbalancer']['id'], called_lb.id)119 self.assertEqual('dummy', device_driver)120 self.assertEqual(constants.PENDING_CREATE,121 called_lb.provisioning_status)122 def test_update_loadbalancer(self):123 with self.loadbalancer(no_delete=True) as loadbalancer:124 lb_id = loadbalancer['loadbalancer']['id']125 old_lb_name = loadbalancer['loadbalancer']['name']126 ctx = context.get_admin_context()127 self.plugin_instance.db.update_loadbalancer_provisioning_status(128 ctx,129 loadbalancer['loadbalancer']['id'])130 new_lb_name = 'new_lb_name'131 loadbalancer['loadbalancer']['name'] = new_lb_name132 self._update_loadbalancer_api(133 lb_id, {'loadbalancer': {'name': new_lb_name}})134 calls = self.mock_api.update_loadbalancer.call_args_list135 self.assertEqual(1, len(calls))136 _, called_old_lb, called_new_lb, called_host = calls[0][0]137 self.assertEqual(lb_id, called_old_lb.id)138 self.assertEqual(lb_id, called_new_lb.id)139 self.assertEqual(old_lb_name, called_old_lb.name)140 self.assertEqual(new_lb_name, called_new_lb.name)141 self.assertEqual('host', called_host)142 self.assertEqual(constants.PENDING_UPDATE,143 called_new_lb.provisioning_status)144 def test_delete_loadbalancer(self):145 with self.loadbalancer(no_delete=True) as loadbalancer:146 lb_id = loadbalancer['loadbalancer']['id']147 ctx = context.get_admin_context()148 
self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)149 self.plugin_instance.delete_loadbalancer(ctx, lb_id)150 calls = self.mock_api.delete_loadbalancer.call_args_list151 self.assertEqual(1, len(calls))152 _, called_lb, called_host = calls[0][0]153 self.assertEqual(lb_id, called_lb.id)154 self.assertEqual('host', called_host)155 self.assertEqual(constants.PENDING_DELETE,156 called_lb.provisioning_status)157 self.assertRaises(loadbalancerv2.EntityNotFound,158 self.plugin_instance.db.get_loadbalancer,159 ctx, lb_id)160 def test_create_listener(self):161 with self.loadbalancer(no_delete=True) as loadbalancer:162 lb_id = loadbalancer['loadbalancer']['id']163 self._update_status(models.LoadBalancer, constants.ACTIVE,164 loadbalancer['loadbalancer']['id'])165 with self.listener(loadbalancer_id=lb_id,166 no_delete=True) as listener:167 listener_id = listener['listener']['id']168 calls = self.mock_api.create_listener.call_args_list169 _, called_listener, called_host = calls[0][0]170 self.assertEqual(listener_id, called_listener.id)171 self.assertEqual('host', called_host)172 self.assertEqual(constants.PENDING_CREATE,173 called_listener.provisioning_status)174 ctx = context.get_admin_context()175 lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)176 self.assertEqual(constants.PENDING_UPDATE,177 lb.provisioning_status)178 def test_update_listener(self):179 with self.loadbalancer(no_delete=True) as loadbalancer:180 lb_id = loadbalancer['loadbalancer']['id']181 self._update_status(models.LoadBalancer, constants.ACTIVE,182 loadbalancer['loadbalancer']['id'])183 with self.listener(loadbalancer_id=lb_id,184 no_delete=True) as listener:185 listener_id = listener['listener']['id']186 old_name = listener['listener']['name']187 ctx = context.get_admin_context()188 self._update_status(models.LoadBalancer, constants.ACTIVE,189 lb_id)190 self.plugin_instance.db.get_listener(ctx, listener_id)191 new_name = 'new_listener_name'192 listener['listener']['name'] = 
new_name193 self.plugin_instance.update_listener(194 ctx, listener['listener']['id'], listener)195 self.plugin_instance.db.get_listener(196 ctx, listener['listener']['id'])197 calls = self.mock_api.update_listener.call_args_list198 (_, old_called_listener,199 new_called_listener, called_host) = calls[0][0]200 self.assertEqual(listener_id, new_called_listener.id)201 self.assertEqual(listener_id, old_called_listener.id)202 self.assertEqual(old_name, old_called_listener.name)203 self.assertEqual(new_name, new_called_listener.name)204 self.assertEqual(constants.PENDING_UPDATE,205 new_called_listener.provisioning_status)206 lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)207 self.assertEqual(constants.PENDING_UPDATE,208 lb.provisioning_status)209 self.assertEqual('host', called_host)210 def test_delete_listener(self):211 with self.loadbalancer(no_delete=True) as loadbalancer:212 lb_id = loadbalancer['loadbalancer']['id']213 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)214 with self.listener(loadbalancer_id=lb_id,215 no_delete=True) as listener:216 listener_id = listener['listener']['id']217 self._update_status(models.LoadBalancer, constants.ACTIVE,218 lb_id)219 ctx = context.get_admin_context()220 self.plugin_instance.delete_listener(221 ctx, listener['listener']['id'])222 calls = self.mock_api.delete_listener.call_args_list223 _, called_listener, called_host = calls[0][0]224 self.assertEqual(listener_id, called_listener.id)225 self.assertEqual('host', called_host)226 self.assertEqual(constants.PENDING_DELETE,227 called_listener.provisioning_status)228 ctx = context.get_admin_context()229 lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)230 self.assertEqual(constants.ACTIVE,231 lb.provisioning_status)232 self.assertRaises(233 loadbalancerv2.EntityNotFound,234 self.plugin_instance.db.get_listener, ctx, listener_id)235 def test_create_pool(self):236 with self.loadbalancer(no_delete=True) as loadbalancer:237 lb_id = 
loadbalancer['loadbalancer']['id']238 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)239 with self.listener(loadbalancer_id=lb_id,240 no_delete=True) as listener:241 listener_id = listener['listener']['id']242 self._update_status(models.LoadBalancer, constants.ACTIVE,243 lb_id)244 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,245 no_delete=True) as pool:246 pool_id = pool['pool']['id']247 calls = self.mock_api.create_pool.call_args_list248 _, called_pool, called_host = calls[0][0]249 self.assertEqual(pool_id, called_pool.id)250 self.assertEqual('host', called_host)251 self.assertEqual(constants.PENDING_CREATE,252 called_pool.provisioning_status)253 ctx = context.get_admin_context()254 lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)255 self.assertEqual(constants.PENDING_UPDATE,256 lb.provisioning_status)257 def test_update_pool(self):258 ctx = context.get_admin_context()259 with self.loadbalancer(no_delete=True) as loadbalancer:260 lb_id = loadbalancer['loadbalancer']['id']261 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)262 with self.listener(loadbalancer_id=lb_id,263 no_delete=True) as listener:264 listener_id = listener['listener']['id']265 self._update_status(models.LoadBalancer, constants.ACTIVE,266 lb_id)267 with self.pool(loadbalancer_id=lb_id, listener_id=listener_id,268 no_delete=True) as pool:269 pool_id = pool['pool']['id']270 old_name = pool['pool']['name']271 self._update_status(models.LoadBalancer, constants.ACTIVE,272 lb_id)273 new_name = 'new_name'274 pool['pool']['name'] = new_name275 self.plugin_instance.update_pool(ctx, pool_id, pool)276 calls = self.mock_api.update_pool.call_args_list277 (_, old_called_pool,278 new_called_pool, called_host) = calls[0][0]279 self.assertEqual(pool_id, new_called_pool.id)280 self.assertEqual(pool_id, old_called_pool.id)281 self.assertEqual(old_name, old_called_pool.name)282 self.assertEqual(new_name, new_called_pool.name)283 
self.assertEqual(constants.PENDING_UPDATE,284 new_called_pool.provisioning_status)285 lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)286 self.assertEqual(constants.PENDING_UPDATE,287 lb.provisioning_status)288 self.assertEqual('host', called_host)289 def test_delete_pool(self):290 with self.loadbalancer(no_delete=True) as loadbalancer:291 lb_id = loadbalancer['loadbalancer']['id']292 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)293 with self.listener(loadbalancer_id=lb_id,294 no_delete=True) as listener:295 listener_id = listener['listener']['id']296 self._update_status(models.LoadBalancer, constants.ACTIVE,297 lb_id)298 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,299 no_delete=True) as pool:300 pool_id = pool['pool']['id']301 self._update_status(models.LoadBalancer, constants.ACTIVE,302 lb_id)303 ctx = context.get_admin_context()304 self.plugin_instance.delete_pool(ctx, pool_id)305 calls = self.mock_api.delete_pool.call_args_list306 _, called_pool, called_host = calls[0][0]307 self.assertEqual(pool_id, called_pool.id)308 self.assertEqual('host', called_host)309 self.assertEqual(constants.PENDING_DELETE,310 called_pool.provisioning_status)311 lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)312 self.assertEqual(constants.ACTIVE,313 lb.provisioning_status)314 self.assertRaises(315 loadbalancerv2.EntityNotFound,316 self.plugin_instance.db.get_pool, ctx, pool_id)317 def test_create_member(self):318 with self.loadbalancer(no_delete=True) as loadbalancer:319 lb_id = loadbalancer['loadbalancer']['id']320 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)321 with self.listener(loadbalancer_id=lb_id,322 no_delete=True) as listener:323 listener_id = listener['listener']['id']324 self._update_status(models.LoadBalancer, constants.ACTIVE,325 lb_id)326 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,327 no_delete=True) as pool:328 pool_id = pool['pool']['id']329 
self._update_status(models.LoadBalancer, constants.ACTIVE,330 lb_id)331 with self.subnet(cidr='11.0.0.0/24') as subnet:332 with self.member(pool_id=pool_id, subnet=subnet,333 no_delete=True) as member:334 member_id = member['member']['id']335 calls = self.mock_api.create_member.call_args_list336 _, called_member, called_host = calls[0][0]337 self.assertEqual(member_id, called_member.id)338 self.assertEqual('host', called_host)339 self.assertEqual(constants.PENDING_CREATE,340 called_member.provisioning_status)341 ctx = context.get_admin_context()342 lb = self.plugin_instance.db.get_loadbalancer(343 ctx, lb_id)344 self.assertEqual(constants.PENDING_UPDATE,345 lb.provisioning_status)346 def test_update_member(self):347 with self.loadbalancer(no_delete=True) as loadbalancer:348 lb_id = loadbalancer['loadbalancer']['id']349 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)350 with self.listener(loadbalancer_id=lb_id,351 no_delete=True) as listener:352 listener_id = listener['listener']['id']353 self._update_status(models.LoadBalancer, constants.ACTIVE,354 lb_id)355 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,356 no_delete=True) as pool:357 pool_id = pool['pool']['id']358 self._update_status(models.LoadBalancer, constants.ACTIVE,359 lb_id)360 with self.subnet(cidr='11.0.0.0/24') as subnet:361 with self.member(pool_id=pool_id, subnet=subnet,362 no_delete=True) as member:363 member_id = member['member']['id']364 self._update_status(models.LoadBalancer,365 constants.ACTIVE, lb_id)366 old_weight = member['member']['weight']367 new_weight = 2368 member['member']['weight'] = new_weight369 ctx = context.get_admin_context()370 self.plugin_instance.update_pool_member(371 ctx, member_id, pool_id, member)372 calls = self.mock_api.update_member.call_args_list373 (_, old_called_member,374 new_called_member, called_host) = calls[0][0]375 self.assertEqual(member_id, new_called_member.id)376 self.assertEqual(member_id, old_called_member.id)377 
self.assertEqual(old_weight,378 old_called_member.weight)379 self.assertEqual(new_weight,380 new_called_member.weight)381 self.assertEqual(382 constants.PENDING_UPDATE,383 new_called_member.provisioning_status)384 lb = self.plugin_instance.db.get_loadbalancer(385 ctx, lb_id)386 self.assertEqual(constants.PENDING_UPDATE,387 lb.provisioning_status)388 self.assertEqual('host', called_host)389 def test_delete_member(self):390 with self.loadbalancer(no_delete=True) as loadbalancer:391 lb_id = loadbalancer['loadbalancer']['id']392 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)393 with self.listener(loadbalancer_id=lb_id,394 no_delete=True) as listener:395 listener_id = listener['listener']['id']396 self._update_status(models.LoadBalancer, constants.ACTIVE,397 lb_id)398 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,399 no_delete=True) as pool:400 pool_id = pool['pool']['id']401 self._update_status(models.LoadBalancer, constants.ACTIVE,402 lb_id)403 with self.subnet(cidr='11.0.0.0/24') as subnet:404 with self.member(pool_id=pool_id, subnet=subnet,405 no_delete=True) as member:406 member_id = member['member']['id']407 self._update_status(models.LoadBalancer,408 constants.ACTIVE, lb_id)409 ctx = context.get_admin_context()410 self.plugin_instance.delete_pool_member(411 ctx, member_id, pool_id)412 calls = self.mock_api.delete_member.call_args_list413 _, called_member, called_host = calls[0][0]414 self.assertEqual(member_id, called_member.id)415 self.assertEqual('host', called_host)416 self.assertEqual(constants.PENDING_DELETE,417 called_member.provisioning_status)418 lb = self.plugin_instance.db.get_loadbalancer(419 ctx, lb_id)420 self.assertEqual(constants.ACTIVE,421 lb.provisioning_status)422 self.assertRaises(423 loadbalancerv2.EntityNotFound,424 self.plugin_instance.db.get_pool_member,425 ctx, member_id)426 def test_create_health_monitor(self):427 with self.loadbalancer(no_delete=True) as loadbalancer:428 lb_id = 
loadbalancer['loadbalancer']['id']429 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)430 with self.listener(loadbalancer_id=lb_id,431 no_delete=True) as listener:432 listener_id = listener['listener']['id']433 self._update_status(models.LoadBalancer, constants.ACTIVE,434 lb_id)435 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,436 no_delete=True) as pool:437 pool_id = pool['pool']['id']438 self._update_status(models.LoadBalancer, constants.ACTIVE,439 lb_id)440 with self.healthmonitor(pool_id=pool_id,441 no_delete=True) as monitor:442 hm_id = monitor['healthmonitor']['id']443 calls = (444 self.mock_api.create_healthmonitor.call_args_list)445 _, called_hm, called_host = calls[0][0]446 self.assertEqual(hm_id, called_hm.id)447 self.assertEqual('host', called_host)448 self.assertEqual(constants.PENDING_CREATE,449 called_hm.provisioning_status)450 ctx = context.get_admin_context()451 lb = self.plugin_instance.db.get_loadbalancer(452 ctx, lb_id)453 self.assertEqual(constants.PENDING_UPDATE,454 lb.provisioning_status)455 def test_update_health_monitor(self):456 with self.loadbalancer(no_delete=True) as loadbalancer:457 lb_id = loadbalancer['loadbalancer']['id']458 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)459 with self.listener(loadbalancer_id=lb_id,460 no_delete=True) as listener:461 listener_id = listener['listener']['id']462 self._update_status(models.LoadBalancer, constants.ACTIVE,463 lb_id)464 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,465 no_delete=True) as pool:466 pool_id = pool['pool']['id']467 self._update_status(models.LoadBalancer, constants.ACTIVE,468 lb_id)469 with self.healthmonitor(pool_id=pool_id,470 no_delete=True) as monitor:471 hm_id = monitor['healthmonitor']['id']472 self._update_status(models.LoadBalancer,473 constants.ACTIVE, lb_id)474 old_to = monitor['healthmonitor']['timeout']475 new_to = 2476 monitor['healthmonitor']['timeout'] = new_to477 ctx = 
context.get_admin_context()478 self.plugin_instance.update_healthmonitor(ctx, hm_id,479 monitor)480 calls = (481 self.mock_api.update_healthmonitor.call_args_list)482 (_, old_called_hm,483 new_called_hm, called_host) = calls[0][0]484 self.assertEqual(hm_id, new_called_hm.id)485 self.assertEqual(hm_id, old_called_hm.id)486 self.assertEqual(old_to,487 old_called_hm.timeout)488 self.assertEqual(new_to,489 new_called_hm.timeout)490 self.assertEqual(491 constants.PENDING_UPDATE,492 new_called_hm.provisioning_status)493 lb = self.plugin_instance.db.get_loadbalancer(494 ctx, lb_id)495 self.assertEqual(constants.PENDING_UPDATE,496 lb.provisioning_status)497 self.assertEqual('host', called_host)498 def test_delete_health_monitor(self):499 with self.loadbalancer(no_delete=True) as loadbalancer:500 lb_id = loadbalancer['loadbalancer']['id']501 self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)502 with self.listener(loadbalancer_id=lb_id,503 no_delete=True) as listener:504 listener_id = listener['listener']['id']505 self._update_status(models.LoadBalancer, constants.ACTIVE,506 lb_id)507 with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,508 no_delete=True) as pool:509 pool_id = pool['pool']['id']510 self._update_status(models.LoadBalancer, constants.ACTIVE,511 lb_id)512 with self.healthmonitor(pool_id=pool_id,513 no_delete=True) as monitor:514 hm_id = monitor['healthmonitor']['id']515 self._update_status(models.LoadBalancer,516 constants.ACTIVE, lb_id)517 ctx = context.get_admin_context()518 self.plugin_instance.delete_healthmonitor(ctx, hm_id)519 calls = (520 self.mock_api.delete_healthmonitor.call_args_list)521 _, called_hm, called_host = calls[0][0]522 self.assertEqual(hm_id, called_hm.id)523 self.assertEqual('host', called_host)524 self.assertEqual(constants.PENDING_DELETE,525 called_hm.provisioning_status)526 lb = self.plugin_instance.db.get_loadbalancer(527 ctx, lb_id)528 self.assertEqual(constants.ACTIVE,529 lb.provisioning_status)...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

...65 self.file_manager_open()66 def _select_zip(self):67 try:68 if not os.path.isfile(self.zip_path) or not os.path.splitext(self.zip_path)[1] in self.zipsFormats:69 self._update_status( "Wrong File is Selected ")70 return71 Logger.debug(self.zip_path)72 self.zip_name = os.path.basename(self.zip_path)[:-4]73 self._update_status(str(self.zip_name) + " is selected")74 self._update_progress_bar(20)75 except Exception as e:76 Logger.exception(e)77 self._update_status("Something went wrong ... Try again")78 def file_manager_open(self):79 if platform == "android":80 self.file_manager.show("/storage/emulated/0/")81 else:82 self.file_manager.show("/") # output manager to the screen83 self.manager_open = True84 def select_path(self, path):85 '''It will be called when you click on the file name86 or the catalog selection button.87 :type path: str;88 :param path: path to the selected directory or file;89 '''90 self.zip_path = path91 print("printing selected zip path ... "+str(path))92 self.exit_manager()93 def exit_manager(self, *args):94 '''Called when the user reaches the root of the directory tree.'''95 self.manager_open = False96 self.file_manager.close()97 if self.zip_path == None:98 self._update_status("No file is selected")99 return100 select_zip_thread = Thread(target=self._select_zip)101 select_zip_thread.start()102 def events(self, instance, keyboard, keycode, text, modifiers):103 '''Called when buttons are pressed on the mobile device.'''104 if keyboard in (1001, 27):105 if self.manager_open:106 self.file_manager.back()107 return True108 def convert(self,*args):109 if platform == 'android':110 if check_permission(Permission.WRITE_EXTERNAL_STORAGE) == False: # check permission takes str not a list111 request_permission(Permission.WRITE_EXTERNAL_STORAGE)112 request_permission(Permission.READ_EXTERNAL_STORAGE)# request_permisssion takes str and request_permissions takes list113 return114 # Make a Folder for Zip115 # Make ChandanVideo Folder in Internal Storage116 if 
self.zip_path == None or os.path.splitext(self.zip_path)[1] != '.zip':117 self.root.ids.status_label.text = "Select a zip first "118 return119 self._update_status("Please Wait ... Checking Zip")120 self.clips_path = os.path.join(self.app_path, self.zip_name)121 self.video_folder = os.path.join(self.primary_ext_storage, "Zipvy")122 self.video_location = os.path.join(self.video_folder, self.zip_name+".mp4")123 Logger.debug("Clips Path : "+str(self.clips_path) +124 "\n video_folder : "+str(self.video_folder)+125 "\n video_path:" + str(self.video_location) )126 convert_thread = Thread(target=self._convert)127 convert_thread.start()128 else:129 self.root.ids.prog.value = 75130 self.root.ids.status_label.text = " I am running in "+str(platform)131 print(" I am running in "+str(platform))132 def _convert(self):133 try:134 if not os.path.exists(self.clips_path):135 os.makedirs(self.clips_path)136 if not os.path.exists(self.video_folder):137 os.makedirs(self.video_folder)138 # Extract Zip to the folder139 unzip(self.zip_path, self.clips_path)140 self._update_status("Unzipped")141 Logger.debug("Unzipped clips for file " + str(self.zip_path))142 Logger.debug("clips are available at " + str(self.clips_path))143 # move clips to root144 move_to_root_folder(self.clips_path, self.clips_path)145 Logger.debug("Moving clips to root dir is done")146 # Delete Small clips ...147 deleted_clips = delete_small_clips(self.clips_path)148 Logger.debug("Following Clips are deleted due to low size (less than 100kb)")149 for deleted_clip in deleted_clips:150 Logger.debug(deleted_clip)151 self._update_progress_bar(50)152 self._update_status("Ready to make video")153 # merge clips and make video154 Logger.debug("Entered to merging try block")155 merge(self.clips_path, self.video_location, self._update_progress)156 Logger.debug("End of merging try block")157 # Clearing Temp dir158 delete_all_clips(self.clips_path)159 self._update_status(os.path.basename(self.video_location)[:-4]+" is now ready to 
watch")160 self._toast("Always use Mx/Vlc Player to watch Videos")161 except Exception:162 Logger.exception('Something happened wrong at merge')163 self._update_status("Something happened wrong at merge")164 def open_video_folder(self,*args):165 if platform == 'android':166 pass167 pass168 def open_video(self,*args):169 if platform == 'android':170 pass171 #self._open(self.video_location)172 def action_button_callback(self,instance):173 if instance.icon == 'language-python':174 self.theme_cls.primary_palette = "Green"175 if instance.icon == 'language-php':176 self.theme_cls.primary_palette = "Orange"177 if instance.icon == 'language-cpp':178 self.theme_cls.primary_palette = "Blue"179 def _update_progress(self,file_no,file_name,files):180 percentage = int(file_no*100/files)181 self._update_progress_bar(50+int(percentage/2))182 progress_text = "Merging Clips " + str(file_no) +"/"+str(files)+" "+ str(percentage)+" %"183 self._update_status(progress_text)184 Logger.debug(progress_text+" of "+ str(file_name))185 @mainthread186 def _update_status(self,text):187 self.root.ids.status_label.text = str(text)188 @mainthread189 def _update_progress_bar(self,value):190 self.root.ids.prog.value = int(value)191 def _open(self, video_location):192 try:193 from kivy.setupconfig import USE_SDL2194 if platform == 'android':195 from jnius import cast196 from jnius import autoclass197 if USE_SDL2:198 PythonActivity = autoclass('org.kivy.android.PythonActivity')199 else:200 PythonActivity = autoclass('org.renpy.android.PythonActivity')...

Full Screen

Full Screen

instrumented_requests.py

Source:instrumented_requests.py Github

copy

Full Screen

...38 fields = {'name': name, 'client': 'requests'}39 http_metrics.request_bytes.add(request_bytes, fields=fields)40 http_metrics.response_bytes.add(response_bytes, fields=fields)41 http_metrics.durations.add(duration_msec, fields=fields)42 _update_status(name, response.status_code)43 return hook44def _update_status(name, status):45 fields = {'status': status, 'name': name, 'client': 'requests'}46 http_metrics.response_status.increment(fields=fields)47def _wrap(method, name, url, *args, **kwargs):48 hooks = {'response': instrumentation_hook(name)}49 if 'hooks' in kwargs:50 hooks.update(kwargs['hooks'])51 kwargs['hooks'] = hooks52 try:53 return getattr(requests, method)(url, *args, **kwargs)54 except requests.exceptions.ReadTimeout:55 _update_status(name, http_metrics.STATUS_TIMEOUT)56 raise57 except requests.exceptions.ConnectionError:58 _update_status(name, http_metrics.STATUS_ERROR)59 raise60 except requests.exceptions.RequestException:61 _update_status(name, http_metrics.STATUS_EXCEPTION)62 raise63request = functools.partial(_wrap, 'request')64get = functools.partial(_wrap, 'get')65head = functools.partial(_wrap, 'head')66post = functools.partial(_wrap, 'post')67patch = functools.partial(_wrap, 'patch')68put = functools.partial(_wrap, 'put')69delete = functools.partial(_wrap, 'delete')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful