How to use _get_image_from_cache method in avocado

Best Python code snippet using avocado_python

podman.py

Source:podman.py Github

copy

Full Screen

...188 output_opts = (189 "-v",190 (f"{test_output}:" f"{os.path.expanduser(podman_output)}"),191 )192 image, _ = self._get_image_from_cache(runtime_task)193 if not image:194 image = self.config.get("spawner.podman.image")195 envs = [f"-e={k}={v}" for k, v in env_args.items()]196 try:197 # pylint: disable=W0201198 _, stdout, _ = await self.podman.execute(199 "create",200 *status_server_opts,201 *output_opts,202 *test_opts,203 entry_point_arg,204 *envs,205 image,206 )207 except PodmanException as ex:208 msg = f"Could not create podman container: {ex}"209 runtime_task.status = msg210 return False211 return stdout.decode().strip()212 async def spawn_task(self, runtime_task):213 self.create_task_output_dir(runtime_task)214 podman_bin = self.config.get("spawner.podman.bin")215 try:216 # pylint: disable=W0201217 self.podman = Podman(podman_bin)218 except PodmanException as ex:219 runtime_task.status = str(ex)220 return False221 major, minor, _ = await self.python_version222 # Return only the "to" location223 eggs = self.get_eggs_paths(major, minor)224 destination_eggs = ":".join(map(lambda egg: str(egg[1]), eggs))225 env_args = {"PYTHONPATH": destination_eggs}226 output_dir_path = self.task_output_dir(runtime_task)227 container_id = await self._create_container_for_task(228 runtime_task, env_args, output_dir_path229 )230 runtime_task.spawner_handle = container_id231 await self.deploy_avocado(container_id)232 try:233 # pylint: disable=W0201234 returncode, _, _ = await self.podman.start(container_id)235 except PodmanException as ex:236 msg = f"Could not start container: {ex}"237 runtime_task.status = msg238 LOG.error(msg)239 return False240 return returncode == 0241 def create_task_output_dir(self, runtime_task):242 output_dir_path = self.task_output_dir(runtime_task)243 output_podman_path = "~/avocado/job-results/spawner/task"244 os.makedirs(output_dir_path, exist_ok=True)245 runtime_task.task.setup_output_dir(output_podman_path)246 async def wait_task(self, runtime_task):247 while True:248 if not self.is_task_alive(runtime_task):249 return250 await asyncio.sleep(0.1)251 async def terminate_task(self, runtime_task):252 try:253 await self.podman.stop(runtime_task.spawner_handle)254 except PodmanException as ex:255 msg = f"Could not stop container: {ex}"256 runtime_task.status = msg257 LOG.error(msg)258 return False259 @staticmethod260 async def check_task_requirements(runtime_task):261 """Check the runtime task requirements needed to be able to run"""262 # right now, limit the check to the runner availability.263 if runtime_task.task.runnable.runner_command() is None:264 return False265 return True266 async def update_requirement_cache(267 self, runtime_task, result268 ): # pylint: disable=W0221269 environment_id, _ = self._get_image_from_cache(runtime_task, True)270 if result in STATUSES_NOT_OK:271 cache.delete_environment(self.environment, environment_id)272 return273 _, stdout, _ = await self.podman.execute(274 "commit", "-q", runtime_task.spawner_handle275 )276 container_id = stdout.decode().strip()277 cache.update_environment(self.environment, environment_id, container_id)278 cache.update_requirement_status(279 self.environment,280 container_id,281 runtime_task.task.runnable.kind,282 runtime_task.task.runnable.kwargs.get("name"),283 True,284 )285 async def save_requirement_in_cache(self, runtime_task): # pylint: disable=W0221286 container_id = str(uuid.uuid4())287 _, requirements = self._get_image_from_cache(runtime_task)288 if requirements:289 for requirement_type, requirement in requirements:290 cache.set_requirement(291 self.environment, container_id, requirement_type, requirement292 )293 cache.set_requirement(294 self.environment,295 container_id,296 runtime_task.task.runnable.kind,297 runtime_task.task.runnable.kwargs.get("name"),298 False,299 )300 async def is_requirement_in_cache(self, runtime_task): # pylint: disable=W0221301 environment, _ = self._get_image_from_cache(runtime_task, use_task=True)302 if not environment:303 return False304 if cache.is_environment_prepared(environment):305 return True306 return None307 def _get_image_from_cache(self, runtime_task, use_task=False):308 def _get_all_finished_requirements(requirement_tasks):309 all_finished_requirements = []310 for requirement in requirement_tasks:311 all_finished_requirements.extend(312 _get_all_finished_requirements(requirement.dependencies)313 )314 runnable = requirement.task.runnable315 all_finished_requirements.append(316 (runnable.kind, runnable.kwargs.get("name"))317 )318 return all_finished_requirements319 finished_requirements = []320 if use_task:321 finished_requirements.append(...

