Best Python code snippet using playwright-python
sources.py
Source:sources.py
import os
import pickle
import re
import zipfile
from abc import ABC
from typing import List, Tuple
from Tools import ImageLoader
from container import Container


class Source(ABC):
    """Base class for chunked image data sources.

    The dataset is kept as a list of fixed-size "chunks" of manifest
    entries; only the chunk containing the requested item is materialised
    (through ``self.loader``).  Indexing/iterating yields
    ``(grayscale, thermal)`` pairs.
    """

    # Class-level defaults.  NOTE: `_chunks` is a mutable class attribute;
    # __init__ re-binds it per instance to avoid cross-instance sharing.
    _chunk = -1
    _chunks = []
    _dumping = True
    _chunk_size = 1000

    def __init__(self, app: Container, chunk_size: int = 1000):
        self._chunk_size = chunk_size
        self._chunks = []  # per-instance list, shadows the shared class attribute
        self.data = []
        self.length = 0
        self.app = app
        self.loader = app.resolve(ImageLoader)

    def __iter__(self):
        self._current = 0
        return self

    def __next__(self):
        index = self._current
        self._current += 1
        if index >= self.length:
            raise StopIteration
        return self[index]

    def _slice_impl(self, item: slice) -> 'Source':
        """Build a new Source over the manifest entries selected by ``item``.

        NOTE(review): ``item.step`` is ignored; only start/stop are honoured.
        """
        full_list = []
        for chunk in self._chunks:
            full_list.extend(chunk)
        new_list = full_list[item.start:item.stop]
        # Source has no abstract methods, so direct instantiation works.
        source = Source(self.app)
        source._chunks = list(self.app.helper.list_chunk(new_list, self._chunk_size))
        source.length = len(new_list)
        return source

    def _item_impl(self, item: int) -> Tuple:
        """Lazily load the chunk containing ``item`` and return that image pair."""
        target_chunk = item // self._chunk_size  # was int(item / size): // is exact for large ints
        if self._chunk != target_chunk:  # the redundant `== -1` guard was dropped
            self._chunk = target_chunk
            self.data = self.loader(self._chunks[self._chunk])
        image = self.data[item % self._chunk_size]
        return image.grayscale, image.thermal

    def __getitem__(self, item):
        if isinstance(item, slice):
            return self._slice_impl(item)
        return self._item_impl(item)

    def __len__(self) -> int:
        return self.length

    def dump(self, path: str) -> None:
        """Pickle the currently loaded chunk (``self.data``) to ``path``."""
        with open(path, 'wb') as bin_file:
            pickle.dump(self.data, bin_file)

    def load(self, path: str):
        """Unpickle a chunk from ``path`` into ``self.data`` and return it.

        SECURITY: pickle must only be used on trusted cache files that this
        application itself wrote — never on external data.
        """
        with open(path, 'rb') as bin_file:
            self.data = pickle.load(bin_file)
        # fix: the original returned None, although SavedSource.load_chunk
        # forwards this return value.
        return self.data

    def chunks_dump(self) -> None:
        """Materialise every chunk and dump it under a timestamped cache dir,
        together with a ``meta.json`` listing (filename, chunk length) pairs."""
        name = self.app.helper.time_name()
        os.mkdir(self.app.config('data.cache') + name)
        meta = []
        for i, datum in enumerate(self._chunks):
            self.data = self.loader(datum)
            self.dump(self.app.config('data.cache') + name + '/' + str(i) + '.pkl')
            meta.append((str(i) + '.pkl', len(self.data)))
        self.app.helper.dump_json(self.app.config('data.cache') + name + '/meta.json',
                                  meta)


