Best Python code snippet using slash
parallel.py
Source: parallel.py
1"""Tools to aid in parallelizing a function call.2Default method is MPI, if available. Fallback is concurrent.futures. If all3else fails, final fallback is serial.4Author: Seth Axen5Email: seth.axen@gmail.com6"""7import os8import sys9import logging10from copy import copy11import multiprocessing12try:13 from itertools import izip as zip14except ImportError: # python 315 pass16# upon import, figure out if MPI is available, and decide parallel_mode17MPI_MODE = "mpi"18FUTURES_THREADS_MODE = "threads"19FUTURES_PROCESSES_MODE = "processes"20SERIAL_MODE = "serial"21ALL_PARALLEL_MODES = (MPI_MODE,22 FUTURES_PROCESSES_MODE, FUTURES_THREADS_MODE,23 SERIAL_MODE)24available_parallel_modes = []25try:26 from mpi4py import MPI27 available_parallel_modes.append(MPI_MODE)28except ImportError:29 pass30try:31 import concurrent.futures32 available_parallel_modes.append(FUTURES_PROCESSES_MODE)33 available_parallel_modes.append(FUTURES_THREADS_MODE)34except ImportError:35 pass36available_parallel_modes.append(SERIAL_MODE)37def make_data_iterator(data_entries, *iterables):38 """Make an iterator from an iterable of data entries and constant values.39 All iterables should have the same number of entries. Any passed values40 that are not iterators, lists, or tuples will have that same value41 repeated for the entire length of `data_entries`.42 Parameters43 ----------44 data_entries : iterable45 Iterable of data entries.46 *iterables47 One or more iterables or constant values to serve as additional48 data entries. These are zipped into an iterator with `data_entries`.49 Returns50 -------51 iterator : Iterator of tuples, each with an item in `data_entries`52 followed by corresponding items in `iterables`.53 """54 from itertools import repeat, cycle55 from collections import Iterator56 new_iterables = [iter(data_entries), ]57 for iterable in iterables:58 if (isinstance(iterable, Iterator) or59 isinstance(iterable, list) or60 isinstance(iterable, tuple)):61 new_iterables.append(cycle(iterable))62 else:63 new_iterables.append(repeat(iterable))64 return zip(*new_iterables)65def read_bash_var(var, default=None):66 """Rad a bash variable for number of available processes/threads."""67 if var is None:68 return default69 try:70 val = int(os.environ[var])71 logging.debug("Variable %s indicates %d processors" % (var, val))72 return val73 except KeyError:74 logging.debug("Variable %s not set" % (var))75 return default76 except ValueError:77 logging.debug("Variable %s set to non-integer %s" % (var, str(val)))78 return default79def enum(*sequential, **named):80 """Fake an enumerated type.81 Reference:82 ----------83 - http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python84 Parameters85 ----------86 *sequential87 List of items.88 **named89 Dictionary of items.90 """91 enums = dict(zip(sequential, range(len(sequential))), **named)92 return type('Enum', (), enums)93class Parallelizer(object):94 """A class to aid in parallelizing a function call.95 Ideal use case is when function calls are expected to have different96 runtimes and each call is completely independent of all others."""97 def __init__(self, parallel_mode=None, num_proc=None,98 num_proc_bash_var=None, fail_value=False):99 """Choose mode and/or number of processors or use defaults.100 Parameters101 ----------102 parallel_mode : str, optional (default None)103 Mode to use for parallelization. Available modes are104 ('mpi', 'processes', 'threads', 'serial').105 num_proc : int, optional (default None)106 Maximum number of processors to use. 
Ignored in MPI mode.107 num_proc_bash_var : str, optional (default None)108 Number of available processors will be read from bash variable.109 Ignored if `num_proc` is specified.110 fail_value : any, optional (default False)111 Result to be yielded if specific function evaluation failed.112 """113 preferred_parallel_modes = copy(available_parallel_modes)114 logging.debug("Parallel modes %s are available." %115 repr(preferred_parallel_modes))116 self.fail_value = fail_value117 self.rank = 0118 self.num_proc = None119 if parallel_mode is not None:120 if parallel_mode not in ALL_PARALLEL_MODES:121 raise KeyError("Parallel mode must be in %s." %122 (repr(ALL_PARALLEL_MODES)))123 if parallel_mode not in preferred_parallel_modes:124 if self.is_master():125 logging.warning(126 "Parallel mode %s not available. Will auto-select a replacement." % (repr(parallel_mode)))127 else:128 preferred_parallel_modes.pop(129 preferred_parallel_modes.index(parallel_mode))130 preferred_parallel_modes = ([parallel_mode, ] +131 preferred_parallel_modes)132 logging.debug("Available parallel modes reorganized to %s." % (133 repr(preferred_parallel_modes)))134 if num_proc is None:135 num_proc = read_bash_var(num_proc_bash_var)136 for parallel_mode in preferred_parallel_modes:137 logging.debug("Checking if mode %s is valid." %138 (repr(parallel_mode)))139 mode_num_proc = num_proc140 self.rank = 0141 if parallel_mode == MPI_MODE:142 comm = MPI.COMM_WORLD143 self.rank = comm.Get_rank()144 mpi_num_proc = comm.Get_size()145 mode_num_proc = mpi_num_proc146 if (mode_num_proc is not None and mode_num_proc < 2147 and parallel_mode != SERIAL_MODE):148 if self.is_master():149 logging.warning("Only %d processes available. %s mode not available." % (150 mode_num_proc, repr(parallel_mode)))151 continue152 elif (mode_num_proc is None153 and parallel_mode in (FUTURES_PROCESSES_MODE,154 FUTURES_THREADS_MODE)):155 mode_num_proc = multiprocessing.cpu_count()156 logging.info("num_proc is not specified. %s mode will use all %d processes" % (157 repr(parallel_mode), mode_num_proc))158 elif parallel_mode == SERIAL_MODE:159 mode_num_proc = 1160 self.parallel_mode = parallel_mode161 self.num_proc = mode_num_proc162 break163 if self.is_master():164 logging.info(165 "Parallelizer initialized with mode %s and %d processors." % (166 repr(self.parallel_mode), self.num_proc))167 def is_mpi(self):168 return self.parallel_mode == MPI_MODE169 def is_concurrent(self):170 return self.is_threads() or self.is_processes()171 def is_threads(self):172 return self.parallel_mode == FUTURES_THREADS_MODE173 def is_processes(self):174 return self.parallel_mode == FUTURES_PROCESSES_MODE175 def is_serial(self):176 return self.parallel_mode == SERIAL_MODE177 def is_master(self):178 return self.rank == 0179 def run(self, *args, **kwargs):180 r"""Execute a function in parallel. Return list of results.181 Parameters182 ----------183 func : function184 Function to execute. Argument is single entry of `data_iterator`185 as well as named arguments in `kwargs`.186 data_iterator : iterator187 Iterator where each entry is an argument to `func`. These data188 are communicated between processors so should be as small as189 possible.190 kwargs : dict, optional (default {})191 Named arguments for `func`.192 out_file : str, optional (default None)193 File to write results of function to. 
If None, results are yielded194 instead of being written to file.195 out_str : str, optional (default "%s\n")196 Format string to be written to output file for each result.197 out_format : function, optional (default str)198 Function to apply to output of `func` to format results to match199 `out_str`.200 logging_str : str, optional (default None)201 Format string to be logged using `logging` for each successful202 result. If None, only errors are logged.203 logging_format : function, optional (default str)204 Function to apply to `data` entries of `data_iterator` to format205 results to match `logging_str`.206 num_proc : int (default None)207 Number of processors to use. If None, maximum number available208 is used. If `is_mpi`, this term is ignored.209 Returns210 -------211 list : List of results of `func`.212 """213 results = [x for x in self.run_gen(*args, **kwargs)]214 return results215 def run_gen(self, func, data_iterator, kwargs={}, out_file=None,216 out_str="%s\n", out_format=str, logging_str=None,217 logging_format=str):218 r"""Execute a function in parallel. Return result iterator.219 Parameters220 ----------221 func : function222 Function to execute. Argument is single entry of `data_iterator`223 as well as named arguments in `kwargs`.224 data_iterator : iterator225 Iterator where each entry is an argument to `func`. These data226 are communicated between processors so should be as small as227 possible.228 kwargs : dict, optional (default {})229 Named arguments for `func`.230 out_file : str, optional (default None)231 File to write results of function to. If None, results are yielded232 instead of being written to file.233 out_str : str, optional (default "%s\n")234 Format string to be written to output file for each result.235 out_format : function, optional (default str)236 Function to apply to output of `func` to format results to match237 `out_str`.238 logging_str : str, optional (default None)239 Format string to be logged using `logging` for each successful240 result. If None, only errors are logged.241 logging_format : function, optional (default str)242 Function to apply to `data` entries of `data_iterator` to format243 results to match `logging_str`.244 num_proc : int (default None)245 Number of processors to use. If None, maximum number available246 is used. 
If `is_mpi`, this term is ignored.247 Returns248 -------249 iterator : Iterator through results of `func`.250 """251 result_iterator = iter([])252 if self.is_mpi():253 result_iterator = (x for x in self.mpi_run(254 func, data_iterator, kwargs=kwargs, out_file=out_file,255 out_str=out_str, out_format=out_format,256 logging_str=logging_str, logging_format=logging_format))257 elif self.is_concurrent():258 result_iterator = (x for x in self.concurrent_run(259 func, data_iterator, kwargs=kwargs, out_file=out_file,260 out_str=out_str, out_format=out_format,261 logging_str=logging_str, logging_format=logging_format))262 else:263 result_iterator = (x for x in self.serial_run(264 func, data_iterator, kwargs=kwargs, out_file=out_file,265 out_str=out_str, out_format=out_format,266 logging_str=logging_str, logging_format=logging_format))267 return result_iterator268 def serial_run(self, func, data_iterator, kwargs={}, out_file=None,269 out_str="%s\n", out_format=str, logging_str=None,270 logging_format=str):271 """Run in serial on a single processor."""272 if out_file is not None:273 fh = open(out_file, "w")274 for data in data_iterator:275 try:276 result = func(*data, **kwargs)277 if out_file is None:278 yield (result, data)279 else:280 fh.write(out_str % out_format(result))281 yield (True, data)282 if result != self.fail_value and logging_str is not None:283 logging.info(logging_str % logging_format(data))284 except:285 logging.error("Error running: %s" % str(data),286 exc_info=True)287 yield(self.fail_value, data)288 def concurrent_run(self, func, data_iterator, kwargs={}, out_file=None,289 out_str="%s\n", out_format=str, logging_str=None,290 logging_format=str):291 """Run in parallel with concurrent.futures."""292 if self.is_threads():293 executor = concurrent.futures.ThreadPoolExecutor(294 max_workers=self.num_proc)295 else:296 executor = concurrent.futures.ProcessPoolExecutor(297 max_workers=self.num_proc)298 jobs = []299 jobs_dict = {}300 for data in data_iterator:301 job = executor.submit(func, *data, **kwargs)302 jobs.append(job)303 jobs_dict[job] = data304 jobs_iterator = concurrent.futures.as_completed(jobs)305 if out_file is not None:306 fh = open(out_file, "w")307 for job in jobs_iterator:308 data = jobs_dict[job]309 try:310 result = job.result()311 if out_file is None:312 yield (result, data)313 else:314 fh.write(out_str % out_format(result))315 yield (True, data)316 if result != self.fail_value and logging_str is not None:317 logging.info(logging_str % logging_format(data))318 except KeyboardInterrupt:319 logging.error("Error running: %s" % str(data),320 exc_info=True)321 executor.shutdown()322 yield(self.fail_value, data)323 except:324 logging.error("Error running: %s" % str(data),325 exc_info=True)326 yield(self.fail_value, data)327 if out_file is not None:328 fh.close()329 executor.shutdown()330 def mpi_run(self, func, data_iterator, kwargs={}, out_file=None,331 out_str="%s\n", out_format=str, logging_str=None,332 logging_format=str):333 """Run in parallel with MPI.334 Reference:335 ----------336 - https://github.com/jbornschein/mpi4py-examples/blob/master/09-task-pull.py337 """338 comm = MPI.COMM_WORLD339 status = MPI.Status() # get MPI status object340 tags = enum('READY', 'DONE', 'EXIT', 'START')341 msg = "Proc:%d|" % self.rank342 comm.Barrier()343 mode = MPI.MODE_WRONLY | MPI.MODE_CREATE344 if out_file is not None:345 fh = MPI.File.Open(comm, out_file, mode)346 if self.is_master():347 task_index = 0348 num_workers = comm.Get_size() - 1349 closed_workers = 0350 
logging.debug("%sMaster starting with %d workers" % (msg,351 num_workers))352 try:353 i = 0354 while closed_workers < num_workers:355 received = comm.recv(source=MPI.ANY_SOURCE,356 tag=MPI.ANY_TAG,357 status=status)358 source = status.Get_source()359 tag = status.Get_tag()360 if tag == tags.READY:361 try:362 data = next(data_iterator)363 except StopIteration:364 logging.debug(365 "%sEnd of data iterator. Closing proc %d" % (366 msg, source))367 comm.send(368 None, dest=source, tag=tags.EXIT)369 except:370 logging.debug("%sCould not get data" % msg)371 logging.debug(372 "%sSending task %d to proc %d" % (msg,373 task_index,374 source))375 comm.send(data, dest=source, tag=tags.START)376 task_index += 1377 elif tag == tags.DONE:378 if received is not None:379 result, data = received380 logging.debug(381 "%sReceived result %d from proc %d" % (382 msg, task_index, source))383 if (result != self.fail_value and384 logging_str is not None):385 logging.info(386 logging_str % logging_format(data))387 if out_file is None or result == self.fail_value:388 yield (result, data)389 else:390 yield (True, data)391 i += 1392 elif tag == tags.EXIT:393 logging.debug("%sExiting proc %d" % (msg, source))394 closed_workers += 1395 except (KeyboardInterrupt, SystemExit):396 logging.exception("%sError while processing" % msg,397 exc_info=True)398 sys.exit()399 else:400 # Worker processes execute code below401 while True:402 comm.send(None, dest=0, tag=tags.READY)403 data = comm.recv(404 source=0, tag=MPI.ANY_TAG, status=status)405 tag = status.Get_tag()406 if tag == tags.START:407 try:408 result = func(*data, **kwargs)409 if out_file is None:410 comm.send(411 (result, data), dest=0, tag=tags.DONE)412 else:413 fh.Write_shared(414 (out_str % out_format(result)).encode("utf-8"))415 comm.send(416 (True, data), dest=0, tag=tags.DONE)417 except:418 logging.error(419 "%sError running: %s" % (msg, str(data)),420 exc_info=True)421 comm.send(422 (self.fail_value, data), dest=0, tag=tags.DONE)423 elif tag == tags.EXIT:424 break425 comm.send(None, dest=0, tag=tags.EXIT)426 if out_file is not None:427 fh.Sync()428 fh.Close()429 comm.Barrier()430if __name__ == "__main__":431 def test_func(num, *args):432 return num * 100433 logging.basicConfig(level=logging.INFO, format="%(message)s")434 data_list = range(100)435 data_iterator = make_data_iterator(data_list, "test")436 parallelizer = Parallelizer(parallel_mode=FUTURES_PROCESSES_MODE)437 run_kwargs = {"out_file": "test_out.txt", "out_str": "%d\n",438 "out_format": lambda x: x,439 "logging_str": "Logged %s %d",440 "logging_format": lambda x: (x[1], x[0])}441 for result in parallelizer.run(test_func, data_iterator, **run_kwargs):...
universal.py
Source: universal.py
...
# import itertools
import util.logging
from .constants import MODES
CURRENT_MODE = MODES['scoop']
def max_parallel_mode():
    if util.parallel.is_running.scoop_module():
        max_parallel_mode = MODES['scoop']
    else:
        max_parallel_mode = MODES['multiprocessing']
    util.logging.debug('Maximal parallel mode is {}.'.format(max_parallel_mode))
    return max_parallel_mode
# map functions
# def map_serial_with_args(function, values, *args):
#     util.logging.debug('Mapping function with {} args of types {} to values in serial.'.format(len(args), tuple(map(type, args))))
#
#     values = args_generator_with_indices(values, args)
#     results = itertools.starmap(function, values)
#     results = tuple(results)
#
...
sce.py
Source: sce.py
import subprocess
import threading
import random
import time
import numpy as np
import multiprocessing as mp
from util_settings import *


class Complex:
    def __init__(self, compute_handles, controller):
        self.compute_handles = compute_handles
        self.controller = controller

    def update_compute_handles(self, compute_handles):
        self.compute_handles = compute_handles
        return(0)

    def evolve(self):
        for _ in range(self.controller.n_evolutions):
            all_args = np.array([ch.args for ch in self.compute_handles])
            h = np.array([[np.min(all_args[:,i]), np.max(all_args[:,i])] for i in range(self.controller.sample_space.shape[0])])
            evolution_sample = []
            while len(evolution_sample) < self.controller.n_evolution_sample:
                i = self.controller.generate_random()
                ch = self.compute_handles[i]
                if ch not in evolution_sample: evolution_sample.append([ch, i])
            for _ in range(self.controller.n_gen_offspring):
                evolution_sample.sort(key=lambda x : x[0].value)
                centroid = compute_centroid([ch[0] for ch in evolution_sample[:-1]])
                r = 2 * centroid - evolution_sample[-1][0].args
                if np.any(self.controller.sample_space[:,0] - r) > 0 or np.any(self.controller.sample_space[:,1] - r < 0):
                    r = np.array([random.uniform(r[0], r[1]) for r in h])
                ch = ComputeHandle(r, self.controller)
                ch.compute()
                if ch.value < evolution_sample[-1][0].value: evolution_sample[-1][0] = ch
                else:
                    r = (centroid + evolution_sample[-1][0].args) / 2
                    ch = ComputeHandle(r, self.controller)
                    ch.compute()
                    if ch.value < evolution_sample[-1][0].value: evolution_sample[-1][0] = ch
                    else:
                        r = np.array([random.uniform(r[0], r[1]) for r in h])
                        ch = ComputeHandle(r, self.controller)
                        ch.compute()
                        evolution_sample[-1][0] = ch
            for (ch, i) in evolution_sample:
                self.compute_handles[i] = ch
        return(0)


class ComputeHandle:
    def __init__(self, args, controller):
        self.args = np.array(args)
        self.controller = controller
        self.value = False

    def compute(self):
        if self.controller.objf_mode == 'generic':
            str_args = [str(arg) for arg in self.args]
            if not self.value: self.value = float(subprocess.check_output([self.controller.function] + str_args))
        elif self.controller.objf_mode == 'python':
            if not self.value: self.value = self.controller.function(self.args)
        return(self.value)


class SCEController:
    def __init__(
        self,
        objf_mode,
        function,
        sample_space,
        parallel_mode=PARALLEL_MODE,
        n_complex=N_COMPLEX,
        n_points=N_POINTS,
        n_evolution_sample = N_EVOLUTION_SAMPLE,
        n_gen_offspring = N_GEN_OFFSPRING,
        n_evolutions = N_EVOLUTIONS,
        max_iters=MAX_ITERS,
        log_file = LOG_FILE
    ):

        # loading in input variables
        self.parallel_mode = parallel_mode
        self.objf_mode = objf_mode
        self.function = function
        self.sample_space = sample_space
        self.n_complex = n_complex
        self.n_points = n_points
        self.n_evolution_sample = n_evolution_sample
        self.n_gen_offspring = n_gen_offspring
        self.n_evolutions = n_evolutions
        self.max_iters = max_iters
        # configuring other variables
        self.best_value = float('inf')
        self.best_args = None
        self.prob = [2.0 * (self.n_points - i ) / (self.n_points * (self.n_points + 1)) for i in range(self.n_points)]
        for i in range(1, self.n_points): self.prob[i] += self.prob[i - 1]
        self.iters = 0
        self.log_file = open(log_file, 'w')
        self.log = []
        self.add_log('SCE controller initialized')
        # starting SCE algorithm
        self.main_loop()

    def main_loop(self):
        self.init_complexes()
        while self.iters < self.max_iters:
            self.evolve_complexes()
            self.shuffle_complexes()
            for c in self.complexes:
                if c.compute_handles[0].value < self.best_value:
                    self.best_value = c.compute_handles[0].value
                    self.best_args = c.compute_handles[0].args
        return(0)

    def add_log(self, log_message):
        formatted_message = get_time_string() + 'CONTROLLER: ' + log_message
        self.log_file.write(formatted_message + '\n')
        self.log.append(formatted_message)
        return(0)

    def generate_random(self):
        r = random.random()
        for i in range(self.n_points):
            if r < self.prob[i]: return(i)

    def init_complexes(self):
        self.add_log('first initialization of complexes (iteration {0})...'.format(self.iters))
        compute_handles = [np.array([random.uniform(r[0], r[1]) for r in self.sample_space]) for _ in range(self.n_complex * self.n_points)]
        compute_handles = [ComputeHandle(args, self) for args in compute_handles]
        self.eval_compute_handles(compute_handles)
        compute_handles.sort(key=lambda x : x.value)
        self.complexes = []
        for i in range(self.n_complex):
            c = Complex([compute_handles[i + j * self.n_complex] for j in range(self.n_points)], self)
            self.complexes.append(c)
        self.add_log('complexes initialized (iteration {0})'.format(self.iters))
        self.iters += 1
        return(0)

    def shuffle_complexes(self):
        compute_handles = [ch for c in self.complexes for ch in c.compute_handles]
        compute_handles.sort(key=lambda x : x.value)

        for i in range(self.n_complex):
            self.complexes[i].update_compute_handles([compute_handles[i + j * self.n_complex] for j in range(self.n_points)])
        self.add_log('complexes shuffled (iteration {0})'.format(self.iters))
        self.iters += 1
        return(0)

    def evolve_complexes(self):
        if self.parallel_mode == 'serial':
            for c in self.complexes: c.evolve()
        elif self.parallel_mode == 'threaded':
            threads = []
            for c in self.complexes: threads.append(threading.Thread(target=c.evolve))
            for t in threads:
                t.start()
                t.join()

        else:
            for c in self.complexes: c.evolve()
        self.add_log('evolving complexes (iteration {0})...'.format(self.iters))
        self.add_log('complexes evolved (iteration {0})'.format(self.iters))
        return(0)

    def eval_compute_handles(self, compute_handles):
        if self.parallel_mode == 'serial':
            for ch in compute_handles: ch.compute()

        elif self.parallel_mode == 'threaded':
            threads = []
            for ch in compute_handles: threads.append(threading.Thread(target=ch.compute))
            for t in threads:
                t.start()
                t.join()
        else:
            pool = mp.Pool(self.parallel_mode)
            val = pool.map(parallel_compute_helper, compute_handles)
            for chi in range(len(compute_handles)):
                compute_handles[chi].value = val[chi]
...
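For context, here is a hypothetical way to drive the controller above with a Python objective function. It is a sketch only: it assumes `util_settings` is importable and supplies the starred defaults (such as `PARALLEL_MODE`, `N_COMPLEX`, `LOG_FILE`) plus the helpers `compute_centroid`, `get_time_string`, and `parallel_compute_helper`; the `sphere` objective and the bounds are illustrative.

# Hypothetical usage sketch; requires the util_settings module imported by sce.py.
import numpy as np

from sce import SCEController


def sphere(args):
    # Toy objective: minimize the sum of squares.
    return float(np.sum(args ** 2))


# One [lower, upper] row per dimension of the search space.
sample_space = np.array([[-5.0, 5.0], [-5.0, 5.0]])

# The constructor runs init_complexes() and the main loop immediately.
controller = SCEController(objf_mode='python', function=sphere,
                           sample_space=sample_space, parallel_mode='serial')
print(controller.best_args, controller.best_value)

Passing `parallel_mode='serial'` keeps evaluation single-threaded; `'threaded'` or an integer (used as the `multiprocessing.Pool` size) selects the other branches shown in `eval_compute_handles`.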
test_build_parallel.py
Source: test_build_parallel.py
...
    assert [ ] == mock_atpbar.disable.call_args_list
    ## workingarea
    assert parallel.workingarea is None
##__________________________________________________________________||
def test_build_logging_unknown_parallel_mode(caplog):
    with caplog.at_level(logging.WARNING):
        parallel = build_parallel(parallel_mode='unknown_mode')
    assert len(caplog.records) == 1
    assert caplog.records[0].levelname == 'WARNING'
    assert 'parallel.build' in caplog.records[0].name
    assert 'unknown parallel_mode' in caplog.records[0].msg
    assert 'MultiprocessingDropbox' == parallel.communicationChannel.dropbox.__class__.__name__
##__________________________________________________________________||
@pytest.mark.parametrize('dispatcher_options', [dict()])
@pytest.mark.parametrize('user_modules', [[], ['scribblers']])
def test_build_parallel_subprocess(user_modules, dispatcher_options):
    parallel_mode = 'subprocess'
    parallel = build_parallel(
        parallel_mode=parallel_mode,
...
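The fallback behavior these tests assert could be checked interactively with a short sketch like the one below. The import path is a guess inferred from the `'parallel.build'` logger name asserted above; only the `parallel_mode` keyword and the attribute access are taken from the tests themselves.

# Hypothetical interactive check of the unknown-mode fallback tested above.
import logging

# Import path assumed from the 'parallel.build' logger name in the test.
from alphatwirl.parallel.build import build_parallel

logging.basicConfig(level=logging.WARNING)

# An unrecognized mode should log a warning and fall back to multiprocessing.
parallel = build_parallel(parallel_mode='unknown_mode')
print(parallel.communicationChannel.dropbox.__class__.__name__)  # MultiprocessingDropbox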