Best Python code snippet using pandera_python
combiners.py
Source:combiners.py
1#2# Licensed to the Apache Software Foundation (ASF) under one or more3# contributor license agreements. See the NOTICE file distributed with4# this work for additional information regarding copyright ownership.5# The ASF licenses this file to You under the Apache License, Version 2.06# (the "License"); you may not use this file except in compliance with7# the License. You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16#17"""A library of basic combiner PTransform subclasses."""18# pytype: skip-file19from __future__ import absolute_import20from __future__ import division21import heapq22import operator23import random24import sys25import warnings26from builtins import object27from builtins import zip28from typing import Any29from typing import Dict30from typing import Iterable31from typing import List32from typing import Set33from typing import Tuple34from typing import TypeVar35from typing import Union36from past.builtins import long37from apache_beam import typehints38from apache_beam.transforms import core39from apache_beam.transforms import cy_combiners40from apache_beam.transforms import ptransform41from apache_beam.transforms import window42from apache_beam.transforms.display import DisplayDataItem43from apache_beam.typehints import with_input_types44from apache_beam.typehints import with_output_types45from apache_beam.utils.timestamp import Duration46from apache_beam.utils.timestamp import Timestamp47__all__ = [48 'Count', 'Mean', 'Sample', 'Top', 'ToDict', 'ToList', 'ToSet', 'Latest'49]50# Type variables51T = TypeVar('T')52K = TypeVar('K')53V = TypeVar('V')54TimestampType = Union[int, float, Timestamp, Duration]55class Mean(object):56 """Combiners for computing arithmetic means of elements."""57 class Globally(ptransform.PTransform):58 """combiners.Mean.Globally computes the arithmetic mean of the elements."""59 def expand(self, pcoll):60 return pcoll | core.CombineGlobally(MeanCombineFn())61 class PerKey(ptransform.PTransform):62 """combiners.Mean.PerKey finds the means of the values for each key."""63 def expand(self, pcoll):64 return pcoll | core.CombinePerKey(MeanCombineFn())65# TODO(laolu): This type signature is overly restrictive. This should be66# more general.67@with_input_types(Union[float, int, long])68@with_output_types(float)69class MeanCombineFn(core.CombineFn):70 """CombineFn for computing an arithmetic mean."""71 def create_accumulator(self):72 return (0, 0)73 def add_input(self, sum_count, element):74 (sum_, count) = sum_count75 return sum_ + element, count + 176 def merge_accumulators(self, accumulators):77 sums, counts = zip(*accumulators)78 return sum(sums), sum(counts)79 def extract_output(self, sum_count):80 (sum_, count) = sum_count81 if count == 0:82 return float('NaN')83 return sum_ / float(count)84 def for_input_type(self, input_type):85 if input_type is int:86 return cy_combiners.MeanInt64Fn()87 elif input_type is float:88 return cy_combiners.MeanFloatFn()89 return self90class Count(object):91 """Combiners for counting elements."""92 class Globally(ptransform.PTransform):93 """combiners.Count.Globally counts the total number of elements."""94 def expand(self, pcoll):95 return pcoll | core.CombineGlobally(CountCombineFn())96 class PerKey(ptransform.PTransform):97 """combiners.Count.PerKey counts how many elements each unique key has."""98 def expand(self, pcoll):99 return pcoll | core.CombinePerKey(CountCombineFn())100 class PerElement(ptransform.PTransform):101 """combiners.Count.PerElement counts how many times each element occurs."""102 def expand(self, pcoll):103 paired_with_void_type = typehints.Tuple[pcoll.element_type, Any]104 output_type = typehints.KV[pcoll.element_type, int]105 return (106 pcoll107 | (108 '%s:PairWithVoid' % self.label >> core.Map(109 lambda x: (x, None)).with_output_types(paired_with_void_type))110 | core.CombinePerKey(CountCombineFn()).with_output_types(output_type))111@with_input_types(Any)112@with_output_types(int)113class CountCombineFn(core.CombineFn):114 """CombineFn for computing PCollection size."""115 def create_accumulator(self):116 return 0117 def add_input(self, accumulator, element):118 return accumulator + 1119 def add_inputs(self, accumulator, elements):120 return accumulator + len(list(elements))121 def merge_accumulators(self, accumulators):122 return sum(accumulators)123 def extract_output(self, accumulator):124 return accumulator125class Top(object):126 """Combiners for obtaining extremal elements."""127 # pylint: disable=no-self-argument128 class Of(ptransform.PTransform):129 """Obtain a list of the compare-most N elements in a PCollection.130 This transform will retrieve the n greatest elements in the PCollection131 to which it is applied, where "greatest" is determined by the comparator132 function supplied as the compare argument.133 """134 def _py2__init__(self, n, compare=None, *args, **kwargs):135 """Initializer.136 compare should be an implementation of "a < b" taking at least two137 arguments (a and b). Additional arguments and side inputs specified in138 the apply call become additional arguments to the comparator. Defaults to139 the natural ordering of the elements.140 The arguments 'key' and 'reverse' may instead be passed as keyword141 arguments, and have the same meaning as for Python's sort functions.142 Args:143 pcoll: PCollection to process.144 n: number of elements to extract from pcoll.145 compare: as described above.146 *args: as described above.147 **kwargs: as described above.148 """149 if compare:150 warnings.warn(151 'Compare not available in Python 3, use key instead.',152 DeprecationWarning)153 self._n = n154 self._compare = compare155 self._key = kwargs.pop('key', None)156 self._reverse = kwargs.pop('reverse', False)157 self._args = args158 self._kwargs = kwargs159 def _py3__init__(self, n, **kwargs):160 """Creates a global Top operation.161 The arguments 'key' and 'reverse' may be passed as keyword arguments,162 and have the same meaning as for Python's sort functions.163 Args:164 pcoll: PCollection to process.165 n: number of elements to extract from pcoll.166 **kwargs: may contain 'key' and/or 'reverse'167 """168 unknown_kwargs = set(kwargs.keys()) - set(['key', 'reverse'])169 if unknown_kwargs:170 raise ValueError(171 'Unknown keyword arguments: ' + ', '.join(unknown_kwargs))172 self._py2__init__(n, None, **kwargs)173 # Python 3 sort does not accept a comparison operator, and nor do we.174 # FIXME: mypy would handle this better if we placed the _py*__init__ funcs175 # inside the if/else block below:176 if sys.version_info[0] < 3:177 __init__ = _py2__init__178 else:179 __init__ = _py3__init__ # type: ignore180 def default_label(self):181 return 'Top(%d)' % self._n182 def expand(self, pcoll):183 compare = self._compare184 if (not self._args and not self._kwargs and pcoll.windowing.is_default()):185 if self._reverse:186 if compare is None or compare is operator.lt:187 compare = operator.gt188 else:189 original_compare = compare190 compare = lambda a, b: original_compare(b, a)191 # This is a more efficient global algorithm.192 top_per_bundle = pcoll | core.ParDo(193 _TopPerBundle(self._n, compare, self._key))194 # If pcoll is empty, we can't guerentee that top_per_bundle195 # won't be empty, so inject at least one empty accumulator196 # so that downstream is guerenteed to produce non-empty output.197 empty_bundle = pcoll.pipeline | core.Create([(None, [])])198 return ((top_per_bundle, empty_bundle) | core.Flatten()199 | core.GroupByKey()200 | core.ParDo(_MergeTopPerBundle(self._n, compare, self._key)))201 else:202 return pcoll | core.CombineGlobally(203 TopCombineFn(self._n, compare, self._key, self._reverse),204 *self._args,205 **self._kwargs)206 class PerKey(ptransform.PTransform):207 """Identifies the compare-most N elements associated with each key.208 This transform will produce a PCollection mapping unique keys in the input209 PCollection to the n greatest elements with which they are associated, where210 "greatest" is determined by the comparator function supplied as the compare211 argument in the initializer.212 """213 def _py2__init__(self, n, compare=None, *args, **kwargs):214 """Initializer.215 compare should be an implementation of "a < b" taking at least two216 arguments (a and b). Additional arguments and side inputs specified in217 the apply call become additional arguments to the comparator. Defaults to218 the natural ordering of the elements.219 The arguments 'key' and 'reverse' may instead be passed as keyword220 arguments, and have the same meaning as for Python's sort functions.221 Args:222 n: number of elements to extract from input.223 compare: as described above.224 *args: as described above.225 **kwargs: as described above.226 """227 if compare:228 warnings.warn(229 'Compare not available in Python 3, use key instead.',230 DeprecationWarning)231 self._n = n232 self._compare = compare233 self._key = kwargs.pop('key', None)234 self._reverse = kwargs.pop('reverse', False)235 self._args = args236 self._kwargs = kwargs237 def _py3__init__(self, n, **kwargs):238 """Creates a per-key Top operation.239 The arguments 'key' and 'reverse' may be passed as keyword arguments,240 and have the same meaning as for Python's sort functions.241 Args:242 pcoll: PCollection to process.243 n: number of elements to extract from pcoll.244 **kwargs: may contain 'key' and/or 'reverse'245 """246 unknown_kwargs = set(kwargs.keys()) - set(['key', 'reverse'])247 if unknown_kwargs:248 raise ValueError(249 'Unknown keyword arguments: ' + ', '.join(unknown_kwargs))250 self._py2__init__(n, None, **kwargs)251 # Python 3 sort does not accept a comparison operator, and nor do we.252 if sys.version_info[0] < 3:253 __init__ = _py2__init__254 else:255 __init__ = _py3__init__ # type: ignore256 def default_label(self):257 return 'TopPerKey(%d)' % self._n258 def expand(self, pcoll):259 """Expands the transform.260 Raises TypeCheckError: If the output type of the input PCollection is not261 compatible with Tuple[A, B].262 Args:263 pcoll: PCollection to process264 Returns:265 the PCollection containing the result.266 """267 return pcoll | core.CombinePerKey(268 TopCombineFn(self._n, self._compare, self._key, self._reverse),269 *self._args,270 **self._kwargs)271 @staticmethod272 @ptransform.ptransform_fn273 def Largest(pcoll, n):274 """Obtain a list of the greatest N elements in a PCollection."""275 return pcoll | Top.Of(n)276 @staticmethod277 @ptransform.ptransform_fn278 def Smallest(pcoll, n):279 """Obtain a list of the least N elements in a PCollection."""280 return pcoll | Top.Of(n, reverse=True)281 @staticmethod282 @ptransform.ptransform_fn283 def LargestPerKey(pcoll, n):284 """Identifies the N greatest elements associated with each key."""285 return pcoll | Top.PerKey(n)286 @staticmethod287 @ptransform.ptransform_fn288 def SmallestPerKey(pcoll, n, reverse=True):289 """Identifies the N least elements associated with each key."""290 return pcoll | Top.PerKey(n, reverse=True)291@with_input_types(T)292@with_output_types(Tuple[None, List[T]])293class _TopPerBundle(core.DoFn):294 def __init__(self, n, less_than, key):295 self._n = n296 self._less_than = None if less_than is operator.le else less_than297 self._key = key298 def start_bundle(self):299 self._heap = []300 def process(self, element):301 if self._less_than or self._key:302 element = cy_combiners.ComparableValue(303 element, self._less_than, self._key)304 if len(self._heap) < self._n:305 heapq.heappush(self._heap, element)306 else:307 heapq.heappushpop(self._heap, element)308 def finish_bundle(self):309 # Though sorting here results in more total work, this allows us to310 # skip most elements in the reducer.311 # Essentially, given s map bundles, we are trading about O(sn) compares in312 # the (single) reducer for O(sn log n) compares across all mappers.313 self._heap.sort()314 # Unwrap to avoid serialization via pickle.315 if self._less_than or self._key:316 yield window.GlobalWindows.windowed_value(317 (None, [wrapper.value for wrapper in self._heap]))318 else:319 yield window.GlobalWindows.windowed_value((None, self._heap))320@with_input_types(Tuple[None, Iterable[List[T]]])321@with_output_types(List[T])322class _MergeTopPerBundle(core.DoFn):323 def __init__(self, n, less_than, key):324 self._n = n325 self._less_than = None if less_than is operator.lt else less_than326 self._key = key327 def process(self, key_and_bundles):328 _, bundles = key_and_bundles329 def push(hp, e):330 if len(hp) < self._n:331 heapq.heappush(hp, e)332 return False333 elif e < hp[0]:334 # Because _TopPerBundle returns sorted lists, all other elements335 # will also be smaller.336 return True337 else:338 heapq.heappushpop(hp, e)339 return False340 if self._less_than or self._key:341 heapc = [] # type: List[cy_combiners.ComparableValue]342 for bundle in bundles:343 if not heapc:344 heapc = [345 cy_combiners.ComparableValue(element, self._less_than, self._key)346 for element in bundle347 ]348 continue349 for element in reversed(bundle):350 if push(heapc,351 cy_combiners.ComparableValue(element,352 self._less_than,353 self._key)):354 break355 heapc.sort()356 yield [wrapper.value for wrapper in reversed(heapc)]357 else:358 heap = []359 for bundle in bundles:360 if not heap:361 heap = bundle362 continue363 for element in reversed(bundle):364 if push(heap, element):365 break366 heap.sort()367 yield heap[::-1]368@with_input_types(T)369@with_output_types(List[T])370class TopCombineFn(core.CombineFn):371 """CombineFn doing the combining for all of the Top transforms.372 This CombineFn uses a key or comparison operator to rank the elements.373 Args:374 compare: (optional) an implementation of "a < b" taking at least two375 arguments (a and b). Additional arguments and side inputs specified376 in the apply call become additional arguments to the comparator.377 key: (optional) a mapping of elements to a comparable key, similar to378 the key argument of Python's sorting methods.379 reverse: (optional) whether to order things smallest to largest, rather380 than largest to smallest381 """382 # TODO(robertwb): For Python 3, remove compare and only keep key.383 def __init__(self, n, compare=None, key=None, reverse=False):384 self._n = n385 if compare is operator.lt:386 compare = None387 elif compare is operator.gt:388 compare = None389 reverse = not reverse390 if compare:391 self._compare = ((392 lambda a, b, *args, **kwargs: not compare(a, b, *args, **kwargs))393 if reverse else compare)394 else:395 self._compare = operator.gt if reverse else operator.lt396 self._less_than = None397 self._key = key398 def _hydrated_heap(self, heap):399 if heap:400 first = heap[0]401 if isinstance(first, cy_combiners.ComparableValue):402 if first.requires_hydration:403 assert self._less_than is not None404 for comparable in heap:405 assert comparable.requires_hydration406 comparable.hydrate(self._less_than, self._key)407 assert not comparable.requires_hydration408 return heap409 else:410 return heap411 else:412 assert self._less_than is not None413 return [414 cy_combiners.ComparableValue(element, self._less_than, self._key)415 for element in heap416 ]417 else:418 return heap419 def display_data(self):420 return {421 'n': self._n,422 'compare': DisplayDataItem(423 self._compare.__name__ if hasattr(self._compare, '__name__') else424 self._compare.__class__.__name__).drop_if_none()425 }426 # The accumulator type is a tuple427 # (bool, Union[List[T], List[ComparableValue[T]])428 # where the boolean indicates whether the second slot contains a List of T429 # (False) or List of ComparableValue[T] (True). In either case, the List430 # maintains heap invariance. When the contents of the List are431 # ComparableValue[T] they either all 'requires_hydration' or none do.432 # This accumulator representation allows us to minimize the data encoding433 # overheads. Creation of ComparableValues is elided for performance reasons434 # when there is no need for complicated comparison functions.435 def create_accumulator(self, *args, **kwargs):436 return (False, [])437 def add_input(self, accumulator, element, *args, **kwargs):438 # Caching to avoid paying the price of variadic expansion of args / kwargs439 # when it's not needed (for the 'if' case below).440 if self._less_than is None:441 if args or kwargs:442 self._less_than = lambda a, b: self._compare(a, b, *args, **kwargs)443 else:444 self._less_than = self._compare445 holds_comparables, heap = accumulator446 if self._less_than is not operator.lt or self._key:447 heap = self._hydrated_heap(heap)448 holds_comparables = True449 else:450 assert not holds_comparables451 comparable = (452 cy_combiners.ComparableValue(element, self._less_than, self._key)453 if holds_comparables else element)454 if len(heap) < self._n:455 heapq.heappush(heap, comparable)456 else:457 heapq.heappushpop(heap, comparable)458 return (holds_comparables, heap)459 def merge_accumulators(self, accumulators, *args, **kwargs):460 if args or kwargs:461 self._less_than = lambda a, b: self._compare(a, b, *args, **kwargs)462 add_input = lambda accumulator, element: self.add_input(463 accumulator, element, *args, **kwargs)464 else:465 self._less_than = self._compare466 add_input = self.add_input467 result_heap = None468 holds_comparables = None469 for accumulator in accumulators:470 holds_comparables, heap = accumulator471 if self._less_than is not operator.lt or self._key:472 heap = self._hydrated_heap(heap)473 holds_comparables = True474 else:475 assert not holds_comparables476 if result_heap is None:477 result_heap = heap478 else:479 for comparable in heap:480 _, result_heap = add_input(481 (holds_comparables, result_heap),482 comparable.value if holds_comparables else comparable)483 assert result_heap is not None and holds_comparables is not None484 return (holds_comparables, result_heap)485 def compact(self, accumulator, *args, **kwargs):486 holds_comparables, heap = accumulator487 # Unwrap to avoid serialization via pickle.488 if holds_comparables:489 return (False, [comparable.value for comparable in heap])490 else:491 return accumulator492 def extract_output(self, accumulator, *args, **kwargs):493 if args or kwargs:494 self._less_than = lambda a, b: self._compare(a, b, *args, **kwargs)495 else:496 self._less_than = self._compare497 holds_comparables, heap = accumulator498 if self._less_than is not operator.lt or self._key:499 if not holds_comparables:500 heap = self._hydrated_heap(heap)501 holds_comparables = True502 else:503 assert not holds_comparables504 assert len(heap) <= self._n505 heap.sort(reverse=True)506 return [507 comparable.value if holds_comparables else comparable508 for comparable in heap509 ]510class Largest(TopCombineFn):511 def default_label(self):512 return 'Largest(%s)' % self._n513class Smallest(TopCombineFn):514 def __init__(self, n):515 super(Smallest, self).__init__(n, reverse=True)516 def default_label(self):517 return 'Smallest(%s)' % self._n518class Sample(object):519 """Combiners for sampling n elements without replacement."""520 # pylint: disable=no-self-argument521 class FixedSizeGlobally(ptransform.PTransform):522 """Sample n elements from the input PCollection without replacement."""523 def __init__(self, n):524 self._n = n525 def expand(self, pcoll):526 return pcoll | core.CombineGlobally(SampleCombineFn(self._n))527 def display_data(self):528 return {'n': self._n}529 def default_label(self):530 return 'FixedSizeGlobally(%d)' % self._n531 class FixedSizePerKey(ptransform.PTransform):532 """Sample n elements associated with each key without replacement."""533 def __init__(self, n):534 self._n = n535 def expand(self, pcoll):536 return pcoll | core.CombinePerKey(SampleCombineFn(self._n))537 def display_data(self):538 return {'n': self._n}539 def default_label(self):540 return 'FixedSizePerKey(%d)' % self._n541@with_input_types(T)542@with_output_types(List[T])543class SampleCombineFn(core.CombineFn):544 """CombineFn for all Sample transforms."""545 def __init__(self, n):546 super(SampleCombineFn, self).__init__()547 # Most of this combiner's work is done by a TopCombineFn. We could just548 # subclass TopCombineFn to make this class, but since sampling is not549 # really a kind of Top operation, we use a TopCombineFn instance as a550 # helper instead.551 self._top_combiner = TopCombineFn(n)552 def create_accumulator(self):553 return self._top_combiner.create_accumulator()554 def add_input(self, heap, element):555 # Before passing elements to the Top combiner, we pair them with random556 # numbers. The elements with the n largest random number "keys" will be557 # selected for the output.558 return self._top_combiner.add_input(heap, (random.random(), element))559 def merge_accumulators(self, heaps):560 return self._top_combiner.merge_accumulators(heaps)561 def compact(self, heap):562 return self._top_combiner.compact(heap)563 def extract_output(self, heap):564 # Here we strip off the random number keys we added in add_input.565 return [e for _, e in self._top_combiner.extract_output(heap)]566class _TupleCombineFnBase(core.CombineFn):567 def __init__(self, *combiners):568 self._combiners = [core.CombineFn.maybe_from_callable(c) for c in combiners]569 self._named_combiners = combiners570 def display_data(self):571 combiners = [572 c.__name__ if hasattr(c, '__name__') else c.__class__.__name__573 for c in self._named_combiners574 ]575 return {'combiners': str(combiners)}576 def create_accumulator(self):577 return [c.create_accumulator() for c in self._combiners]578 def merge_accumulators(self, accumulators):579 return [580 c.merge_accumulators(a) for c,581 a in zip(self._combiners, zip(*accumulators))582 ]583 def compact(self, accumulator):584 return [c.compact(a) for c, a in zip(self._combiners, accumulator)]585 def extract_output(self, accumulator):586 return tuple(587 [c.extract_output(a) for c, a in zip(self._combiners, accumulator)])588class TupleCombineFn(_TupleCombineFnBase):589 """A combiner for combining tuples via a tuple of combiners.590 Takes as input a tuple of N CombineFns and combines N-tuples by591 combining the k-th element of each tuple with the k-th CombineFn,592 outputting a new N-tuple of combined values.593 """594 def add_input(self, accumulator, element):595 return [596 c.add_input(a, e) for c,597 a,598 e in zip(self._combiners, accumulator, element)599 ]600 def with_common_input(self):601 return SingleInputTupleCombineFn(*self._combiners)602class SingleInputTupleCombineFn(_TupleCombineFnBase):603 """A combiner for combining a single value via a tuple of combiners.604 Takes as input a tuple of N CombineFns and combines elements by605 applying each CombineFn to each input, producing an N-tuple of606 the outputs corresponding to each of the N CombineFn's outputs.607 """608 def add_input(self, accumulator, element):609 return [610 c.add_input(a, element) for c, a in zip(self._combiners, accumulator)611 ]612class ToList(ptransform.PTransform):613 """A global CombineFn that condenses a PCollection into a single list."""614 def __init__(self, label='ToList'): # pylint: disable=useless-super-delegation615 super(ToList, self).__init__(label)616 def expand(self, pcoll):617 return pcoll | self.label >> core.CombineGlobally(ToListCombineFn())618@with_input_types(T)619@with_output_types(List[T])620class ToListCombineFn(core.CombineFn):621 """CombineFn for to_list."""622 def create_accumulator(self):623 return []624 def add_input(self, accumulator, element):625 accumulator.append(element)626 return accumulator627 def merge_accumulators(self, accumulators):628 return sum(accumulators, [])629 def extract_output(self, accumulator):630 return accumulator631class ToDict(ptransform.PTransform):632 """A global CombineFn that condenses a PCollection into a single dict.633 PCollections should consist of 2-tuples, notionally (key, value) pairs.634 If multiple values are associated with the same key, only one of the values635 will be present in the resulting dict.636 """637 def __init__(self, label='ToDict'): # pylint: disable=useless-super-delegation638 super(ToDict, self).__init__(label)639 def expand(self, pcoll):640 return pcoll | self.label >> core.CombineGlobally(ToDictCombineFn())641@with_input_types(Tuple[K, V])642@with_output_types(Dict[K, V])643class ToDictCombineFn(core.CombineFn):644 """CombineFn for to_dict."""645 def create_accumulator(self):646 return dict()647 def add_input(self, accumulator, element):648 key, value = element649 accumulator[key] = value650 return accumulator651 def merge_accumulators(self, accumulators):652 result = dict()653 for a in accumulators:654 result.update(a)655 return result656 def extract_output(self, accumulator):657 return accumulator658class ToSet(ptransform.PTransform):659 """A global CombineFn that condenses a PCollection into a set."""660 def __init__(self, label='ToSet'): # pylint: disable=useless-super-delegation661 super(ToSet, self).__init__(label)662 def expand(self, pcoll):663 return pcoll | self.label >> core.CombineGlobally(ToSetCombineFn())664@with_input_types(T)665@with_output_types(Set[T])666class ToSetCombineFn(core.CombineFn):667 """CombineFn for ToSet."""668 def create_accumulator(self):669 return set()670 def add_input(self, accumulator, element):671 accumulator.add(element)672 return accumulator673 def merge_accumulators(self, accumulators):674 return set.union(*accumulators)675 def extract_output(self, accumulator):676 return accumulator677class _CurriedFn(core.CombineFn):678 """Wrapped CombineFn with extra arguments."""679 def __init__(self, fn, args, kwargs):680 self.fn = fn681 self.args = args682 self.kwargs = kwargs683 def create_accumulator(self):684 return self.fn.create_accumulator(*self.args, **self.kwargs)685 def add_input(self, accumulator, element):686 return self.fn.add_input(accumulator, element, *self.args, **self.kwargs)687 def merge_accumulators(self, accumulators):688 return self.fn.merge_accumulators(accumulators, *self.args, **self.kwargs)689 def compact(self, accumulator):690 return self.fn.compact(accumulator, *self.args, **self.kwargs)691 def extract_output(self, accumulator):692 return self.fn.extract_output(accumulator, *self.args, **self.kwargs)693 def apply(self, elements):694 return self.fn.apply(elements, *self.args, **self.kwargs)695def curry_combine_fn(fn, args, kwargs):696 if not args and not kwargs:697 return fn698 else:699 return _CurriedFn(fn, args, kwargs)700class PhasedCombineFnExecutor(object):701 """Executor for phases of combine operations."""702 def __init__(self, phase, fn, args, kwargs):703 self.combine_fn = curry_combine_fn(fn, args, kwargs)704 if phase == 'all':705 self.apply = self.full_combine706 elif phase == 'add':707 self.apply = self.add_only708 elif phase == 'merge':709 self.apply = self.merge_only710 elif phase == 'extract':711 self.apply = self.extract_only712 else:713 raise ValueError('Unexpected phase: %s' % phase)714 def full_combine(self, elements):715 return self.combine_fn.apply(elements)716 def add_only(self, elements):717 return self.combine_fn.add_inputs(718 self.combine_fn.create_accumulator(), elements)719 def merge_only(self, accumulators):720 return self.combine_fn.merge_accumulators(accumulators)721 def extract_only(self, accumulator):722 return self.combine_fn.extract_output(accumulator)723class Latest(object):724 """Combiners for computing the latest element"""725 @with_input_types(T)726 @with_output_types(T)727 class Globally(ptransform.PTransform):728 """Compute the element with the latest timestamp from a729 PCollection."""730 @staticmethod731 def add_timestamp(element, timestamp=core.DoFn.TimestampParam):732 return [(element, timestamp)]733 def expand(self, pcoll):734 return (735 pcoll736 | core.ParDo(self.add_timestamp).with_output_types(737 Tuple[T, TimestampType]) # type: ignore[misc]738 | core.CombineGlobally(LatestCombineFn()))739 @with_input_types(Tuple[K, V])740 @with_output_types(Tuple[K, V])741 class PerKey(ptransform.PTransform):742 """Compute elements with the latest timestamp for each key743 from a keyed PCollection"""744 @staticmethod745 def add_timestamp(element, timestamp=core.DoFn.TimestampParam):746 key, value = element747 return [(key, (value, timestamp))]748 def expand(self, pcoll):749 return (750 pcoll751 | core.ParDo(self.add_timestamp).with_output_types(752 Tuple[K, Tuple[T, TimestampType]]) # type: ignore[misc]753 | core.CombinePerKey(LatestCombineFn()))754@with_input_types(Tuple[T, TimestampType]) # type: ignore[misc]755@with_output_types(T)756class LatestCombineFn(core.CombineFn):757 """CombineFn to get the element with the latest timestamp758 from a PCollection."""759 def create_accumulator(self):760 return (None, window.MIN_TIMESTAMP)761 def add_input(self, accumulator, element):762 if accumulator[1] > element[1]:763 return accumulator764 else:765 return element766 def merge_accumulators(self, accumulators):767 result = self.create_accumulator()768 for accumulator in accumulators:769 result = self.add_input(result, accumulator)770 return result771 def extract_output(self, accumulator):...
container.py
Source:container.py
...41 @type _queue: List42 The end of the list represents the *front* of the queue, that is,43 the next item to be removed.44 @type _less_than: Callable[[Object, Object], bool]45 If x._less_than(y) is true, then x has higher priority than y46 and should be removed from the queue before y.47 === Representation Invariants ===48 - all elements of _queue are of the same type49 - the elements of _queue are appropriate arguments for function less_than50 - the elements of _queue are in order according to function less_than.51 """52 def __init__(self, less_than):53 """Initialize this to an empty PriorityQueue.54 @type self: PriorityQueue55 @type less_than: Callable[[Object, Object], bool]56 Determines the relative priority of two elements of the queue.57 If x._less_than(y) is true, then x has higher priority than y.58 @rtype: None59 """60 self._queue = []61 self._less_than = less_than62 def add(self, item):63 """Add <item> to this PriorityQueue.64 @type self: PriorityQueue65 @type item: Object66 @rtype: None67 >>> def shorter(a, b):68 ... return len(a) < len(b)69 ...70 >>>71 >>> # Define a PriorityQueue with priority on shorter strings.72 >>> # I.e., when we remove, we get the shortest remaining string.73 >>> pq = PriorityQueue(shorter)74 >>> pq.add('fred')75 >>> pq.add('arju')76 >>> pq.add('monalisa')77 >>> pq.add('hat')78 >>> pq._queue79 ['monalisa', 'arju', 'fred', 'hat']80 >>> pq.remove()81 'hat'82 >>> pq._queue83 ['monalisa', 'arju', 'fred']84 """85 if len(self._queue) == 0:86 self._queue.append(item)87 else:88 if self._less_than(item, self._queue[-1]):89 self._queue.append(item)90 elif self._less_than(self._queue[0], item):91 self._queue = [item] + self._queue92 else:93 for itr in self._queue:94 if not self._less_than(item, itr):95 self._queue.insert(self._queue.index(itr), item)96 break97 def remove(self):98 """Remove and return the next item from this PriorityQueue.99 Precondition: this priority queue is non-empty.100 @type self: PriorityQueue101 @rtype: Object102 >>> def shorter(a, b):103 ... return len(a) < len(b)104 ...105 >>>106 >>> # When we hit the tie, the one that was added first will be107 >>> # removed first.108 >>> pq = PriorityQueue(shorter)...
leetcode-973.py
Source:leetcode-973.py
...9 # å°æ°ç»çå k 个å
ç´ å å10 for idx in range(k // 2, -1, -1):11 self.adjust(points, idx, k - 1)12 for idx in range(k, len(points)):13 if self._less_than(points[0], points[idx]):14 continue15 points[0], points[idx] = points[idx], points[0]16 self.adjust(points, 0, k - 1)17 return points[:k]18 def _less_than(self, point1: List[int], point2: List[int]) -> bool: # noqa19 return (point1[0] * point1[0] + point1[1] * point1[1]) < (point2[0] * point2[0] + point2[1] * point2[1])20 # å¤§æ ¹å 21 def adjust(self, arr: List[List[int]], start: int, end: int) -> None: # noqa22 if start >= end:23 return24 while 2 * start + 1 <= end:25 left: int = 2 * start + 126 right: int = 2 * start + 227 # å¦æ没æå³å©å28 if right > end:29 if self._less_than(arr[start], arr[left]):30 arr[start], arr[left] = arr[left], arr[start]31 break32 if not self._less_than(arr[start], arr[left]) and not self._less_than(arr[start], arr[right]):33 break34 if not self._less_than(arr[left], arr[right]):35 # ä¸å·¦å©å交æ¢36 arr[start], arr[left] = arr[left], arr[start]37 start = left38 continue39 arr[start], arr[right] = arr[right], arr[start]...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!