class FileSource(Source):
    """Source fed by zip archives of paired GS*/RGB* png images plus a
    ``result.txt`` of per-image label values."""

    def __init__(self, app: Container, dir_name: str, dump: bool = True):
        super(FileSource, self).__init__(app)
        self.dir_name = dir_name
        self._dumping = dump
        self.fetch()

    def fetch(self) -> None:
        # Detect the zip files present in the given dir
        zip_files = []
        for file in os.listdir(self.dir_name):
            if re.match(r'(.*)\.zip', file):
                zip_files.append(
                    zipfile.ZipFile(self.dir_name + '/' + file)
                )
        manifest = []
        # Load data from each zip file
        for zip_file in zip_files:
            manifest += self.load_list(zip_file)
        self.length = len(manifest)
        # Split the manifest into fixed-size chunks (original comment was mojibake)
        self._chunks = list(self.app.helper.list_chunk(manifest, self._chunk_size))
        if self._dumping:
            self.chunks_dump()

    @staticmethod
    def load_list(file: zipfile.ZipFile) -> list:
        """Extract ``file`` under ./storage/images and build its manifest.

        Returns a list of tuples:
        (display name, GS path, RGB path, label0, label1).
        """
        nums = []
        for name in file.namelist():
            # Extract every file from the zip
            file.extract(name, path='./storage/images')
            num = re.search(r'GS(.*)\.png', name)
            if num:
                nums.append(int(num.group(1)))
        # First image number; used as the enumerate() offset below.
        # NOTE(review): raises ValueError if the zip contains no GS*.png.
        start = min(nums)
        # Determine the name of the extraction dir (first entry's top folder)
        dir_name = file.namelist()[0].split('/')[0]
        # Transform the result file into a load list
        with open('./storage/images/' + dir_name + '/result.txt', 'r') as results_file:
            results = results_file.readlines()
        manifest = []
        for i, result in enumerate(results, start):
            result = result.split()
            manifest.append((
                file.filename + ':{GS|RGB}' + str(i) + '.png',
                './storage/images/' + dir_name + '/GS' + str(i) + '.png',
                './storage/images/' + dir_name + '/RGB' + str(i) + '.png',
                float(result[0]),
                float(result[1]),
            ))
        return manifest


class DBSource(Source):
    # Redundant with the base-class defaults; kept for backward compatibility.
    _chunk = -1
    _chunks = []
    _dumping = True

    def __init__(
            self,
            app: Container,
            db_hostname: str,
            db_username: str,
            db_password: str,
            db_name: str,
            base_url: str,
            dump: bool = True
    ):
        """Database Source implementation.

        Args:
            db_hostname: The hostname of the database server
            db_username: The username of the database server
            db_password: The password of the database user
            db_name: The name of the database to use
            base_url: The base url prefix prepended to image paths
            dump: Whether to dump the fetched chunks into the cache
        """
        super(DBSource, self).__init__(app)
        import pymysql
        import threading
        import time
        self._dumping = dump
        self.base_url = base_url
        self.download_tasks = []
        self.manifest = []
        self.downloading = False
        # Init the connection to database
        self.connection = pymysql.Connection(
            user=db_username,
            password=db_password,
            host=db_hostname,
            database=db_name
        )
        self.base_db_url = ('mysql://' + self.connection.get_host_info().split()[-1]
                            + '/' + db_name + '/')
        # Run fetching and downloading concurrently.
        self.fetcher = threading.Thread(target=self.fetch)
        self.downloader = threading.Thread(target=self.download)
        self.fetcher.start()
        self.downloader.start()
        # Wait for the fetcher to finish and the download queue to drain,
        # then signal the downloader to stop.  (The original busy-spun with
        # `continue`, pegging a CPU core.)
        self.fetcher.join()
        while self.download_tasks:
            time.sleep(0.05)
        self.downloading = False
        self._chunks = list(self.app.helper.list_chunk(self.manifest, self._chunk_size))
        self.length = len(self.manifest)

    def fetch(self) -> None:
        """Query every material, queue its two images for download, and
        publish the material list as chunks."""
        # Two cursors to perform different queries
        cursor_1 = self.connection.cursor()
        cursor_2 = self.connection.cursor()
        cursor_1.execute('SELECT * from `mtrl_info`')  # Query all materials
        materials = []
        while True:
            result = cursor_1.fetchone()
            if not result:
                break
            material_id = result[0]
            # Query all params belonging to the current material.
            # fix: parameterised query instead of '%d' string interpolation.
            cursor_2.execute(
                'SELECT * from `mtrl_para` where `mtrl_id` = %s', (material_id,)
            )
            params = cursor_2.fetchall()
            # Init a material param list and fill it
            material = [None, None, None, None, None]
            for param in params:
                material[param[2]] = (
                    param[3] if param[2] in [1, 2] else (self.base_url + param[7])
                )
            urls = material[3:]
            material[0] = self.base_db_url + 'mtrl_info/' + str(material_id)
            # NOTE(review): assumes params with indices 3 and 4 always exist;
            # otherwise material[3]/[4] is None and .split() raises.
            material[3] = 'images/' + material[3].split('/')[-1]
            material[4] = 'images/' + material[4].split('/')[-1]
            # Push download tasks into the download tasks list
            self.download_tasks.append({'url': urls[0], 'path': 'images/'})
            self.download_tasks.append({'url': urls[1], 'path': 'images/'})
            materials.append(material)
        # fix: the original never filled self.manifest, so __init__ clobbered
        # the chunks computed here with an empty list (length 0).
        self.manifest = materials
        self._chunks = list(self.app.helper.list_chunk(materials, self._chunk_size))
        self.length = len(materials)
        if self._dumping:
            self.chunks_dump()

    def download(self) -> None:
        """Worker loop: pop queued urls and save them under their path.

        Runs until ``self.downloading`` is cleared by __init__.
        """
        import requests
        import time
        # Downloading signal
        self.downloading = True
        while self.downloading:
            if self.download_tasks:
                task = self.download_tasks.pop()
                url = task['url']
                path = task['path']
                response = requests.get(url)
                filename = url.split('/')[-1]
                with open(path + filename, 'wb') as file:
                    file.write(response.content)
            else:
                time.sleep(0.05)  # was a busy `continue` spin