Full Screen

Full Screen

icon.py

Source:icon.py Github

copy

Full Screen

...61 (size[0] * factor, size[1] * factor), resample=Image.LANCZOS62 )63 return out_img.filter(ImageFilter.SMOOTH_MORE)64_cache = {}65def _get_image_from_cache(name):66 if name not in _cache:67 _cache[name] = get_image(name)68 return _cache[name]69def _get_sprite_data(name):70 for sprite_data in atlas_data["sprites"]:71 if sprite_data["name"].lower() == name.lower():72 return sprite_data73 return None74def get_image(name, sprite_data=None):75 if sprite_data is None:76 sprite_data = _get_sprite_data(name)77 if sprite_data is None:78 return None79 return _process_file(sprite_data).convert("1", dither=0)80def _get_layers(emoji_name):81 for index in range(len(emoji_layer_data) // 2):82 name, layers = (83 emoji_layer_data[2 * index],84 emoji_layer_data[(2 * index) + 1],85 )86 if emoji_name == name:87 return layers88 return None89def get_emoji(name, layers=None):90 if layers is None:91 layers = _get_layers(name)92 if layers is None:93 return None94 # Created later on the first layer95 out_image = None96 for i in range(1, len(layers) // 2 + 1):97 colour, layer = layers[len(layers) - 2 * i], layers[len(layers) - (2 * i) + 1]98 # Convert decimal colour to hex99 hex_colour = hex(int(colour)).split("x")[-1]100 # Convert hex colour to RGB values101 r = (int(hex_colour, 16) & 0xFF0000) >> 16102 g = (int(hex_colour, 16) & 0x00FF00) >> 8103 b = int(hex_colour, 16) & 0x0000FF104 img = _get_image_from_cache(f"emojis/{layer}")105 # If we are the first layer, create the blank canvas106 if not out_image:107 out_image = Image.new(108 "RGBA", (img.size[0], img.size[1]), color=(0, 0, 0, 0)109 )110 # Create a solid colour image from our RGB colour111 # Will be masked later by the actual emoji layer112 color_img = Image.new(113 "RGBA", (out_image.size[0], out_image.size[1]), color=(r, g, b, 255)114 )115 # Here we mask the solid colour with the parts we want to116 # appear in the final image117 out_image = Image.composite(118 color_img, out_image, img.getchannel(0).convert("1", dither=0)...

Full Screen

Full Screen

lastcol.py

Source:lastcol.py Github

copy

Full Screen

...44 except:45 image = Image.new('RGB', (500, 500))46 image.save(path)47 return path48 def _get_image_from_cache(self, url):49 url_parts = urlparse(url)50 cache_name = str(url_parts.path).replace('/', '')51 cache_name = cache_name or 'empty.png'52 if os.path.isfile(self.cache_path+'/'+cache_name):53 return self.cache_path+'/'+cache_name54 path = self._download_file(url, self.cache_path+'/'+cache_name)55 return path56 def _get_images(self, artists):57 image_info = []58 for artist in artists:59 url = artist['image'][3]['#text']60 path = self._get_image_from_cache(url)61 spot_info = {62 'name': artist['name'],63 'path': path,64 }65 image_info.append(spot_info)66 return image_info67 def _insert_name(self, image, name, cursor):68 draw = ImageDraw.Draw(image, 'RGBA')69 font = ImageFont.truetype(self.cache_path+'/myfont.ttf', size=17)70 x = cursor[0]71 y = cursor[1]72 draw.rectangle([(x, y+200), (x+300, y+240)], (0, 0, 0, 123))73 draw.text((x+8, y+210), name, (255, 255, 255), font=font)74 def _create_collage(self, cols=3, rows=3):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful