Best Python code snippet using lemoncheesecake
cli.py
Source:cli.py
# NOTE(review): this chunk begins mid-way through get_mask(); the parameters
# before `resize_method` are not visible here.  The signature below is
# reconstructed from the call site in AbstractWebcamFilterApp.get_mask --
# confirm against the full source file.
def get_mask(
        bodypix_result,
        masks: List[np.ndarray],
        timer: LoggingTimer,
        args: argparse.Namespace,
        resize_method: str = DEFAULT_RESIZE_METHOD) -> np.ndarray:
    """Return the person mask for a bodypix result.

    Applies an optional box blur (``--mask-blur``) and optional temporal
    smoothing over the last ``--mask-mean-count`` frames. ``masks`` is the
    caller-owned history buffer and is mutated in place.
    """
    mask = bodypix_result.get_mask(args.threshold, dtype=np.float32, resize_method=resize_method)
    if args.mask_blur:
        timer.on_step_start('mblur')
        mask = box_blur_image(mask, args.mask_blur)
    if args.mask_mean_count >= 2:
        timer.on_step_start('mmean')
        masks.append(mask)
        if len(masks) > args.mask_mean_count:
            masks.pop(0)  # drop the oldest frame's mask
        if len(masks) >= 2:
            mask = np.mean(masks, axis=0)
    LOGGER.debug('mask.shape: %s (%s)', mask.shape, mask.dtype)
    return mask


class ListModelsSubCommand(SubCommand):
    """Lists the bodypix model files available on the public storage bucket."""

    def __init__(self):
        super().__init__("list-models", "Lists available bodypix models (original models)")

    def add_arguments(self, parser: argparse.ArgumentParser):
        add_common_arguments(parser)
        parser.add_argument(
            "--storage-url",
            default="https://storage.googleapis.com/tfjs-models",
            help="The base URL for the storage containing the models"
        )

    def run(self, args: argparse.Namespace):
        # Keep only the bodypix model manifest files among all listed objects.
        bodypix_model_json_files = [
            file_url
            for file_url in iter_s3_file_urls(args.storage_url)
            if re.match(r'.*/bodypix/.*/model.*\.json', file_url)
        ]
        print('\n'.join(bodypix_model_json_files))


class ConvertToTFLiteSubCommand(SubCommand):
    """Converts a bodypix model to a tflite model file."""

    def __init__(self):
        super().__init__("convert-to-tflite", "Converts the model to a tflite model")

    def add_arguments(self, parser: argparse.ArgumentParser):
        add_common_arguments(parser)
        parser.add_argument(
            "--model-path",
            default=DEFAULT_MODEL_PATH,
            help="The path or URL to the bodypix model."
        )
        parser.add_argument(
            "--output-model-file",
            required=True,
            help="The path to the output file (tflite model)."
        )
        parser.add_argument(
            "--optimize",
            action='store_true',
            help="Enable optimization (quantization)."
        )
        parser.add_argument(
            "--quantization-type",
            choices=['float16', 'float32', 'int8'],
            help="The quantization type to use."
        )

    def run(self, args: argparse.Namespace):
        LOGGER.info('converting model: %s', args.model_path)
        converter = get_tflite_converter_for_model_path(download_model(
            args.model_path
        ))
        # BUG FIX: converter options must be configured *before* convert() is
        # called; previously convert() ran first, so --optimize and
        # --quantization-type had no effect on the written model.
        if args.optimize:
            LOGGER.info('enabled optimization')
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
        if args.quantization_type:
            LOGGER.info('quantization type: %s', args.quantization_type)
            quantization_type = getattr(tf, args.quantization_type)
            converter.target_spec.supported_types = [quantization_type]
            converter.inference_input_type = quantization_type
            converter.inference_output_type = quantization_type
        tflite_model = converter.convert()
        LOGGER.info('saving tflite model to: %s', args.output_model_file)
        Path(args.output_model_file).write_bytes(tflite_model)