class SavedSource(Source):
    def __init__(self, app: Container, dir_name: str):
        """
        Load data from a previously dumped binary source.

        Args:
            dir_name: The path to the saved cache directory
        """
        super(SavedSource, self).__init__(app)
        self.path = dir_name
        self.fetch()
        # Replace the image loader with the pickle-chunk loader.
        self.loader = self.load_chunk

    def refresh(self):
        """Recompute length from the chunk descriptors and reset the cache.

        Each descriptor is (filename, stop, start); its contribution is
        stop - start.
        """
        length = 0
        for chunk in self._chunks:
            length += chunk[1] - chunk[2]
        self.length = length
        self._chunk = -1
        self.data = []
        return self

    def fetch(self) -> None:
        """Read meta.json and build (filename, length, 0) chunk descriptors."""
        meta = self.app.helper.load_json(self.path + '/meta.json')
        self._chunks = []
        for chunk_desc in meta:
            self._chunks.append(tuple(chunk_desc) + (0,))
            self.length += chunk_desc[1]

    def load_chunk(self, chunk_desc):
        # Delegates to Source.load, which also sets self.data as a side effect.
        return self.load(self.path + '/' + chunk_desc[0])

    def locate_chunk(self, item):
        """Map a global index to (chunk index, offset within that chunk).

        Returns None if item is beyond the end (original behaviour kept).
        """
        for _i, _chunk in enumerate(self._chunks):
            item -= _chunk[1] - _chunk[2]
            if item < 0:
                return _i, item + _chunk[1]

    def _slice_impl(self, item):
        """Slice by adjusting chunk descriptors instead of loading data."""
        new_source = SavedSource(self.app, self.path)
        if item.start is None:
            start_chunk, start_offset = 0, self._chunks[0][2]
        else:
            start_chunk, start_offset = self.locate_chunk(item.start)
        if item.stop is None:
            stop_chunk, stop_offset = len(self._chunks) - 1, self._chunks[-1][1]
        else:
            stop_chunk, stop_offset = self.locate_chunk(item.stop)
            # A stop that lands exactly on a chunk start belongs to the
            # previous chunk's end.
            if stop_offset == 0:
                stop_chunk -= 1
                stop_offset = self._chunks[stop_chunk][1]
        new_chunks = list(map(lambda x: list(x), self._chunks[start_chunk:stop_chunk + 1]))
        new_chunks[0][2] = start_offset
        new_chunks[-1][1] = stop_offset
        new_source._chunks = list(map(lambda x: tuple(x), new_chunks))
        new_source.refresh()
        return new_source

    def _item_impl(self, item):
        chunk, offset = self.locate_chunk(item)
        if self._chunk == -1 or self._chunk != chunk:
            self._chunk = chunk
            # load_chunk sets self.data via Source.load.
            self.loader(self._chunks[self._chunk])
        image = self.data[offset]
        return image.grayscale, image.thermal


class TestSource(Source):
    """MNIST-backed source used for testing (idx-ubyte files)."""

    resources = [
        ('tests/train-images-idx3-ubyte', 'tests/train-labels-idx1-ubyte'),
        ('tests/t10k-images-idx3-ubyte', 'tests/t10k-labels-idx1-ubyte')
    ]

    def __init__(self, app: Container):
        super(TestSource, self).__init__(app)
        self.data = self.fetch()

    def fetch(self) -> List[Tuple]:
        """Parse the idx files into a list of (cuda tensor, label) pairs."""
        import struct
        from PIL import Image
        data = []
        for images, labels in self.resources:
            with open(images, 'br') as img_files, open(labels, 'br') as lab_files:
                # Skip the magic numbers
                img_files.read(4)
                lab_files.read(8)
                # Read the item count
                num = struct.unpack('>i', img_files.read(4))[0]
                # Read the image dimensions
                height = struct.unpack('>i', img_files.read(4))[0]
                width = struct.unpack('>i', img_files.read(4))[0]
                size = width * height
                # Labels are single bytes; pad to 4 bytes for '>i'.
                padding = b'\x00\x00\x00'
                for i in range(num):
                    img = img_files.read(size)
                    lab = lab_files.read(1)
                    label = struct.unpack('>i', padding + lab)[0]
                    image = Image.frombytes(
                        mode='L',
                        # fix: use the dimensions read from the header
                        # instead of a hard-coded (28, 28).
                        size=(width, height),
                        data=img
                    )
                    data.append((
                        self.loader.loader(image).unsqueeze(0).cuda(),
                        label
                    ))
        # NOTE(review): the scraped original was truncated here; `return data`
        # is implied by the signature and by __init__'s `self.data = self.fetch()`.
        return data
