Best Python code snippet using playwright-python
searcher.py
Source: searcher.py
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import logging
import os
import pickle  # nosec  # pylint: disable=import_pickle
import random
from hashlib import md5
from collections import defaultdict

import numpy as np

from .... import opcodes
from .... import tensor as mt
from ....config import options
from ....core import ENTITY_TYPE, OutputType, recursive_tile
from ....core.context import get_context
from ....core.operand import OperandStage
from ....lib.filesystem import get_fs, FileSystem
from ....serialization.serializables import (
    KeyField,
    StringField,
    Int32Field,
    Int64Field,
    DictField,
    AnyField,
    BytesField,
    BoolField,
)
from ....tensor.core import TensorOrder
from ....utils import has_unknown_shape, Timer, ceildiv
from ...operands import LearnOperand, LearnOperandMixin
from ..core import proxima, validate_tensor, get_proxima_type

logger = logging.getLogger(__name__)


class ProximaSearcher(LearnOperand, LearnOperandMixin):
    _op_type_ = opcodes.PROXIMA_SIMPLE_SEARCHER

    _tensor = KeyField("tensor")
    _distance_metric = StringField("distance_metric")
    _dimension = Int32Field("dimension")
    _row_number = Int64Field("row_number")
    _topk = Int32Field("topk")
    _threads = Int32Field("threads")
    _index = AnyField("index")
    _index_searcher = StringField("index_searcher")
    _index_searcher_params = DictField("index_searcher_params")
    _index_reformer = StringField("index_reformer")
    _index_reformer_params = DictField("index_reformer_params")
    _download_index = BoolField("download_index")
    _storage_options = BytesField(
        "storage_options", on_serialize=pickle.dumps, on_deserialize=pickle.loads
    )

    def __init__(
        self,
        tensor=None,
        distance_metric=None,
        dimension=None,
        row_number=None,
        topk=None,
        index=None,
        threads=None,
        index_searcher=None,
        index_searcher_params=None,
        index_reformer=None,
        index_reformer_params=None,
        download_index=None,
        storage_options=None,
        output_types=None,
        stage=None,
        **kw,
    ):
        super().__init__(
            _tensor=tensor,
            _distance_metric=distance_metric,
            _row_number=row_number,
            _dimension=dimension,
            _index=index,
            _threads=threads,
            _index_searcher=index_searcher,
            _index_searcher_params=index_searcher_params,
            _index_reformer=index_reformer,
            _index_reformer_params=index_reformer_params,
            _download_index=download_index,
            _output_types=output_types,
            _topk=topk,
            _storage_options=storage_options,
            **kw,
        )
        if self._output_types is None:
            self._output_types = [OutputType.tensor, OutputType.tensor]

    @property
    def tensor(self):
        return self._tensor

    @property
    def distance_metric(self):
        return self._distance_metric

    @property
    def dimension(self):
        return self._dimension

    @property
    def row_number(self):
        return self._row_number

    @property
    def topk(self):
        return self._topk

    @property
    def threads(self):
        return self._threads

    @property
    def index(self):
        return self._index

    @property
    def index_searcher(self):
        return self._index_searcher

    @property
    def index_searcher_params(self):
        return self._index_searcher_params

    @property
    def index_reformer(self):
        return self._index_reformer

    @property
    def index_reformer_params(self):
        return self._index_reformer_params

    @property
    def download_index(self):
        return self._download_index

    @property
    def storage_options(self):
        return self._storage_options

    @property
    def output_limit(self):
        return 1 if self._download_index else 2

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        if self.stage != OperandStage.agg and not self._download_index:
            self._tensor = self._inputs[0]
        if isinstance(self._index, ENTITY_TYPE):
            self._index = self._inputs[-1]

    def __call__(self, tensor, index):
        kws = [
            {
                "dtype": np.dtype(np.uint64),
                "shape": (tensor.shape[0], self._topk),
                "order": TensorOrder.C_ORDER,
            },
            {
                "dtype": np.dtype(np.float32),
                "shape": (tensor.shape[0], self._topk),
                "order": TensorOrder.C_ORDER,
            },
        ]
        inputs = [tensor]
        if hasattr(index, "op"):
            inputs.append(index)
        return mt.ExecutableTuple(self.new_tileables(inputs, kws=kws))

    @classmethod
    def _build_download_chunks(cls, op, indexes):
        ctx = get_context()

        workers = ctx.get_worker_addresses() or [None]
        if len(workers) < len(indexes):
            workers = [workers[i % len(workers)] for i in range(len(indexes))]
        indexes_iter = iter(itertools.cycle(indexes))

        download_chunks = defaultdict(list)
        for i, worker in enumerate(workers):
            download_op = op.copy().reset_key()
            download_op.stage = OperandStage.map
            download_op.expect_worker = worker
            download_op._download_index = True
            download_op._tensor = None
            download_op._index = next(indexes_iter)
            download_chunks[i % len(indexes)].append(
                download_op.new_chunk(
                    None, index=(i,), shape=(), dtype=op.inputs[0].dtype
                )
            )
        return download_chunks

    @classmethod
    def tile(cls, op: "ProximaSearcher"):
        tensor = op.tensor
        index = op.index
        topk = op.topk
        outs = op.outputs
        row_number = op.row_number

        ctx = get_context()

        # make sure all inputs have known chunk sizes
        if has_unknown_shape(*op.inputs):
            yield

        rechunk_size = dict()
        if tensor.chunk_shape[1] > 1:
            rechunk_size[1] = tensor.shape[1]
        if row_number is not None:
            rechunk_size[0] = tensor.shape[0] // row_number
        if len(rechunk_size) > 0:
            tensor = yield from recursive_tile(tensor.rechunk(rechunk_size))

        logger.warning(f"query chunks count: {len(tensor.chunks)} ")

        if hasattr(index, "op"):
            built_indexes = [index.chunks] * len(tensor.chunks)
        else:
            # index path
            fs: FileSystem = get_fs(index, op.storage_options)
            index_paths = [
                f for f in fs.ls(index) if f.rsplit("/", 1)[-1].startswith("proxima_")
            ]
            download_chunks = cls._build_download_chunks(op, index_paths)
            iters = [iter(itertools.cycle(i)) for i in download_chunks.values()]
            built_indexes = []
            for _ in range(len(tensor.chunks)):
                built_indexes.append([next(it) for it in iters])

        if hasattr(index, "op"):
            index_chunks_workers = [
                m["bands"][0][0]
                for m in ctx.get_chunks_meta(
                    [c.key for c in index.chunks], fields=["bands"]
                )
            ]
        else:
            index_chunks_workers = [None] * len(built_indexes[0])

        out_chunks = [], []
        for i, tensor_chunk in enumerate(tensor.chunks):
            pk_chunks, distance_chunks = [], []
            for j, chunk_index, worker in zip(
                itertools.count(), built_indexes[i], index_chunks_workers
            ):
                chunk_op = op.copy().reset_key()
                chunk_op.stage = OperandStage.map
                if hasattr(index, "op"):
                    chunk_op.expect_worker = worker
                else:
                    chunk_op.expect_worker = chunk_index.op.expect_worker
                chunk_op._index = chunk_index
                chunk_op._tensor = None
                chunk_kws = [
                    {
                        "index": (tensor_chunk.index[0], j),
                        "dtype": outs[0].dtype,
                        "shape": (tensor_chunk.shape[0], topk),
                        "order": TensorOrder.C_ORDER,
                    },
                    {
                        "index": (tensor_chunk.index[0], j),
                        "dtype": outs[1].dtype,
                        "shape": (tensor_chunk.shape[0], topk),
                        "order": TensorOrder.C_ORDER,
                    },
                ]
                chunk_inputs = [tensor_chunk, chunk_index]
                pk_chunk, distance_chunk = chunk_op.new_chunks(
                    chunk_inputs, kws=chunk_kws
                )
                pk_chunks.append(pk_chunk)
                distance_chunks.append(distance_chunk)

            if len(pk_chunks) == 1:
                out_chunks[0].append(pk_chunks[0])
                out_chunks[1].append(distance_chunks[0])
                continue

            # combine topk results
            combine_size = options.combine_size
            tensor_out_chunks = [pk_chunks, distance_chunks]
            while True:
                chunk_size = ceildiv(len(tensor_out_chunks[0]), combine_size)
                cur_out_chunks = [[], []]
                for k in range(chunk_size):
                    to_combine_pks = tensor_out_chunks[0][
                        k * combine_size : (k + 1) * combine_size
                    ]
                    to_combine_distances = tensor_out_chunks[1][
                        k * combine_size : (k + 1) * combine_size
                    ]

                    chunk_op = op.copy().reset_key()
                    chunk_op.stage = OperandStage.agg
                    chunk_op._tensor = None
                    chunk_op._index = None
                    agg_chunk_kws = [
                        {
                            "index": (i, 0),
                            "dtype": outs[0].dtype,
                            "shape": (tensor_chunk.shape[0], topk),
                            "order": outs[0].order,
                        },
                        {
                            "index": (i, 0),
                            "dtype": outs[1].dtype,
                            "shape": (tensor_chunk.shape[0], topk),
                            "order": outs[1].order,
                        },
                    ]
                    pk_result_chunk, distance_result_chunk = chunk_op.new_chunks(
                        to_combine_pks + to_combine_distances, kws=agg_chunk_kws
                    )
                    cur_out_chunks[0].append(pk_result_chunk)
                    cur_out_chunks[1].append(distance_result_chunk)
                tensor_out_chunks = cur_out_chunks
                if len(tensor_out_chunks[0]) == 1:
                    break
            out_chunks[0].append(tensor_out_chunks[0][0])
            out_chunks[1].append(tensor_out_chunks[1][0])

        kws = []
        pk_params = outs[0].params
        pk_params["chunks"] = out_chunks[0]
        pk_params["nsplits"] = (tensor.nsplits[0], (topk,))
        kws.append(pk_params)
        distance_params = outs[1].params
        distance_params["chunks"] = out_chunks[1]
        distance_params["nsplits"] = (tensor.nsplits[0], (topk,))
        kws.append(distance_params)
        new_op = op.copy()
        return new_op.new_tileables(op.inputs, kws=kws)

    @classmethod
    def _execute_download(cls, ctx, op: "ProximaSearcher"):
        index_path = op.index
        with Timer() as timer:
            fs = get_fs(index_path, op.storage_options)

            # TODO
            dirs = os.environ.get("MARS_SPILL_DIRS")
            if dirs:
                temp_dir = random.choice(dirs.split(":"))
            else:
                temp_dir = "/tmp/proxima-index/"

            local_path = os.path.join(
                temp_dir, md5(str(index_path).encode("utf-8")).hexdigest()
            )  # noqa: B303  # nosec
            exist_state = True
            if not os.path.exists(local_path):
                exist_state = False
                if not os.path.exists(local_path.rsplit("/", 1)[0]):
                    os.mkdir(local_path.rsplit("/", 1)[0])
                with open(local_path, "wb") as out_f:
                    with fs.open(index_path, "rb") as in_f:
                        # 32M
                        chunk_bytes = 32 * 1024**2
                        while True:
                            data = in_f.read(chunk_bytes)
                            if data:
                                out_f.write(data)
                            else:
                                break
        logger.warning(
            f"ReadingFromVolume({op.key}), index path: {index_path}, "
            f"local_path {local_path}, "
            f"size {os.path.getsize(local_path)}, "
            f"already exist {exist_state}, "
            f"costs {timer.duration} seconds "
            f"speed {round(os.path.getsize(local_path) / (1024 ** 2) / timer.duration, 2)} MB/s"
        )
        ctx[op.outputs[0].key] = local_path

    @classmethod
    def _execute_map(cls, ctx, op: "ProximaSearcher"):
        if op.download_index:
            cls._execute_download(ctx, op)
            return

        inp = ctx[op.tensor.key]
        index_path = ctx[op.inputs[-1].key]

        with Timer() as timer:
            flow = proxima.IndexFlow(
                container_name="MMapFileContainer",
                container_params={},
                searcher_name=op.index_searcher,
                searcher_params=op.index_searcher_params,
                measure_name="",
                measure_params={},
                reformer_name=op.index_reformer,
                reformer_params=op.index_reformer_params,
            )
            flow.load(index_path)
            vecs = np.ascontiguousarray(inp)
        logger.warning(
            f"LoadIndex({op.key}) index path: {index_path} costs {timer.duration} seconds"
        )
        logger.warning(f"threads count:{op.threads} vecs count:{len(vecs)}")

        with Timer() as timer:
            batch = 10000
            s_idx = 0
            e_idx = min(s_idx + batch, len(vecs))
            result_pks, result_distances = None, None
            while s_idx < len(vecs):
                with Timer() as timer_s:
                    tp = get_proxima_type(vecs.dtype)
                    result_pks_b, result_distances_b = proxima.IndexUtility.ann_search(
                        searcher=flow,
                        type=tp,
                        query=vecs[s_idx:e_idx],
                        topk=op.topk,
                        threads=op.threads,
                    )
                    if result_pks is None:
                        result_pks = np.asarray(result_pks_b)
                        result_distances = np.asarray(result_distances_b)
                    else:
                        result_pks = np.concatenate(
                            (result_pks, np.asarray(result_pks_b))
                        )
                        result_distances = np.concatenate(
                            (result_distances, np.asarray(result_distances_b))
                        )
                    s_idx = e_idx
                    e_idx = min(s_idx + batch, len(vecs))
                logger.warning(
                    f"Search({op.key}) count {s_idx}/{len(vecs)}:{round(s_idx * 100 / len(vecs), 2)}%"
                    f" costs {round(timer_s.duration, 2)} seconds"
                )
        logger.warning(f"Search({op.key}) costs {timer.duration} seconds")

        ctx[op.outputs[0].key] = np.asarray(result_pks)
        ctx[op.outputs[1].key] = np.asarray(result_distances)

    @classmethod
    def _execute_agg(cls, ctx, op: "ProximaSearcher"):
        inputs_data = [ctx[inp.key] for inp in op.inputs]

        chunk_num = len(inputs_data) // 2
        pks = np.concatenate(inputs_data[:chunk_num], axis=1)
        distances = np.concatenate(inputs_data[chunk_num:], axis=1)

        n_doc = len(pks)
        topk = op.topk

        # calculate topk on rows
        if op.distance_metric == "InnerProduct":
            inds = np.argsort(distances, axis=1)[:, -1 : -topk - 1 : -1]
        else:
            inds = np.argsort(distances, axis=1)[:, :topk]

        result_pks = np.empty((n_doc, topk), dtype=pks.dtype)
        result_distances = np.empty((n_doc, topk), dtype=distances.dtype)
        rng = np.arange(n_doc)
        for i in range(topk):
            ind = inds[:, i]
            result_pks[:, i] = pks[rng, ind]
            result_distances[:, i] = distances[rng, ind]
        del rng

        ctx[op.outputs[0].key] = result_pks
        ctx[op.outputs[1].key] = result_distances

    @classmethod
    def execute(cls, ctx, op: "ProximaSearcher"):
        if op.stage != OperandStage.agg:
            return cls._execute_map(ctx, op)
        else:
            return cls._execute_agg(ctx, op)


def search_index(
    tensor,
    topk,
    index,
    threads=4,
    row_number=None,
    dimension=None,
    distance_metric=None,
    index_searcher=None,
    index_searcher_params=None,
    index_reformer=None,
    index_reformer_params=None,
    storage_options=None,
    run=True,
    session=None,
    run_kwargs=None,
):
    tensor = validate_tensor(tensor)

    if dimension is None:
        dimension = tensor.shape[1]
    if index_searcher is None:
        index_searcher = ""
    if index_searcher_params is None:
        index_searcher_params = {}
    if index_reformer is None:
        index_reformer = ""
    if index_reformer_params is None:
        index_reformer_params = {}
    if distance_metric is None:
        distance_metric = ""
    if hasattr(index, "op") and index.op.index_path is not None:
        storage_options = storage_options or index.op.storage_options
        index = index.op.index_path

    op = ProximaSearcher(
        tensor=tensor,
        distance_metric=distance_metric,
        dimension=dimension,
        row_number=row_number,
        topk=topk,
        index=index,
        threads=threads,
        index_searcher=index_searcher,
        index_searcher_params=index_searcher_params,
        index_reformer=index_reformer,
        index_reformer_params=index_reformer_params,
        storage_options=storage_options,
    )
    result = op(tensor, index)
    if run:
        return result.execute(session=session, **(run_kwargs or dict()))
    else:
...
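To see how the searcher above is driven end to end, here is a minimal usage sketch. It assumes Mars and the proxima extension are installed; the import path, the use of mars.new_session(), and the chunk_size value are assumptions based on this repository's layout and may differ by version.

import mars
import mars.tensor as mt
# import path is an assumption based on this repository's layout
from mars.learn.proxima.simple_index import build_index, search_index

mars.new_session()  # local session (assumed entry point)

doc = mt.random.rand(10_000, 32, chunk_size=5_000).astype("float32")  # corpus
query = mt.random.rand(100, 32).astype("float32")                     # queries

index = build_index(doc)                      # build an index on the corpus
pk, distance = search_index(query, topk=10, index=index)
print(pk.shape, distance.shape)               # (100, 10) and (100, 10)

search_index returns the primary keys and distances as two tensors; with run=True (the default) they are already executed.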
analyzer.py
Source: analyzer.py
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from collections import deque, defaultdict
from typing import Dict, List, Tuple, Type, Union

from ....config import Config
from ....core import ChunkGraph, ChunkType, enter_mode
from ....core.operand import Fetch, VirtualOperand, LogicKeyGenerator
from ....resource import Resource
from ....typing import BandType
from ....utils import build_fetch, tokenize
from ...subtask import SubtaskGraph, Subtask
from ..core import Task, new_task_id
from .assigner import AbstractGraphAssigner, GraphAssigner
from .fusion import Coloring

logger = logging.getLogger(__name__)


class GraphAnalyzer:
    def __init__(
        self,
        chunk_graph: ChunkGraph,
        band_resource: Dict[BandType, Resource],
        task: Task,
        config: Config,
        graph_assigner_cls: Type[AbstractGraphAssigner] = None,
        stage_id: str = None,
    ):
        self._chunk_graph = chunk_graph
        self._band_resource = band_resource
        self._task = task
        self._stage_id = stage_id
        self._config = config
        self._fuse_enabled = task.fuse_enabled
        self._extra_config = task.extra_config
        if graph_assigner_cls is None:
            graph_assigner_cls = GraphAssigner
        self._graph_assigner_cls = graph_assigner_cls
        self._chunk_to_copied = dict()
        self._logic_key_generator = LogicKeyGenerator()

    @classmethod
    def _iter_start_ops(cls, chunk_graph: ChunkGraph):
        visited = set()
        op_keys = set()
        start_chunks = deque(chunk_graph.iter_indep())
        stack = deque([start_chunks.popleft()])

        while stack:
            chunk = stack.popleft()
            if chunk not in visited:
                inp_chunks = chunk_graph.predecessors(chunk)
                if not inp_chunks or all(
                    inp_chunk in visited for inp_chunk in inp_chunks
                ):
                    if len(inp_chunks) == 0:
                        op_key = chunk.op.key
                        if op_key not in op_keys:
                            op_keys.add(op_key)
                            yield chunk.op
                    visited.add(chunk)
                    stack.extend(c for c in chunk_graph[chunk] if c not in visited)
                else:
                    stack.appendleft(chunk)
                    stack.extendleft(
                        reversed(
                            [
                                c
                                for c in chunk_graph.predecessors(chunk)
                                if c not in visited
                            ]
                        )
                    )
            if not stack and start_chunks:
                stack.appendleft(start_chunks.popleft())

    @classmethod
    def _gen_input_chunks(
        cls,
        inp_chunks: List[ChunkType],
        chunk_to_fetch_chunk: Dict[ChunkType, ChunkType],
    ) -> List[ChunkType]:
        # gen fetch chunks for input chunks
        inp_fetch_chunks = []
        for inp_chunk in inp_chunks:
            if inp_chunk in chunk_to_fetch_chunk:
                inp_fetch_chunks.append(chunk_to_fetch_chunk[inp_chunk])
            elif isinstance(inp_chunk.op, Fetch):
                chunk_to_fetch_chunk[inp_chunk] = inp_chunk
                inp_fetch_chunks.append(inp_chunk)
            else:
                fetch_chunk = build_fetch(inp_chunk).data
                chunk_to_fetch_chunk[inp_chunk] = fetch_chunk
                inp_fetch_chunks.append(fetch_chunk)
        return inp_fetch_chunks

    @staticmethod
    def _to_band(band_or_worker: Union[BandType, str]) -> BandType:
        if isinstance(band_or_worker, tuple) and len(band_or_worker) == 2:
            # band already
            return band_or_worker
        else:
            return band_or_worker, "numa-0"

    def _gen_subtask_info(
        self,
        chunks: List[ChunkType],
        chunk_to_subtask: Dict[ChunkType, Subtask],
        chunk_to_bands: Dict[ChunkType, BandType],
        chunk_to_fetch_chunk: Dict[ChunkType, ChunkType],
    ) -> Tuple[Subtask, List[Subtask]]:
        # gen subtask and its input subtasks
        final_result_chunks_set = set(self._chunk_graph.result_chunks)
        chunks_set = set(chunks)

        result_chunks = []
        result_chunks_set = set()
        chunk_graph = ChunkGraph(result_chunks)
        out_of_scope_chunks = []
        chunk_to_copied = self._chunk_to_copied
        # subtask properties
        band = None
        is_virtual = None
        retryable = True
        chunk_priority = None
        expect_worker = None
        bands_specified = None
        for chunk in chunks:
            if expect_worker is None:
                expect_worker = chunk.op.expect_worker
                bands_specified = expect_worker is not None
            else:  # pragma: no cover
                assert (
                    chunk.op.expect_worker is None
                    or expect_worker == chunk.op.expect_worker
                ), (
                    f"expect_worker {chunk.op.expect_worker} conflicts with chunks that have same color: "
                    f"{expect_worker}"
                )
            # process band
            chunk_band = chunk_to_bands.get(chunk)
            if chunk_band is not None:
                assert (
                    band is None or band == chunk_band
                ), "band conflicts with chunks that have same color"
                band = chunk_band
            # process is_virtual
            if isinstance(chunk.op, VirtualOperand):
                assert is_virtual is None, "only 1 virtual operand can exist"
                is_virtual = True
            else:
                is_virtual = False
            # process retryable
            if not chunk.op.retryable:
                retryable = False
            # process priority
            if chunk.op.priority is not None:
                assert (
                    chunk_priority is None or chunk_priority == chunk.op.priority
                ), "priority conflicts with chunks that have same color"
                chunk_priority = chunk.op.priority
            # process input chunks
            inp_chunks = []
            build_fetch_index_to_chunks = dict()
            for i, inp_chunk in enumerate(chunk.inputs):
                if inp_chunk in chunks_set:
                    inp_chunks.append(chunk_to_copied[inp_chunk])
                else:
                    build_fetch_index_to_chunks[i] = inp_chunk
                    inp_chunks.append(None)
                    if not isinstance(inp_chunk.op, Fetch):
                        out_of_scope_chunks.append(inp_chunk)
            fetch_chunks = self._gen_input_chunks(
                list(build_fetch_index_to_chunks.values()), chunk_to_fetch_chunk
            )
            for i, fetch_chunk in zip(build_fetch_index_to_chunks, fetch_chunks):
                inp_chunks[i] = fetch_chunk
            copied_op = chunk.op.copy()
            copied_op._key = chunk.op.key
            out_chunks = [
                c.data
                for c in copied_op.new_chunks(
                    inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs]
                )
            ]
            for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks):
                out_chunk._key = src_chunk.key
                chunk_graph.add_node(out_chunk)
                chunk_to_copied[src_chunk] = out_chunk
                if chunk in final_result_chunks_set:
                    result_chunks.append(out_chunk)
                    result_chunks_set.add(out_chunk)
                if not is_virtual:
                    # skip adding fetch chunk to chunk graph when op is virtual operand
                    for c in inp_chunks:
                        if c not in chunk_graph:
                            chunk_graph.add_node(c)
                        chunk_graph.add_edge(c, out_chunk)
        # add chunks with no successors into result chunks
        result_chunks.extend(
            c
            for c in chunk_graph.iter_indep(reverse=True)
            if c not in result_chunks_set
        )
        expect_bands = (
            [self._to_band(expect_worker)]
            if bands_specified
            else ([band] if band is not None else None)
        )
        # calculate priority
        if out_of_scope_chunks:
            inp_subtasks = []
            for out_of_scope_chunk in out_of_scope_chunks:
                copied_out_of_scope_chunk = chunk_to_copied[out_of_scope_chunk]
                inp_subtask = chunk_to_subtask[out_of_scope_chunk]
                if (
                    copied_out_of_scope_chunk
                    not in inp_subtask.chunk_graph.result_chunks
                ):
                    # make sure the chunk that out of scope
                    # is in the input subtask's results,
                    # or the meta may be lost
                    inp_subtask.chunk_graph.result_chunks.append(
                        copied_out_of_scope_chunk
                    )
                inp_subtasks.append(inp_subtask)
            depth = max(st.priority[0] for st in inp_subtasks) + 1
        else:
            inp_subtasks = []
            depth = 0
        priority = (depth, chunk_priority or 0)

        subtask = Subtask(
            subtask_id=new_task_id(),
            stage_id=self._stage_id,
            logic_key=self._gen_logic_key(chunks),
            session_id=self._task.session_id,
            task_id=self._task.task_id,
            chunk_graph=chunk_graph,
            expect_bands=expect_bands,
            bands_specified=bands_specified,
            virtual=is_virtual,
            priority=priority,
            retryable=retryable,
            extra_config=self._extra_config,
        )
        return subtask, inp_subtasks

    def _gen_logic_key(self, chunks: List[ChunkType]):
        return tokenize(
            *[self._logic_key_generator.get_logic_key(chunk.op) for chunk in chunks]
        )

    @enter_mode(build=True)
    def gen_subtask_graph(self) -> SubtaskGraph:
        """
        Analyze chunk graph and generate subtask graph.

        Returns
        -------
        subtask_graph: SubtaskGraph
            Subtask graph.
        """
        reassign_worker_ops = [
            chunk.op for chunk in self._chunk_graph if chunk.op.reassign_worker
        ]
        start_ops = (
            list(self._iter_start_ops(self._chunk_graph))
            if len(self._chunk_graph) > 0
            else []
        )

        # assign start chunks
        to_assign_ops = start_ops + reassign_worker_ops
        assigner = self._graph_assigner_cls(
            self._chunk_graph, to_assign_ops, self._band_resource
        )
        # assign expect workers
        cur_assigns = {
            op.key: self._to_band(op.expect_worker)
            for op in start_ops
            if op.expect_worker is not None
        }
        logger.debug(
            "Start to assign %s start chunks for task %s",
            len(start_ops),
            self._task.task_id,
        )
        chunk_to_bands = assigner.assign(cur_assigns=cur_assigns)
        logger.debug(
            "Assigned %s start chunks for task %s", len(start_ops), self._task.task_id
        )
        # assign expect workers for those specified with `expect_worker`
        # skip `start_ops`, which have been assigned before
        for chunk in self._chunk_graph:
            if chunk not in start_ops and chunk.op.expect_worker is not None:
                chunk_to_bands[chunk] = self._to_band(chunk.op.expect_worker)

        # color nodes
        if self._fuse_enabled:
            logger.debug("Start to fuse chunks for task %s", self._task.task_id)
            # sort start chunks in coloring as start_ops
            op_key_to_chunks = defaultdict(list)
            for chunk in self._chunk_graph:
                op_key_to_chunks[chunk.op.key].append(chunk)
            init_chunk_to_bands = dict()
            for start_op in start_ops:
                for start_chunk in op_key_to_chunks[start_op.key]:
                    init_chunk_to_bands[start_chunk] = chunk_to_bands[start_chunk]
            coloring = Coloring(
                self._chunk_graph,
                list(self._band_resource),
                init_chunk_to_bands,
                initial_same_color_num=getattr(
                    self._config, "initial_same_color_num", None
                ),
                as_broadcaster_successor_num=getattr(
                    self._config, "as_broadcaster_successor_num", None
                ),
            )
            chunk_to_colors = coloring.color()
        else:
            # if not fuse enabled, color all chunks with different colors
            chunk_to_colors = {
                c: i for i, c in enumerate(self._chunk_graph.topological_iter())
            }
        color_to_chunks = defaultdict(list)
        for chunk, color in chunk_to_colors.items():
            color_to_chunks[color].append(chunk)

        # gen subtask graph
        subtask_graph = SubtaskGraph()
        chunk_to_fetch_chunk = dict()
        chunk_to_subtask = dict()
        # states
        visited = set()
        logic_key_to_subtasks = defaultdict(list)
        for chunk in self._chunk_graph.topological_iter():
            if chunk in visited:
                continue

            color = chunk_to_colors[chunk]
            same_color_chunks = color_to_chunks[color]
            if all(isinstance(c.op, Fetch) for c in same_color_chunks):
                # all fetch ops, no need to gen subtask
                continue
            subtask, inp_subtasks = self._gen_subtask_info(
                same_color_chunks,
                chunk_to_subtask,
                chunk_to_bands,
                chunk_to_fetch_chunk,
            )
            subtask_graph.add_node(subtask)
            logic_key_to_subtasks[subtask.logic_key].append(subtask)
            for inp_subtask in inp_subtasks:
                subtask_graph.add_edge(inp_subtask, subtask)

            for c in same_color_chunks:
                chunk_to_subtask[c] = subtask
            visited.update(same_color_chunks)

        for subtasks in logic_key_to_subtasks.values():
            for logic_index, subtask in enumerate(subtasks):
                subtask.logic_index = logic_index
                subtask.logic_parallelism = len(subtasks)
...
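The heart of gen_subtask_graph is the color-then-group step: chunks that share a color are fused into one subtask, and subtask edges are derived from chunk edges that cross color boundaries. The toy sketch below (plain Python dicts, not the Mars API) illustrates just that grouping rule on a hypothetical four-chunk DAG.

from collections import defaultdict

edges = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}  # toy chunk DAG
chunk_to_color = {"a": 0, "b": 0, "c": 1, "d": 2}           # a coloring result

# group same-colored chunks into one "subtask"
color_to_chunks = defaultdict(list)
for chunk, color in chunk_to_color.items():
    color_to_chunks[color].append(chunk)

# subtask edges come from chunk edges crossing colors
subtask_edges = set()
for src, dsts in edges.items():
    for dst in dsts:
        if chunk_to_color[src] != chunk_to_color[dst]:
            subtask_edges.add((chunk_to_color[src], chunk_to_color[dst]))

print(dict(color_to_chunks))  # {0: ['a', 'b'], 1: ['c'], 2: ['d']}
print(sorted(subtask_edges))  # [(0, 1), (0, 2), (1, 2)]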
builder.py
Source: builder.py
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import logging
import os
import pickle  # nosec  # pylint: disable=import_pickle
import tempfile
import uuid

import numpy as np

from .... import opcodes
from .... import tensor as mt
from ....lib.filesystem import get_fs
from ....core import OutputType
from ....core.context import get_context
from ....core.operand import OperandStage
from ....serialization.serializables import (
    StringField,
    Int32Field,
    Int64Field,
    DictField,
    BytesField,
    TupleField,
    DataTypeField,
)
from ....utils import has_unknown_shape, Timer
from ...operands import LearnOperand, LearnOperandMixin
from ..core import (
    proxima,
    get_proxima_type,
    validate_tensor,
    available_numpy_dtypes,
    rechunk_tensor,
    build_mmap_chunks,
)

logger = logging.getLogger(__name__)

DEFAULT_INDEX_SIZE = 5 * 10**6


class ProximaBuilder(LearnOperand, LearnOperandMixin):
    _op_type_ = opcodes.PROXIMA_SIMPLE_BUILDER

    _distance_metric = StringField("distance_metric")
    _dimension = Int32Field("dimension")
    _column_number = Int64Field("column_number")
    _index_path = StringField("index_path")
    _index_builder = StringField("index_builder")
    _index_builder_params = DictField("index_builder_params")
    _index_converter = StringField("index_converter")
    _index_converter_params = DictField("index_converter_params")
    _topk = Int32Field("topk")
    _storage_options = BytesField(
        "storage_options", on_serialize=pickle.dumps, on_deserialize=pickle.loads
    )
    # only for chunk
    _array_shape = TupleField("array_shape")
    _array_dtype = DataTypeField("array_dtype")
    _offset = Int64Field("offset")

    def __init__(
        self,
        distance_metric=None,
        index_path=None,
        dimension=None,
        column_number=None,
        index_builder=None,
        index_builder_params=None,
        index_converter=None,
        index_converter_params=None,
        array_shape=None,
        array_dtype=None,
        offset=None,
        topk=None,
        storage_options=None,
        output_types=None,
        **kw,
    ):
        super().__init__(
            _distance_metric=distance_metric,
            _index_path=index_path,
            _dimension=dimension,
            _column_number=column_number,
            _index_builder=index_builder,
            _index_builder_params=index_builder_params,
            _array_shape=array_shape,
            _array_dtype=array_dtype,
            _offset=offset,
            _index_converter=index_converter,
            _index_converter_params=index_converter_params,
            _topk=topk,
            _storage_options=storage_options,
            _output_types=output_types,
            **kw,
        )
        if self._output_types is None:
            self._output_types = [OutputType.object]

    @property
    def distance_metric(self):
        return self._distance_metric

    @property
    def column_number(self):
        return self._column_number

    @property
    def index_path(self):
        return self._index_path

    @property
    def dimension(self):
        return self._dimension

    @property
    def index_builder(self):
        return self._index_builder

    @property
    def index_builder_params(self):
        return self._index_builder_params

    @property
    def index_converter(self):
        return self._index_converter

    @property
    def index_converter_params(self):
        return self._index_converter_params

    @property
    def topk(self):
        return self._topk

    @property
    def storage_options(self):
        return self._storage_options

    @property
    def array_shape(self):
        return self._array_shape

    @property
    def array_dtype(self):
        return self._array_dtype

    @property
    def offset(self):
        return self._offset

    def __call__(self, tensor):
        return self.new_tileable([tensor])

    @classmethod
    def _get_atleast_topk_nsplit(cls, nsplit, topk):
        new_nsplit = []
        i = 0
        while i < len(nsplit):
            cur = nsplit[i]
            i += 1
            if cur >= topk:
                new_nsplit.append(cur)
            else:
                while i < len(nsplit):
                    cur += nsplit[i]
                    i += 1
                    if cur >= topk:
                        break
                if cur < topk and len(new_nsplit) > 0:
                    new_nsplit[-1] += cur
                elif cur >= topk:
                    new_nsplit.append(cur)
        new_nsplit = tuple(new_nsplit)
        assert sum(new_nsplit) == sum(nsplit), (
            f"sum of nsplit not equal, " f"old: {nsplit}, new: {new_nsplit}"
        )
        return new_nsplit

    @classmethod
    def tile(cls, op):
        tensor = op.inputs[0]
        out = op.outputs[0]
        index_path = op.index_path
        ctx = get_context()

        fs = None
        if index_path is not None:
            fs = get_fs(index_path, op.storage_options)
        if index_path is not None:
            # check if the index path is empty
            try:
                files = [f for f in fs.ls(index_path) if "proxima_" in f]
                if files:
                    raise ValueError(
                        f"Directory {index_path} contains built proxima index, "
                        f"clean them to perform new index building"
                    )
            except FileNotFoundError:
                # if not exist, create directory
                fs.mkdir(index_path)

        # make sure all inputs have known chunk sizes
        if has_unknown_shape(*op.inputs):
            yield

        if op.column_number:
            index_chunk_size = op.inputs[0].shape[0] // op.column_number
        else:
            worker_num = len(ctx.get_worker_addresses() or [])
            if worker_num > 0:
                index_chunk_size = max(
                    op.inputs[0].shape[0] // worker_num, DEFAULT_INDEX_SIZE
                )
            else:
                index_chunk_size = DEFAULT_INDEX_SIZE
        if op.topk is not None:
            index_chunk_size = cls._get_atleast_topk_nsplit(index_chunk_size, op.topk)

        # build chunks for writing tensors to mmap files.
        worker_iter = iter(itertools.cycle(ctx.get_worker_addresses() or [None]))
        chunk_groups = rechunk_tensor(tensor, index_chunk_size)
        out_chunks = []
        offsets = []
        offset = 0
        for chunk_group in chunk_groups:
            offsets.append(offset)
            file_prefix = f"proxima-build-{str(uuid.uuid4())}"
            out_chunks.append(
                build_mmap_chunks(
                    chunk_group, next(worker_iter), file_prefix=file_prefix
                )
            )
            offset += sum(c.shape[0] for c in chunk_group)

        final_out_chunks = []
        for j, chunks in enumerate(out_chunks):
            chunk_op = op.copy().reset_key()
            chunk_op.stage = OperandStage.map
            chunk_op.expect_worker = chunks[0].op.expect_worker
            chunk_op._array_shape = chunks[0].op.total_shape
            chunk_op._array_dtype = chunks[0].dtype
            chunk_op._offset = offsets[j]
            out_chunk = chunk_op.new_chunk(chunks, index=(j,))
            final_out_chunks.append(out_chunk)

        logger.warning(f"index chunks count: {len(final_out_chunks)} ")

        params = out.params
        params["chunks"] = final_out_chunks
        params["nsplits"] = ((1,) * len(final_out_chunks),)
        new_op = op.copy()
        return new_op.new_tileables(op.inputs, kws=[params])

    @classmethod
    def _execute_map(cls, ctx, op: "ProximaBuilder"):
        mmap_path = ctx[op.inputs[0].key]
        out = op.outputs[0]

        data = np.memmap(
            mmap_path, dtype=op.array_dtype, mode="r", shape=op.array_shape
        )
        proxima_type = get_proxima_type(op.array_dtype)
        offset = op.offset

        # holder
        with Timer() as timer:
            holder = proxima.IndexHolder(
                type=proxima_type, dimension=op.dimension, shallow=True
            )
            holder.mount(data, key_base=offset)
        logger.warning(f"Holder({op.key}) costs {timer.duration} seconds")

        # converter
        meta = proxima.IndexMeta(
            proxima_type, dimension=op.dimension, measure_name=op.distance_metric
        )
        if op.index_converter is not None:
            with Timer() as timer:
                converter = proxima.IndexConverter(
                    name=op.index_converter, meta=meta, params=op.index_converter_params
                )
                converter.train_and_transform(holder)
                holder = converter.result()
                meta = converter.meta()
            logger.warning(f"Converter({op.key}) costs {timer.duration} seconds")

        # builder
        with Timer() as timer:
            builder = proxima.IndexBuilder(
                name=op.index_builder, meta=meta, params=op.index_builder_params
            )
            builder = builder.train_and_build(holder)
        logger.warning(f"Builder({op.key}) costs {timer.duration} seconds")

        # remove mmap file
        os.remove(mmap_path)

        # dumper
        with Timer() as timer:
            path = tempfile.mkstemp(prefix="proxima-", suffix=".index")[1]
            dumper = proxima.IndexDumper(name="FileDumper", path=path)
            builder.dump(dumper)
            dumper.close()
        logger.warning(f"Dumper({op.key}) costs {timer.duration} seconds")

        if op.index_path is None:
            ctx[out.key] = path
        else:
            # write to external file
            with Timer() as timer:
                fs = get_fs(op.index_path, op.storage_options)
                filename = f"proxima_{out.index[0]}_index"
                out_path = f'{op.index_path.rstrip("/")}/{filename}'

                def write_index():
                    with fs.open(out_path, "wb") as out_f:
                        with open(path, "rb") as in_f:
                            # 128M
                            chunk_bytes = 128 * 1024**2
                            while True:
                                data = in_f.read(chunk_bytes)
                                if data:
                                    out_f.write(data)
                                else:
                                    break

                # retry 3 times
                for _ in range(3):
                    try:
                        write_index()
                        break
                    except:  # noqa: E722  # nosec  # pylint: disable=bare-except
                        fs.delete(out_path)
                        continue

            logger.warning(
                f"WritingToVolume({op.key}), out path: {out_path}, "
                f"size {os.path.getsize(path)}, "
                f"costs {timer.duration} seconds "
                f"speed {round(os.path.getsize(path) / (1024 ** 2) / timer.duration, 2)} MB/s"
            )
            ctx[out.key] = filename

    @classmethod
    def _execute_agg(cls, ctx, op: "ProximaBuilder"):
        paths = [ctx[inp.key] for inp in op.inputs]
        ctx[op.outputs[0].key] = paths

    @classmethod
    def execute(cls, ctx, op: "ProximaBuilder"):
        if op.stage != OperandStage.agg:
            return cls._execute_map(ctx, op)
        else:
            return cls._execute_agg(ctx, op)

    @classmethod
    def concat_tileable_chunks(cls, tileable):
        assert not tileable.is_coarse()

        op = cls(stage=OperandStage.agg)
        chunk = cls(stage=OperandStage.agg).new_chunk(tileable.chunks)
        return op.new_tileable([tileable], chunks=[chunk], nsplits=((1,),))


def build_index(
    tensor,
    dimension=None,
    index_path=None,
    column_number=None,
    need_shuffle=False,
    distance_metric="SquaredEuclidean",
    index_builder="SsgBuilder",
    index_builder_params=None,
    index_converter=None,
    index_converter_params=None,
    topk=None,
    storage_options=None,
    run=True,
    session=None,
    run_kwargs=None,
):
    tensor = validate_tensor(tensor)

    if tensor.dtype not in available_numpy_dtypes:
        raise ValueError(
            f"Dtype to build index should be one of {available_numpy_dtypes}, "
            f"got {tensor.dtype}"
        )

    if dimension is None:
        dimension = tensor.shape[1]
    if index_builder_params is None:
        index_builder_params = {}
    if index_converter_params is None:
        index_converter_params = {}

    if need_shuffle:
        tensor = mt.random.permutation(tensor)

    op = ProximaBuilder(
        distance_metric=distance_metric,
        index_path=index_path,
        dimension=dimension,
        column_number=column_number,
        index_builder=index_builder,
        index_builder_params=index_builder_params,
        index_converter=index_converter,
        index_converter_params=index_converter_params,
        topk=topk,
        storage_options=storage_options,
    )
    result = op(tensor)
    if run:
        return result.execute(session=session, **(run_kwargs or dict()))
    else:
...
test_worker.py
Source: test_worker.py
...
import asyncio
from asyncio.futures import Future

import pytest

from playwright.async_api import Error, Page, Worker


async def test_workers_page_workers(page, server):
    async with page.expect_worker() as worker_info:
        await page.goto(server.PREFIX + "/worker/worker.html")
    worker = await worker_info.value
    assert "worker.js" in worker.url
    assert (
        await worker.evaluate('() => self["workerFunction"]()')
        == "worker function result"
    )
    await page.goto(server.EMPTY_PAGE)
    assert len(page.workers) == 0


async def test_workers_should_emit_created_and_destroyed_events(page: Page):
    worker_obj = None
    async with page.expect_event("worker") as event_info:
        worker_obj = await page.evaluate_handle(
            "() => new Worker(URL.createObjectURL(new Blob(['1'], {type: 'application/javascript'})))"
        )
    worker = await event_info.value
    worker_this_obj = await worker.evaluate_handle("() => this")
    worker_destroyed_promise: Future[Worker] = asyncio.Future()
    worker.once("close", lambda w: worker_destroyed_promise.set_result(w))
    await page.evaluate("workerObj => workerObj.terminate()", worker_obj)
    assert await worker_destroyed_promise == worker
    with pytest.raises(Error) as exc:
        await worker_this_obj.get_property("self")
    assert "Most likely the worker has been closed." in exc.value.message


async def test_workers_should_report_console_logs(page):
    async with page.expect_console_message() as message_info:
        await page.evaluate(
            '() => new Worker(URL.createObjectURL(new Blob(["console.log(1)"], {type: "application/javascript"})))'
        )
    message = await message_info.value
    assert message.text == "1"


@pytest.mark.skip_browser("firefox")  # TODO: investigate further @pavelfeldman
async def test_workers_should_have_JSHandles_for_console_logs(page):
    log_promise = asyncio.Future()
    page.on("console", lambda m: log_promise.set_result(m))
    await page.evaluate(
        "() => new Worker(URL.createObjectURL(new Blob(['console.log(1,2,3,this)'], {type: 'application/javascript'})))"
    )
    log = await log_promise
    assert log.text == "1 2 3 JSHandle@object"
    assert len(log.args) == 4
    assert await (await log.args[3].get_property("origin")).json_value() == "null"


async def test_workers_should_evaluate(page):
    async with page.expect_event("worker") as event_info:
        await page.evaluate(
            "() => new Worker(URL.createObjectURL(new Blob(['console.log(1)'], {type: 'application/javascript'})))"
        )
    worker = await event_info.value
    assert await worker.evaluate("1+1") == 2


async def test_workers_should_report_errors(page):
    error_promise = asyncio.Future()
    page.on("pageerror", lambda e: error_promise.set_result(e))
    await page.evaluate(
        """() => new Worker(URL.createObjectURL(new Blob([`
      setTimeout(() => {
        // Do a console.log just to check that we do not confuse it with an error.
        console.log('hey');
        throw new Error('this is my error');
      })
    `], {type: 'application/javascript'})))"""
    )
    error_log = await error_promise
    assert "this is my error" in error_log.message


@pytest.mark.skip_browser("firefox")  # TODO: fails upstream
async def test_workers_should_clear_upon_navigation(server, page):
    await page.goto(server.EMPTY_PAGE)
    async with page.expect_event("worker") as event_info:
        await page.evaluate(
            '() => new Worker(URL.createObjectURL(new Blob(["console.log(1)"], {type: "application/javascript"})))'
        )
    worker = await event_info.value
    assert len(page.workers) == 1
    destroyed = []
    worker.once("close", lambda _: destroyed.append(True))
    await page.goto(server.PREFIX + "/one-style.html")
    assert destroyed == [True]
    assert len(page.workers) == 0


@pytest.mark.skip_browser("firefox")  # TODO: fails upstream
async def test_workers_should_clear_upon_cross_process_navigation(server, page):
    await page.goto(server.EMPTY_PAGE)
    async with page.expect_event("worker") as event_info:
        await page.evaluate(
            "() => new Worker(URL.createObjectURL(new Blob(['console.log(1)'], {type: 'application/javascript'})))"
        )
    worker = await event_info.value
    assert len(page.workers) == 1
    destroyed = []
    worker.once("close", lambda _: destroyed.append(True))
    await page.goto(server.CROSS_PROCESS_PREFIX + "/empty.html")
    assert destroyed == [True]
    assert len(page.workers) == 0


async def test_workers_should_report_network_activity(page, server):
    async with page.expect_worker() as worker_info:
        await page.goto(server.PREFIX + "/worker/worker.html")
    worker = await worker_info.value
    url = server.PREFIX + "/one-style.css"
    async with page.expect_request(url) as request_info, page.expect_response(
        url
    ) as response_info:
        await worker.evaluate(
            "url => fetch(url).then(response => response.text()).then(console.log)", url
        )
    request = await request_info.value
    response = await response_info.value
    assert request.url == url
    assert response.request == request
    assert response.ok


async def test_workers_should_report_network_activity_on_worker_creation(page, server):
    # Chromium needs waitForDebugger enabled for this one.
    await page.goto(server.EMPTY_PAGE)
    url = server.PREFIX + "/one-style.css"
    async with page.expect_request(url) as request_info, page.expect_response(
        url
    ) as response_info:
        await page.evaluate(
            """url => new Worker(URL.createObjectURL(new Blob([`
        fetch("${url}").then(response => response.text()).then(console.log);
      `], {type: 'application/javascript'})))""",
            url,
        )
    request = await request_info.value
    response = await response_info.value
    assert request.url == url
    assert response.request == request
    assert response.ok


async def test_workers_should_format_number_using_context_locale(browser, server):
    context = await browser.new_context(locale="ru-RU")
    page = await context.new_page()
    await page.goto(server.EMPTY_PAGE)
    async with page.expect_worker() as worker_info:
        await page.evaluate(
            "() => new Worker(URL.createObjectURL(new Blob(['console.log(1)'], {type: 'application/javascript'})))"
        )
    worker = await worker_info.value
    assert await worker.evaluate("() => (10000.20).toLocaleString()") == "10\u00A0000,2"
...
LambdaTest’s Playwright tutorial gives you a broader view of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides end-to-end guidance, from installing the Playwright framework to best practices and advanced concepts.