class AbstractWebcamFilterApp(ABC):
    """Base class for apps that read frames from an image source, apply a
    bodypix-based filter and write the result to the configured output sink.
    """

    def __init__(self, args: argparse.Namespace):
        self.args = args
        self.bodypix_model = None
        self.output_sink = None
        self.image_source = None
        self.image_iterator = None
        self.timer = LoggingTimer()
        # history buffer for temporal mask smoothing (see get_mask)
        self.masks: List[np.ndarray] = []
        self.exit_stack = ExitStack()
        # model-result cache, re-used for --mask-cache-time seconds
        self.bodypix_result_cache_time = None
        self.bodypix_result_cache = None

    @abstractmethod
    def get_output_image(self, image_array: np.ndarray) -> np.ndarray:
        """Return the filtered output frame for the given input frame."""

    def get_mask(self, *args, **kwargs):
        """Delegate to the module-level get_mask with this app's state."""
        return get_mask(
            *args, masks=self.masks, timer=self.timer, args=self.args, **kwargs
        )

    def get_bodypix_result(self, image_array: np.ndarray) -> BodyPixResultWrapper:
        """Run the model on the frame, re-using a recent cached result."""
        assert self.bodypix_model is not None
        current_time = time()
        if (
            self.bodypix_result_cache is not None
            and current_time < self.bodypix_result_cache_time + self.args.mask_cache_time
        ):
            return self.bodypix_result_cache
        self.bodypix_result_cache = self.bodypix_model.predict_single(image_array)
        self.bodypix_result_cache_time = current_time
        return self.bodypix_result_cache

    def __enter__(self):
        self.exit_stack.__enter__()
        self.bodypix_model = load_bodypix_model(self.args)
        self.output_sink = self.exit_stack.enter_context(get_output_sink(self.args))
        self.image_source = self.exit_stack.enter_context(get_image_source_for_args(self.args))
        self.image_iterator = iter(self.image_source)
        return self

    def __exit__(self, *args, **kwargs):
        self.exit_stack.__exit__(*args, **kwargs)

    def next_frame(self):
        """Process one frame; return (False, None) when the source is exhausted."""
        self.timer.on_frame_start(initial_step_name='in')
        try:
            image_array = next(self.image_iterator)
        except StopIteration:
            return False, None
        self.timer.on_step_start('model')
        output_image = self.get_output_image(image_array)
        self.timer.on_step_start('out')
        self.output_sink(output_image)
        self.timer.on_frame_end()
        return True, output_image

    def run(self):
        """Filter every frame, save per-frame JPEGs, then assemble a video."""
        out_path = Path(self.args.output)
        out_parent = out_path.parent
        output_file_name = Path(self.args.source).name
        name_without_ext = Path(self.args.source).stem
        os.makedirs(out_parent, exist_ok=True)
        out_dir = os.path.join(out_parent, name_without_ext)
        os.makedirs(out_dir, exist_ok=True)
        output_video_path = os.path.join(out_parent, output_file_name)
        try:
            self.timer.start()
            counter = 0
            while True:
                status, output_image = self.next_frame()
                if not status:
                    break
                im = Image.fromarray(output_image.astype('uint8'), 'RGB')
                im.save(os.path.join(out_dir, str(counter) + ".jpg"))
                counter += 1
            # Assemble the saved per-frame JPEGs into a video,
            # then delete the temporary image folder.
            img_array = []
            size = None
            for filename in natsort.natsorted(glob.glob(out_dir + "/*.jpg")):
                img = cv2.imread(filename)
                height, width, _layers = img.shape
                size = (width, height)
                img_array.append(img)
            # BUG FIX: guard against an empty source -- previously `size` was
            # referenced without ever being assigned when no frame was written.
            if img_array:
                # NOTE(review): the frame rate is hard-coded to 15 fps --
                # confirm this matches the source frame rate.
                out = cv2.VideoWriter(
                    output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), 15, size
                )
                for frame in img_array:
                    out.write(frame)
                out.release()
            shutil.rmtree(out_dir)
            # NOTE(review): the original unconditionally removed --output;
            # guarded here so a missing file does not raise.
            if os.path.exists(self.args.output):
                os.remove(self.args.output)
            if self.args.show_output:
                LOGGER.info('waiting for window to be closed')
                while not self.output_sink.is_closed:
                    sleep(0.5)
        except KeyboardInterrupt:
            LOGGER.info('exiting')


class AbstractWebcamFilterSubCommand(SubCommand):
    """Base class for sub commands that run an AbstractWebcamFilterApp."""

    def add_arguments(self, parser: argparse.ArgumentParser):
        add_common_arguments(parser)
        add_model_arguments(parser)
        add_source_arguments(parser)
        add_output_arguments(parser)

    @abstractmethod
    def get_app(self, args: argparse.Namespace) -> AbstractWebcamFilterApp:
        """Create the app instance for the parsed arguments."""

    def run(self, args: argparse.Namespace):
        with self.get_app(args) as app:
            app.run()


class DrawMaskApp(AbstractWebcamFilterApp):
    """Draws the (part) mask, optionally alpha-blended over the input frame."""

    def get_output_image(self, image_array: np.ndarray) -> np.ndarray:
        resize_method = DEFAULT_RESIZE_METHOD
        result = self.get_bodypix_result(image_array)
        self.timer.on_step_start('get_mask')
        mask = self.get_mask(result, resize_method=resize_method)
        if self.args.colored:
            self.timer.on_step_start('get_cpart_mask')
            mask_image = result.get_colored_part_mask(
                mask, part_names=self.args.parts, resize_method=resize_method
            )
        elif self.args.parts:
            self.timer.on_step_start('get_part_mask')
            mask_image = result.get_part_mask(
                mask, part_names=self.args.parts, resize_method=resize_method
            ) * 255
        else:
            mask_image = mask * 255
        if self.args.mask_alpha is not None:
            self.timer.on_step_start('overlay')
            LOGGER.debug('mask.shape: %s (%s)', mask.shape, mask.dtype)
            alpha = self.args.mask_alpha
            try:
                # BUG FIX: cast mask_image itself (previously `mask` was cast
                # and assigned to mask_image, silently discarding the colored
                # or part mask computed above).
                if mask_image.dtype == tf.int32:
                    mask_image = tf.cast(mask_image, tf.float32)
            except TypeError:
                # numpy dtypes may not be comparable against tf dtypes
                pass
            output = np.clip(
                image_array * (1 - alpha) + mask_image * alpha,
                0.0, 255.0
            )
            return output
        return mask_image


class DrawMaskSubCommand(AbstractWebcamFilterSubCommand):
    """Sub command running DrawMaskApp."""

    def __init__(self):
        super().__init__("draw-mask", "Draws the mask for the input")

    def add_arguments(self, parser: argparse.ArgumentParser):
        super().add_arguments(parser)
        parser.add_argument(
            "--mask-alpha",
            type=float,
            help="The opacity of mask overlay to add."
        )
        # deprecated alias, kept for backwards compatibility (same dest)
        parser.add_argument(
            "--add-overlay-alpha",
            dest='mask_alpha',
            type=float,
            help="Deprecated, please use --mask-alpha instead."
        )
        parser.add_argument(
            "--colored",
            action="store_true",
            help="Enable generating the colored part mask"
        )
        parser.add_argument(
            "--parts",
            nargs="*",
            choices=PART_CHANNELS,
            help="Select the parts to output"
        )

    def get_app(self, args: argparse.Namespace) -> AbstractWebcamFilterApp:
        return DrawMaskApp(args)


class BlurBackgroundApp(AbstractWebcamFilterApp):
    """Blurs the background around the detected person."""

    def get_output_image(self, image_array: np.ndarray) -> np.ndarray:
        result = self.get_bodypix_result(image_array)
        self.timer.on_step_start('get_mask')
        mask = self.get_mask(result)
        self.timer.on_step_start('bblur')
        background_image_array = box_blur_image(image_array, self.args.background_blur)
        self.timer.on_step_start('compose')
        # Blend: blurred frame where mask ~ 0, original frame where mask ~ 1.
        output = np.clip(
            background_image_array * (1 - mask)
            + image_array * mask,
            0.0, 255.0
        )
        return output


class BlurBackgroundSubCommand(AbstractWebcamFilterSubCommand):
    """Sub command running BlurBackgroundApp."""

    def __init__(self):
        super().__init__("blur-background", "Blurs the background of the webcam image")

    def add_arguments(self, parser: argparse.ArgumentParser):
        super().add_arguments(parser)
        parser.add_argument(
            "--background-blur",
            type=int,
            default=15,
            help="The blur radius for the background."
        )

    def get_app(self, args: argparse.Namespace) -> AbstractWebcamFilterApp:
        return BlurBackgroundApp(args)


