Best Python code snippet using avocado_python
asset.py
Source:asset.py
...115 # the write after download, let's get the lock before start the116 # download.117 with FileLock(asset_path, 120):118 try:119 self.find_asset_file(create_metadata=True)120 return True121 except OSError:122 LOG.debug("Asset not in cache after lock, fetching it.")123 url_download(url_obj.geturl(), temp, timeout=timeout)124 shutil.copy(temp, asset_path)125 self._create_hash_file(asset_path)126 if not self._verify_hash(asset_path):127 msg = "Hash mismatch. Ignoring asset from the cache"128 raise OSError(msg)129 return True130 finally:131 try:132 os.remove(temp)133 except FileNotFoundError:134 LOG.info("Temporary asset file unavailable due to failed"135 " download attempt.")136 @staticmethod137 def _get_hash_file(asset_path):138 """139 Returns the file name that contains the hash for a given asset file140 :param asset_path: full path of the asset file.141 :returns: the CHECKSUM path142 :rtype: str143 """144 return '%s-CHECKSUM' % asset_path145 def _get_hash_from_file(self, asset_path):146 """147 Read the CHECKSUM file from the asset and return the hash.148 :param asset_path: full path of the asset file.149 :returns: the hash, if it exists.150 :rtype: str151 """152 hash_file = self._get_hash_file(asset_path)153 if not os.path.isfile(hash_file):154 self._create_hash_file(asset_path)155 return Asset.read_hash_from_file(hash_file)[1]156 @classmethod157 def read_hash_from_file(cls, filename):158 """Read the CHECKSUM file and return the hash.159 This method raises a FileNotFoundError if file is missing and assumes160 that filename is the CHECKSUM filename.161 :rtype: list with algorithm and hash162 """163 try:164 with FileLock(filename, 30):165 with open(filename, 'r', encoding='utf-8') as hash_file:166 for line in hash_file:167 # md5 is 32 chars big and sha512 is 128 chars big.168 # others supported algorithms are between those.169 if re.match('^.* [a-f0-9]{32,128}', line):170 return line.split()171 except Exception: # pylint: disable=W0703172 exc_type, exc_value = 
sys.exc_info()[:2]173 LOG.error('%s: %s', exc_type.__name__, exc_value)174 return [None, None]175 def _get_local_file(self, url_obj, asset_path, _):176 """177 Create a symlink for a local file into the cache.178 :param url_obj: object from urlparse.179 :param asset_path: full path of the asset file.180 :returns: if the local file matches the hash.181 :rtype: bool182 """183 if os.path.isdir(url_obj.path):184 path = os.path.join(url_obj.path, self.name)185 else:186 path = url_obj.path187 with FileLock(asset_path, 1):188 try:189 os.symlink(path, asset_path)190 self._create_hash_file(asset_path)191 return self._verify_hash(asset_path)192 except OSError as detail:193 if detail.errno == errno.EEXIST:194 os.remove(asset_path)195 os.symlink(path, asset_path)196 self._create_hash_file(asset_path)197 return self._verify_hash(asset_path)198 def _get_relative_dir(self):199 """200 When an asset name is not an URI, and:201 1. it also has a hash;202 2. or it has multiple locations;203 there's a clear intention for it to be unique *by name*, overwriting204 it if the file is corrupted or expired. 
These will be stored in the205 cache directory indexed by name.206 When an asset name is an URI, whether it has a hash or not, it will be207 saved according to their locations, so that multiple assets with the208 same file name, but completely unrelated to each other, will still209 coexist.210 :returns: target location of asset the file.211 :rtype: str212 """213 if (not self.name_scheme and214 (self.asset_hash or len(self.locations) > 1)):215 return 'by_name'216 # check if the URI is located on self.locations or self.parsed_name217 if self.locations:218 # if it is on self.locations, we need to check if it has the219 # asset name on it or a trailing '/'220 if ((self.asset_name in self.locations[0]) or221 (self.locations[0][-1] == '/')):222 base_url = os.path.dirname(self.locations[0])223 else:224 # here, self.locations is a pure conformant URI225 base_url = self.locations[0]226 else:227 # the URI is on self.parsed_name228 if self.parsed_name.query:229 base_url = "%s://%s%s" % (self.parsed_name.scheme,230 self.parsed_name.netloc,231 self.parsed_name.path)232 else:233 base_url = os.path.dirname(self.parsed_name.geturl())234 base_url_hash = hashlib.new(DEFAULT_HASH_ALGORITHM,235 base_url.encode(astring.ENCODING))236 return os.path.join('by_location', base_url_hash.hexdigest())237 def _get_writable_cache_dir(self):238 """239 Returns the first available writable cache directory240 When a asset has to be downloaded, a writable cache directory241 is then needed. 
The first available writable cache directory242 will be used.243 :returns: the first writable cache dir244 :rtype: str245 :raises: OSError246 """247 for cache_dir in self.cache_dirs:248 cache_dir = os.path.expanduser(cache_dir)249 if utils_path.usable_rw_dir(cache_dir):250 return cache_dir251 raise OSError("Can't find a writable cache directory.")252 @staticmethod253 def _is_expired(path, expire):254 """255 Checks if a file is expired according to expired parameter.256 :param path: full path of the asset file.257 :returns: the expired status of an asset.258 :rtype: bool259 """260 if expire is None:261 return False262 creation_time = os.lstat(path)[stat.ST_CTIME]263 expire_time = creation_time + expire264 if time.monotonic() > expire_time:265 return True266 return False267 @classmethod268 def _has_valid_hash(cls, asset_path, asset_hash=None):269 """Checks if a file has a valid hash based on the hash parameter.270 If asset_hash is None then will consider a valid asset.271 """272 if asset_hash is None:273 LOG.debug("No hash provided. Cannot check the asset file"274 " integrity.")275 return True276 hash_path = cls._get_hash_file(asset_path)277 _, hash_from_file = cls.read_hash_from_file(hash_path)278 if hash_from_file == asset_hash:279 return True280 return False281 def _verify_hash(self, asset_path):282 """283 Verify if the `asset_path` hash matches the hash in the hash file.284 :param asset_path: full path of the asset file.285 :returns: True when self.asset_hash is None or when it has the same286 value as the hash of the asset_file, otherwise return False.287 :rtype: bool288 """289 return self._has_valid_hash(asset_path, self.asset_hash)290 def fetch(self, timeout=None):291 """Try to fetch the current asset.292 First tries to find the asset on the provided cache_dirs list.293 Then tries to download the asset from the locations list294 provided.295 :param timeout: timeout in seconds. 
Default is296 :data:`afutils.asset.DOWNLOAD_TIMEOUT`.297 :raise OSError: When it fails to fetch the asset298 :returns: The path for the file on the cache directory.299 :rtype: str300 """301 # First let's search for the file in each one of the cache locations302 asset_file = None303 error = "Can't fetch: 'urls' is not defined."304 timeout = timeout or DOWNLOAD_TIMEOUT305 LOG.info("Fetching asset %s", self.name)306 try:307 return self.find_asset_file(create_metadata=True)308 except OSError:309 LOG.info("Asset not in cache, fetching it.")310 # If we get to this point, we have to download it from a location.311 # A writable cache directory is then needed. The first available312 # writable cache directory will be used.313 cache_dir = self._get_writable_cache_dir()314 # Now we have a writable cache_dir. Let's get the asset.315 for url in self.urls:316 if url is None:317 continue318 urlobj = urlparse(url)319 if urlobj.scheme in ['http', 'https', 'ftp']:320 fetch = self._download321 elif urlobj.scheme == 'file':322 fetch = self._get_local_file323 # We are assuming that everything starting with './' or '/' are a324 # file too.325 elif url.startswith(('/', './')):326 fetch = self._get_local_file327 else:328 raise UnsupportedProtocolError("Unsupported protocol"329 ": %s" % urlobj.scheme)330 asset_file = os.path.join(cache_dir,331 self.relative_dir)332 dirname = os.path.dirname(asset_file)333 if not os.path.isdir(dirname):334 os.makedirs(dirname, exist_ok=True)335 try:336 if fetch(urlobj, asset_file, timeout):337 LOG.info("Asset downloaded.")338 if self.metadata is not None:339 self._create_metadata_file(asset_file)340 return asset_file341 except Exception: # pylint: disable=W0703342 exc_type, exc_value = sys.exc_info()[:2]343 LOG.error('%s: %s', exc_type.__name__, exc_value)344 error = exc_value345 raise OSError("Failed to fetch %s (%s)." 
% (self.asset_name, error))346 def find_asset_file(self, create_metadata=False):347 """348 Search for the asset file in each one of the cache locations349 :param bool create_metadata: Should this method create the350 metadata in case asset file found351 and metadata is not found? Default352 is False.353 :return: asset path, if it exists in the cache354 :rtype: str355 :raises: OSError356 """357 for cache_dir in self.cache_dirs:358 cache_dir = os.path.expanduser(cache_dir)359 asset_file = os.path.join(cache_dir, self.relative_dir)360 # Ignore non-files361 if not os.path.isfile(asset_file):362 continue363 # Ignore expired asset files364 if self._is_expired(asset_file, self.expire):365 continue366 # Ignore mismatch hash367 if not self._has_valid_hash(asset_file, self.asset_hash):368 continue369 if create_metadata:370 self._create_metadata_file(asset_file)371 LOG.info("Asset already exists in cache.")372 return asset_file373 raise OSError("File %s not found in the cache." % self.asset_name)374 def get_metadata(self):375 """376 Returns metadata of the asset if it exists or None.377 :return: metadata378 :rtype: dict or None379 """380 try:381 asset_file = self.find_asset_file()382 except OSError:383 raise OSError("Metadata not available.")384 basename = os.path.splitext(asset_file)[0]385 metadata_file = "%s_metadata.json" % basename386 if os.path.isfile(metadata_file):387 with open(metadata_file, "r", encoding='utf-8') as f:388 metadata = json.load(f)389 return metadata390 @property391 def asset_name(self):392 if self.parsed_name.query:393 return self.parsed_name.query394 return os.path.basename(self.parsed_name.path)395 @classmethod...
common.py
Source:common.py
...169 for filename in fnmatch.filter(filenames, filetype):170 try: os.remove(os.path.join(rootDir, filename))171 except OSError: pass # TODO Better printout172 173 def find_asset_file(filename_find): # Finds exact and only one file or panics174 global BUILD_ERROR175 namelist = []176 for rootDir, subdirs, filenames in os.walk('./_editable_assets'):177 if filename_find in filenames:178 namelist.append(os.path.join(rootDir, filename_find))179 if len(namelist) == 1: 180 return namelist[0]181 elif len(namelist) == 0:182 print(col.ERROR, "ERROR: NO ASSET FILE FOUND:", filename_find, col.ENDC)183 BUILD_ERROR = True184 else:185 print(col.ERROR, "ERROR: FOUND MORE THAN A SINGLE FILE WITH GIVEN NAME:", filename_find, col.ENDC)186 print(namelist)187 BUILD_ERROR = True188 189 def scan_assets(filetype): # Returns a list of files, may be empty190 namelist = []191 for rootDir, subdirs, filenames in os.walk('./_editable_assets'):192 for filename in fnmatch.filter(filenames, filetype):193 namelist.append(os.path.join(rootDir, filename))194 if len(namelist) == 0:195 print(col.WARNING, "Warning: no assets of type:", filetype, col.ENDC)196 # TODO collect all warning messages and print them separately197 return namelist198 199 def link_overwrite(source_name, target_dir):200 global BUILD_ERROR201 source = find_asset_file(source_name)202 target = os.path.join(directories[target_dir], source_name)203 try: os.remove(target)204 except OSError: pass #TODO better printout205 try: os.link(source, target)206 except TypeError: 207 print(col.ERROR, "ERROR: UNABLE TO HARDLINK A TARGET FILE:", target, col.ENDC)208 BUILD_ERROR = True209 210 def remove_target_directory(path): ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!