artist.py.svn-base
Source:artist.py.svn-base
#!/usr/bin/env python
# encoding: utf-8
"""
A Python interface to the The Echo Nest's web API. See
http://developer.echonest.com/ for details.
"""
from config import CACHE
import document
import util


class Artist(object):
    """Lazy, cached accessor for a single Echo Nest artist."""

    def __init__(self, identifier, name=None):
        # A bare 18-character id is expanded to the canonical music:// form.
        if len(identifier) == 18:
            identifier = 'music://id.echonest.com/~/AR/' + identifier
        self._identifier = identifier
        self._name = name
        self._audio = document.WebDocumentSet(identifier, 'get_audio')
        self._image = document.WebDocumentSet(identifier, 'get_images')
        self._biographies = document.WebDocumentSet(identifier, 'get_biographies')
        self._blogs = document.WebDocumentSet(identifier, 'get_blogs')
        self._news = document.WebDocumentSet(identifier, 'get_news')
        self._reviews = document.WebDocumentSet(identifier, 'get_reviews')
        self._similar = SimilarDocumentSet(identifier)
        self._video = document.WebDocumentSet(identifier, 'get_video')
        self._familiarity = None
        self._hotttnesss = None
        self._urls = None
        self._terms = None

    def audio(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._audio = document.WebDocumentSet(self._identifier, 'get_audio')
        return self._audio[start:start + rows]

    def images(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            # fix: the original assigned self._images (typo), so refresh=True
            # rebuilt a dead attribute and never refreshed the returned set.
            self._image = document.WebDocumentSet(self._identifier, 'get_images')
        return self._image[start:start + rows]

    def biographies(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._biographies = document.WebDocumentSet(self._identifier, 'get_biographies')
        return self._biographies[start:start + rows]

    def blogs(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._blogs = document.WebDocumentSet(self._identifier, 'get_blogs')
        return self._blogs[start:start + rows]

    def news(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._news = document.WebDocumentSet(self._identifier, 'get_news')
        return self._news[start:start + rows]

    def reviews(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._reviews = document.WebDocumentSet(self._identifier, 'get_reviews')
        return self._reviews[start:start + rows]

    def similar(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._similar = SimilarDocumentSet(self._identifier)
        return self._similar[start:start + rows]

    def video(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._video = document.WebDocumentSet(self._identifier, 'get_video')
        return self._video[start:start + rows]

    def familiarity(self, refresh=False):
        """Returns our numerical estimation of how
        familiar an artist currently is to the world."""
        if self._familiarity is None or not CACHE:
            try:
                params = {'id': self.identifier}
                response = util.call('get_familiarity', params).findtext('artist/familiarity')
                self._familiarity = float(response)
            except Exception:  # was a bare except:; best-effort fallback kept
                self._familiarity = 0
        return self._familiarity

    def hotttnesss(self, refresh=False):
        """Returns our numerical description of how
        hottt an artist currently is."""
        if self._hotttnesss is None or not CACHE:
            try:
                params = {'id': self.identifier}
                response = util.call('get_hotttnesss', params).findtext('artist/hotttnesss')
                self._hotttnesss = float(response)
            except Exception:  # was a bare except:; best-effort fallback kept
                self._hotttnesss = 0
        return self._hotttnesss

    @property
    def name(self):
        if self._name is None or not CACHE:
            self._name = util.call('get_profile', {'id': self.identifier}).findtext('artist/name')
        return self._name

    @property
    def identifier(self):
        """A unique identifier for an artist.
        See http://developer.echonest.com/docs/datatypes/
        for more information"""
        return self._identifier

    @property
    def urls(self):
        """Get links to the artist's official site, MusicBrainz site,
        MySpace site, Wikipedia article, Amazon list, and iTunes page."""
        if self._urls is None or not CACHE:
            response = util.call('get_urls', {'id': self.identifier}).find('artist').getchildren()
            self._urls = dict((url.tag[:-4], url.text) for url in response if url.tag[-4:] == '_url')
        return self._urls

    def terms(self):
        if self._terms is None or not CACHE:
            response = util.call('get_top_terms', {'id': self.identifier}).findall('terms/term')
            self._terms = [e.text for e in response]
        return self._terms

    def __repr__(self):
        return "<Artist '%s'>" % str(self)

    def __str__(self):
        return self.name if isinstance(self.name, str) else self.name.encode('utf-8')

    def clear_cache(self):
        pass


TRUTH = {True: 'Y', False: 'N'}

SEARCH_ARTISTS_CACHE = {}


def search_artists(name, exact=False, sounds_like=True, rows=15, refresh=False):
    """Search for an artist using a query on the artist name.
    This may perform a sounds-like search to correct common
    spelling mistakes."""
    global SEARCH_ARTISTS_CACHE
    if CACHE and not refresh:
        try:
            return SEARCH_ARTISTS_CACHE[(name, exact, sounds_like, rows)]
        except KeyError:
            pass
    params = {'query': name, 'exact': TRUTH[exact],
              'sounds_like': TRUTH[sounds_like], 'rows': rows}
    response = util.call('search_artists', params).findall('artists/artist')
    value = [Artist(a.findtext('id'), a.findtext('name')) for a in response]
    SEARCH_ARTISTS_CACHE[(name, exact, sounds_like, rows)] = value
    return value


TOP_HOTTT_ARTISTS_CACHE = []


def get_top_hottt_artists(rows=15, refresh=False):
    """Retrieves a list of the top hottt artists.
    Do not request this more than once an hour."""
    global TOP_HOTTT_ARTISTS_CACHE
    if TOP_HOTTT_ARTISTS_CACHE == [] or refresh or not CACHE:
        response = util.call('get_top_hottt_artists', {'rows': rows}).findall('artists/artist')
        TOP_HOTTT_ARTISTS_CACHE = [Artist(a.findtext('id'), a.findtext('name')) for a in response]
    return TOP_HOTTT_ARTISTS_CACHE


class SimilarDocumentSet(document.DocumentSet):
    """DocumentSet over the 'get_similar' endpoint.

    Python 2 code: relies on xrange and integer division in the
    chunk arithmetic below.
    """

    def __init__(self, identifier):
        super(SimilarDocumentSet, self).__init__(identifier, 'get_similar', 'similar/artist')

    def __len__(self):
        # The API caps similar-artist results at 100.
        if self._len is None:
            self._len = 100
        return self._len

    def __getitem__(self, k):
        if k.stop > 100:
            raise util.EchoNestAPIError(5, 'Invalid parameter: "rows" must be less than or equal to 100')
        chunk_and_index = lambda i: (i / self._cache.chunk_size, i % self._cache.chunk_size)
        start, start_chunk, start_index, stop = self.chunk_magic(k)
        if (stop + 1) > self._cache.chunk_size:
            # Requested span exceeds one chunk: rebuild with a larger chunk size.
            self.__init__(self.identifier)
            self._cache.chunk_size = stop + 1
            chunk_and_index = lambda i: (i / self._cache.chunk_size, i % self._cache.chunk_size)
            start, start_chunk, start_index, stop = self.chunk_magic(k)
        stop_chunk, stop_index = chunk_and_index(stop)
        items = []
        for chunk in xrange(start_chunk, stop_chunk + 1):
            elements = self._cache[chunk].findall(self.element_path)
            if chunk == start_chunk and chunk == stop_chunk:
                elements = elements[start_index:stop_index + 1]
            elif chunk == start_chunk:
                elements = elements[start_index:]
            elif chunk == stop_chunk:
                elements = elements[:stop_index + 1]
            items.extend([self._parse_element(e) for e in elements])
        return items

    def chunk_magic(self, k):
        """Resolve slice k to (start, start_chunk, start_index, stop) with
        stop made inclusive to simplify the logic above."""
        chunk_and_index = lambda i: (i / self._cache.chunk_size, i % self._cache.chunk_size)
        if not isinstance(k, slice):
            raise TypeError
        start = k.start or 0
        start_chunk, start_index = chunk_and_index(start)
        stop = min(k.stop or len(self), len(self)) - 1  # use inclusive to simplify logic
        return start, start_chunk, start_index, stop
artist.py
Source:artist.py
#!/usr/bin/env python
# encoding: utf-8
"""
A Python interface to the The Echo Nest's web API. See
http://developer.echonest.com/ for details.
"""
from config import CACHE
import document
import util


class Artist(object):
    """Lazy, cached accessor for a single Echo Nest artist."""

    def __init__(self, identifier, name=None):
        # A bare 18-character id is expanded to the canonical music:// form.
        if len(identifier) == 18:
            identifier = 'music://id.echonest.com/~/AR/' + identifier
        self._identifier = identifier
        self._name = name
        self._audio = document.WebDocumentSet(identifier, 'get_audio')
        self._image = document.WebDocumentSet(identifier, 'get_images')
        self._biographies = document.WebDocumentSet(identifier, 'get_biographies')
        self._blogs = document.WebDocumentSet(identifier, 'get_blogs')
        self._news = document.WebDocumentSet(identifier, 'get_news')
        self._reviews = document.WebDocumentSet(identifier, 'get_reviews')
        self._similar = SimilarDocumentSet(identifier)
        self._video = document.WebDocumentSet(identifier, 'get_video')
        self._familiarity = None
        self._hotttnesss = None
        self._urls = None
        self._terms = None

    def audio(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._audio = document.WebDocumentSet(self._identifier, 'get_audio')
        return self._audio[start:start + rows]

    def images(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            # fix: the original assigned self._images (typo), so refresh=True
            # rebuilt a dead attribute and never refreshed the returned set.
            self._image = document.WebDocumentSet(self._identifier, 'get_images')
        return self._image[start:start + rows]

    def biographies(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._biographies = document.WebDocumentSet(self._identifier, 'get_biographies')
        return self._biographies[start:start + rows]

    def blogs(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._blogs = document.WebDocumentSet(self._identifier, 'get_blogs')
        return self._blogs[start:start + rows]

    def news(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._news = document.WebDocumentSet(self._identifier, 'get_news')
        return self._news[start:start + rows]

    def reviews(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._reviews = document.WebDocumentSet(self._identifier, 'get_reviews')
        return self._reviews[start:start + rows]

    def similar(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._similar = SimilarDocumentSet(self._identifier)
        return self._similar[start:start + rows]

    def video(self, rows=15, start=0, refresh=False):
        if refresh or not CACHE:
            self._video = document.WebDocumentSet(self._identifier, 'get_video')
        return self._video[start:start + rows]

    def familiarity(self, refresh=False):
        """Returns our numerical estimation of how
        familiar an artist currently is to the world."""
        if self._familiarity is None or not CACHE:
            try:
                params = {'id': self.identifier}
                response = util.call('get_familiarity', params).findtext('artist/familiarity')
                self._familiarity = float(response)
            except Exception:  # was a bare except:; best-effort fallback kept
                self._familiarity = 0
        return self._familiarity

    def hotttnesss(self, refresh=False):
        """Returns our numerical description of how
        hottt an artist currently is."""
        if self._hotttnesss is None or not CACHE:
            try:
                params = {'id': self.identifier}
                response = util.call('get_hotttnesss', params).findtext('artist/hotttnesss')
                self._hotttnesss = float(response)
            except Exception:  # was a bare except:; best-effort fallback kept
                self._hotttnesss = 0
        return self._hotttnesss

    @property
    def name(self):
        if self._name is None or not CACHE:
            self._name = util.call('get_profile', {'id': self.identifier}).findtext('artist/name')
        return self._name

    @property
    def identifier(self):
        """A unique identifier for an artist.
        See http://developer.echonest.com/docs/datatypes/
        for more information"""
        return self._identifier

    @property
    def urls(self):
        """Get links to the artist's official site, MusicBrainz site,
        MySpace site, Wikipedia article, Amazon list, and iTunes page."""
        if self._urls is None or not CACHE:
            response = util.call('get_urls', {'id': self.identifier}).find('artist').getchildren()
            self._urls = dict((url.tag[:-4], url.text) for url in response if url.tag[-4:] == '_url')
        return self._urls

    def terms(self):
        if self._terms is None or not CACHE:
            response = util.call('get_top_terms', {'id': self.identifier}).findall('terms/term')
            self._terms = [e.text for e in response]
        return self._terms

    def __repr__(self):
        return "<Artist '%s'>" % str(self)

    def __str__(self):
        return self.name if isinstance(self.name, str) else self.name.encode('utf-8')

    def clear_cache(self):
        pass


TRUTH = {True: 'Y', False: 'N'}

SEARCH_ARTISTS_CACHE = {}


def search_artists(name, exact=False, sounds_like=True, rows=15, refresh=False):
    """Search for an artist using a query on the artist name.
    This may perform a sounds-like search to correct common
    spelling mistakes."""
    global SEARCH_ARTISTS_CACHE
    if CACHE and not refresh:
        try:
            return SEARCH_ARTISTS_CACHE[(name, exact, sounds_like, rows)]
        except KeyError:
            pass
    params = {'query': name, 'exact': TRUTH[exact],
              'sounds_like': TRUTH[sounds_like], 'rows': rows}
    response = util.call('search_artists', params).findall('artists/artist')
    value = [Artist(a.findtext('id'), a.findtext('name')) for a in response]
    SEARCH_ARTISTS_CACHE[(name, exact, sounds_like, rows)] = value
    return value


TOP_HOTTT_ARTISTS_CACHE = []


def get_top_hottt_artists(rows=15, refresh=False):
    """Retrieves a list of the top hottt artists.
    Do not request this more than once an hour."""
    global TOP_HOTTT_ARTISTS_CACHE
    if TOP_HOTTT_ARTISTS_CACHE == [] or refresh or not CACHE:
        response = util.call('get_top_hottt_artists', {'rows': rows}).findall('artists/artist')
        TOP_HOTTT_ARTISTS_CACHE = [Artist(a.findtext('id'), a.findtext('name')) for a in response]
    return TOP_HOTTT_ARTISTS_CACHE


class SimilarDocumentSet(document.DocumentSet):
    """DocumentSet over the 'get_similar' endpoint.

    Python 2 code: relies on xrange and integer division in the
    chunk arithmetic below.
    """

    def __init__(self, identifier):
        super(SimilarDocumentSet, self).__init__(identifier, 'get_similar', 'similar/artist')

    def __len__(self):
        # The API caps similar-artist results at 100.
        if self._len is None:
            self._len = 100
        return self._len

    def __getitem__(self, k):
        if k.stop > 100:
            raise util.EchoNestAPIError(5, 'Invalid parameter: "rows" must be less than or equal to 100')
        chunk_and_index = lambda i: (i / self._cache.chunk_size, i % self._cache.chunk_size)
        start, start_chunk, start_index, stop = self.chunk_magic(k)
        if (stop + 1) > self._cache.chunk_size:
            # Requested span exceeds one chunk: rebuild with a larger chunk size.
            self.__init__(self.identifier)
            self._cache.chunk_size = stop + 1
            chunk_and_index = lambda i: (i / self._cache.chunk_size, i % self._cache.chunk_size)
            start, start_chunk, start_index, stop = self.chunk_magic(k)
        stop_chunk, stop_index = chunk_and_index(stop)
        items = []
        for chunk in xrange(start_chunk, stop_chunk + 1):
            elements = self._cache[chunk].findall(self.element_path)
            if chunk == start_chunk and chunk == stop_chunk:
                elements = elements[start_index:stop_index + 1]
            elif chunk == start_chunk:
                elements = elements[start_index:]
            elif chunk == stop_chunk:
                elements = elements[:stop_index + 1]
            items.extend([self._parse_element(e) for e in elements])
        return items

    def chunk_magic(self, k):
        """Resolve slice k to (start, start_chunk, start_index, stop) with
        stop made inclusive to simplify the logic above."""
        chunk_and_index = lambda i: (i / self._cache.chunk_size, i % self._cache.chunk_size)
        if not isinstance(k, slice):
            raise TypeError
        start = k.start or 0
        start_chunk, start_index = chunk_and_index(start)
        stop = min(k.stop or len(self), len(self)) - 1  # use inclusive to simplify logic
        return start, start_chunk, start_index, stop
support.py
Source:support.py
from numpy import ndarray, array


def fix_slicing(index, logical_length, allow_column=True):
    """
    Checks, and caps, the slices being used.
    :param index: The initial index, which may be int or slice.
    :param logical_length: The current logical length of the data. It is used to cap the indices.
      If None, indices will not be capped.
    :param allow_column: Tells whether allowing 1 or 2 dimensions indexing.
    :return: The fixed slicing as (start, stop_or_None, column_index), if no exception occurs.
    """
    column_index = None
    if isinstance(index, tuple):
        if allow_column:
            if len(index) == 2:
                index, column_index = index
            else:
                raise IndexError("At most two indices/slices (bi-dimensional indexing) are supported")
        else:
            raise IndexError("Only one index/slice (one-dimensional indexing) is supported")
    if isinstance(index, slice):
        start = index.start
        stop = index.stop
        step = index.step
        if start is None:
            start = 0
        if stop is None:
            stop = logical_length  # may remain None when logical_length is None
        if step and step != 1:
            raise IndexError("Slices with step != 1 are not supported")
        if start < 0 or (stop is not None and stop < 0):
            raise IndexError("Negative indices in slices are not supported")
        if stop is not None and stop < start:
            raise IndexError("Slices must have start <= stop indices")
        elif logical_length is None:
            return start, stop, column_index
        else:
            # Here, start will be <= stop. We will limit both by the
            # logical_length, silently.
            return min(start, logical_length), min(stop, logical_length), column_index
    elif isinstance(index, int):
        if index < 0:
            raise IndexError("Negative indices are not supported")
        if logical_length is None:
            return index, None, column_index
        else:
            return min(index, logical_length), None, column_index
    else:
        raise TypeError("Only slices (non-negative, growing, and with step 1) or non-negative integer indices are "
                        "supported")


def fix_input(index, expected_width, expected_length, expected_type, value):
    """
    Checks and fixes the value according to the given index type.
    :param index: The initial index, which may be int or slice.
    :param expected_width: The expected width of the value.
    :param expected_length: The expected length of the data. It will be ignored if the index is a number.
    :param expected_type: The expected type for array input.
    :param value: The given value, which may be scalar or array.
    :return: The fixed value, if no exception occurs.
    """
    # If the input is an iterable, convert it to a 1-dimensional array.
    if isinstance(value, (tuple, list)):
        value = array(value)
    if isinstance(index, slice):
        # fix: the original used `and`, which raised AttributeError on
        # non-array values (no .dtype) and skipped the dtype check for arrays.
        if not isinstance(value, ndarray) or value.dtype != expected_type:
            raise TypeError("When setting a slice, the value must be a numpy array of (stop - start)x(width) "
                            "elements (if the width is 1, an uni-dimensional array of (stop-start) elements is "
                            "also allowed)")
        if expected_width == 1 and len(value.shape) == 1:
            # Promote a flat array to a single-column 2-D array.
            value = value[:]
            value.shape = (value.size, 1)
        if value.shape != (expected_length, expected_width):
            raise TypeError("When setting a slice, the value must be a numpy array of (stop - start)x(width) "
                            "elements (if the width is 1, an uni-dimensional array of (stop-start) elements is "
                            "also allowed)")
    elif isinstance(index, int):
        if expected_width > 1 and (not isinstance(value, ndarray) or value.shape != (expected_width,)):
            raise TypeError("When setting an index, the value must be a 1-dimensional numpy array of the appropriate "
                            "size (=width), if the expected width is > 1")
        if expected_width == 1 and not isinstance(value, ndarray):
            value = array((value,))
    else:
        raise TypeError("Only slices (non-negative, growing, and with step 1) or non-negative integer indices are "
                        "supported")
    return value


def chunked_slicing(slice_start, slice_stop, chunk_size):
    """
    Iterator that yields, every time, a data structure like this:
    - Current chunk index
    - Start index in chunk
    - Stop index in chunk
    - Start index in source/destination
    - Stop index in source/destination
    It is guaranteed: 0 <= slice_start <= slice_stop <= logical length <= chunk_size * chunk_count.
    :param slice_start: The overall start.
    :param slice_stop: The overall stop.
    :param chunk_size: The chunk size.
    :return: A generator of ((data_start, data_stop), chunk_index, (chunk_start, chunk_stop)).
    """
    start_chunk = slice_start // chunk_size
    stop_chunk = slice_stop // chunk_size
    if start_chunk == stop_chunk:
        # Easiest case: the whole slice lives inside one chunk.
        data_indices = (0, slice_stop - slice_start)
        chunk_indices = (slice_start % chunk_size, slice_stop % chunk_size)
        yield data_indices, start_chunk, chunk_indices
    else:
        # In this case, start chunk will always be lower than end chunk.
        chunk_start_index = slice_start % chunk_size
        chunk_stop_index = slice_stop % chunk_size
        first_iteration = True
        data_index = 0
        current_chunk = start_chunk
        # Invariants: in the first iteration chunk_start_index >= 0 and
        # start_chunk < stop_chunk; afterwards the lower bound is 0.
        while True:
            # The chunk upper bound is the stop index in the last chunk,
            # and the chunk size otherwise.
            if current_chunk == stop_chunk:
                # If the stop index is 0, the last chunk contributes nothing.
                if chunk_stop_index == 0:
                    return
                current_chunk_ubound = chunk_stop_index
            else:
                current_chunk_ubound = chunk_size
            # The chunk lower bound is the start index only in the first chunk.
            if first_iteration:
                current_chunk_lbound = chunk_start_index
            else:
                current_chunk_lbound = 0
            length = current_chunk_ubound - current_chunk_lbound
            data_indices = (data_index, data_index + length)
            chunk_indices = (current_chunk_lbound, current_chunk_ubound)
            yield data_indices, current_chunk, chunk_indices
            # If this is the last chunk, we must exit.
            if current_chunk == stop_chunk:
                return
            # Advance to the next chunk.
            current_chunk += 1
            data_index += length
            # fix: finish the first iteration (the original comment announced
            # this, but the assignment was missing — every later chunk wrongly
            # reused chunk_start_index as its lower bound).
            first_iteration = False
LambdaTest's Playwright tutorial will give you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides end-to-end guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!