class ReplaceBackgroundApp(AbstractWebcamFilterApp):
    """Replaces the background around the detected person with another image."""

    def __init__(self, *args, **kwargs):
        self.background_image_iterator = None
        super().__init__(*args, **kwargs)

    def get_next_background_image(self, image_array: np.ndarray) -> np.ndarray:
        # Lazily open the background source on first use, sized to the input
        # frame; cycle() repeats it indefinitely for video backgrounds.
        if self.background_image_iterator is None:
            background_image_source = self.exit_stack.enter_context(get_image_source(
                self.args.background,
                image_size=get_image_size(image_array)
            ))
            self.background_image_iterator = iter(cycle(background_image_source))
        return next(self.background_image_iterator)

    def get_output_image(self, image_array: np.ndarray) -> np.ndarray:
        background_image_array = self.get_next_background_image(image_array)
        result = self.get_bodypix_result(image_array)
        self.timer.on_step_start('get_mask')
        mask = self.get_mask(result)
        self.timer.on_step_start('compose')
        background_image_array = resize_image_to(
            background_image_array, get_image_size(image_array)
        )
        # Blend: replacement where mask ~ 0, original frame where mask ~ 1.
        output = np.clip(
            background_image_array * (1 - mask)
            + image_array * mask,
            0.0, 255.0
        )
        return output


class ReplaceBackgroundSubCommand(AbstractWebcamFilterSubCommand):
    """Sub command running ReplaceBackgroundApp."""

    def __init__(self):
        super().__init__("replace-background", "Replaces the background of a person")

    def add_arguments(self, parser: argparse.ArgumentParser):
        add_common_arguments(parser)
        # NOTE(review): the source chunk is truncated here; the rest of this
        # add_arguments body is not visible in this view.
hooks.py
Source:hooks.py
"""
Hooks provide a way to do things when events happen during the workflow lifecycle.
"""
from virtool_workflow.execution.hooks import Hook

on_result = Hook("on_result")
"""
Triggered when a workflow has completed and a result is available.

.. code-block:: python

    @on_result
    async def use_result(workflow: Workflow, results: Dict[str, Any]):
        ...
"""

on_step_start = Hook("on_step_start")
"""
Triggered before each workflow step is executed.

The :class:`WorkflowStep` object is available via the `current_step` fixture.

.. code-block:: python

    @on_step_start
    async def use_step(current_step):
        ...
"""

# NOTE(review): the variable is named `on_step_finish` but the hook is
# registered as "on_step_end" -- this looks inconsistent; confirm which
# name subscribers rely on before changing either side.
on_step_finish = Hook("on_step_end")
"""
Triggered after each workflow step is executed.

The :class:`WorkflowStep` object is available via the `current_step` fixture.

.. code-block:: python

    @on_step_finish
    async def handle_step_finish(current_step):
        ...
"""

on_workflow_start = Hook("on_workflow_start")
"""
Triggered at the start of the workflow, before any steps are executed.
"""

on_success = Hook("on_success")
"""
Triggered when a job completes successfully.

Parameters supplied are the `Workflow` instance and the results dict.

.. code-block:: python

    @on_success
    async def perform_on_success(workflow: Workflow, results: Dict[str, Any]):
        ...
"""

on_cancelled = Hook("on_cancelled")
"""
Triggered when a job is cancelled.

.. code-block:: python

    @on_cancelled
    async def handle_cancellation(error: asyncio.CancelledError):
        ...
"""

on_error = Hook("on_error")
"""
Triggered when a job encounters an exception while running.

The exception can be found in the ``error`` fixture.

.. code-block:: python

    @on_error
    async def handle_error(error: Exception):
        ...
"""

on_terminated = Hook("on_terminated")
"""
Triggered when the workflow process receives a SIGTERM.

.. code-block:: python

    @on_terminated
    def handle_termination():
        ...
"""

on_failure = Hook("on_failure")
"""
Triggered when a job fails to complete.

Failure to complete can be caused by: user cancellation, termination by the
host, or an error during workflow execution.

.. code-block:: python

    @on_failure
    async def handle_failure(error: Exception):
        ...
"""

on_finish = Hook("on_finish")
"""
Triggered when a job completes, success or failure.

.. code-block:: python

    @on_finish
    async def do_something_on_finish(workflow: Workflow):
        ...
"""

__all__ = [
    "on_cancelled",
    "on_error",
    "on_failure",
    "on_finish",
    "on_result",
    "on_step_finish",
    "on_step_start",
    "on_success",
    "on_terminated",
    "on_workflow_start",
]
Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!