Best Python code snippet using locust
main_contr_evo.py
Source: main_contr_evo.py
#! /usr/bin/env python3
import argparse
import json
import math
import multiprocessing
import neat
import os
import random
import re
import subprocess
import time
import shutil
from itertools import count
from tqdm import tqdm
from utils import print_header_contr_evo
from goal_reaching.controller import Controller
from goal_reaching.robot_gr import RobotGR
from goal_reaching.statistics_reporter import StatisticsReporterGR
from goal_reaching.utils_gr import open_history_file, restore_history_file, open_evo_file, restore_evo_file, \
    restore_best_controller, log_gen_info, update_history

'''
 Global variables needed to interact with the NEAT library
'''
# Checkpoint directory
checkpoint_dir = None
# Controllers directory
controllers_dir = None
# Results directory
results_dir = None
# History file
history_file = None
# Controllers evolution file
contr_evo_file = None
# Robot configuration
robot_config = None
# Neat population
p = None
# Neat seed
seed = None
# Checkpointer instance
checkpointer = None
# Checkpoint usage
checkpoint_used = None
# Stats reporter instance
stats_reporter = None
# Simulations number control variables
num_sims = None
max_num_sims = None
sim_end = None
# Multiprocessing pool
pool = None
# Progress bar
pbar = None
# Start time
start_time = None
'''
'''


# Evaluates the controllers provided
def eval_controllers(genomes, config):
    global num_sims, sim_end

    if num_sims < max_num_sims:
        # extract indices of controllers to evaluate and populate pool input
        controllers_indices = []
        pool_input = []
        for i in range(0, len(genomes)):
            if genomes[i][1].fitness is None:
                controller = Controller(genomes[i][0], seed, genomes[i][1], config)
                controller_path = controller.save(controllers_dir)
                if controller_path is not None:
                    controllers_indices.append(i)
                    pool_input.append([
                        controller_path,
                        random.randint(0, 1000000)
                    ])
                else:
                    genomes[i][1].fitness = 0.0

        # evaluate controllers
        fitnesses = pool.map(eval_controller, pool_input)
        num_evals = len(fitnesses)
        for i in range(0, num_evals):
            genomes[controllers_indices[i]][1].fitness = fitnesses[i]

        pbar.update(min(num_evals, max_num_sims - num_sims))
        num_sims += num_evals

        # update history
        update_history(history_file, p, genomes, controllers_indices)

        # log generation info
        log_gen_info(contr_evo_file, num_sims, genomes, num_sims != num_evals)
    else:
        if not sim_end:
            end_simulation(p.best_genome)
            p.remove_reporter(checkpointer)
            p.remove_reporter(stats_reporter)
            sim_end = True

        for c_id, genome in genomes:
            if genome.fitness is None:
                genome.fitness = 0.0


# returns the fitness of the controller provided
def eval_controller(inputs):
    controller_path = inputs[0]
    sim_seeds_seed = inputs[1]

    robot_sim = RobotGR(robot_config['direction_path'],
                        robot_config['rotation_angle_path']
                        if 'rotation_angle_path' in robot_config else None,
                        robot_config['simulation_path'], robot_config['tracker_path'],
                        robot_config['sim_time'], robot_config.get('noise_type', 1),
                        robot_config.get('noise_level', 0.035), robot_config['target_dist_bearing'],
                        robot_config['robot']['id'], robot_config['robot']['num_faces'],
                        robot_config['robot']['num_modules'], robot_config['robot']['robot_tests'],
                        sim_seeds_seed, robot_config['robot']['modules_conf'])
    fitness = robot_sim.simulate(controller_path)

    return fitness


# performs the closing operations
def end_simulation(best_genome):
    pbar.close()

    # save checkpoint of last generation (if it has not been done yet)
    if not os.path.exists(os.path.join(checkpoint_dir, 'contr_cp_' + str(seed) + '_' + str(p.generation - 1))):
        checkpointer.save_checkpoint(p.config, p.population, p.species, p.generation - 1)

    # save checkpoint of last generation stats (if it has not been done yet)
    if not os.path.exists(
            os.path.join(checkpoint_dir, 'contr_stats_cp_' + str(seed) + '_' + str(p.generation - 1))):
        stats_reporter.save_checkpoint(p.generation - 1, stats_reporter.most_fit_genomes,
                                       stats_reporter.generation_statistics)

    history_file.close()
    contr_evo_file.write(']')
    contr_evo_file.close()

    # copy best controller into best directory
    best_dir = os.path.join(results_dir, 'best')
    if best_genome is not None:
        shutil.copy2(os.path.join(controllers_dir,
                                  'controller_' + str(seed) + '_' + str(best_genome.key) + '.txt'), best_dir)

    # compute elapsed time
    end_time = int(time.time() - start_time)
    elap_time = '{:02d}:{:02d}:{:02d}'.format(end_time // 3600,
                                              (end_time % 3600 // 60),
                                              end_time % 60)

    # save elapsed time
    print('Total computation time: {}'.format(elap_time))
    if elap_time != '00:00:00':
        mode = 'a' if checkpoint_used else 'w+'
        with open(os.path.join(results_dir, 'elapsed_time_sim_{}.txt'.format(seed)), mode) as time_file:
            time_file.write(elap_time + '\n')

    pool.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Script for simulating via EA the evolution of the controller '
                                                 'of a modular soft robot for goal reaching or squeezing task')
    parser.add_argument('robot', metavar='robot', type=str, nargs='?',
                        default='./experiments_gr/robot.json', help='file name of the file' +
                        ' containing the robot configuration.' +
                        ' By default it looks for the ./experiments_gr/robot.json file.')
    parser.add_argument('neat_settings', metavar='neat_settings', type=str, nargs='?',
                        default='./experiments_gr/neat-settings.txt',
                        help='file name of the file containing the NEAT configuration' +
                        ' parameters. By default it uses ./experiments_gr/neat-settings.txt file.')
    parser.add_argument('--use-checkpoint', dest='use_checkpoint', action='store_const',
                        const=True, help='Select whether to run the experiment ' +
                        'from previous checkpoints (if exist).')
    args = parser.parse_args()

    # load robot configuration
    with open(str(args.robot)) as robot_conf_file:
        robot_config = json.load(robot_conf_file)

    # print simulation header
    print_header_contr_evo(robot_config)

    # prepare output folders
    results_dir = robot_config['result_dir']
    best_dir = os.path.join(results_dir, 'best')
    checkpoint_dir = os.path.join(results_dir, 'checkpoints')
    controllers_dir = os.path.join(results_dir, 'controllers')
    evo_dir = os.path.join(results_dir, 'evolution_info')

    # Note: the result_dir must be a proper path where to store the files, otherwise the execution will fail
    os.makedirs(results_dir, exist_ok=True)
    os.makedirs(best_dir, exist_ok=True)
    os.makedirs(checkpoint_dir, exist_ok=True)
    os.makedirs(controllers_dir, exist_ok=True)
    os.makedirs(evo_dir, exist_ok=True)

    # save settings
    with open(os.path.join(results_dir, 'settings.json'), 'w') as conf_file:
        json.dump(robot_config, conf_file, indent=4)

    # store robot information to be used as input for simulation and visualization
    robot = RobotGR(None, None, robot_config['simulation_path'], None, robot_config['sim_time'],
                    robot_config.get('noise_type', 1), robot_config.get('noise_level', 0.035),
                    None, None, robot_config['robot']['num_faces'], robot_config['robot']['num_modules'],
                    None, None, robot_config['robot']['modules_conf'])
    robot.write_input_sim(os.path.join(results_dir, 'robot_sim.txt'))
    robot.write_input_vis(os.path.join(results_dir, 'robot_vis.txt'))

    # neat seeds
    seeds = robot_config['seeds']
    for seed_val in seeds:
        # set init_time variable
        start_time = time.time()

        # set global seed accordingly
        seed = seed_val

        # init simulations control vars
        num_sims = 0
        max_num_sims = robot_config['max_num_sims']
        sim_end = False

        # instantiate checkpointer
        checkpoint_prefix = 'contr_cp_' + str(seed) + '_'
        checkpointer = neat.Checkpointer(generation_interval=robot_config['checkpoint_freq'],
                                         time_interval_seconds=86400,
                                         filename_prefix=os.path.join(checkpoint_dir, checkpoint_prefix))

        # instantiate NEAT stats reporter
        stats_checkpoint_prefix = 'contr_stats_cp_' + str(seed) + '_'
        stats_reporter = StatisticsReporterGR(generation_interval=robot_config['checkpoint_freq'],
                                              filename_prefix=os.path.join(checkpoint_dir, stats_checkpoint_prefix))

        # history filename
        history_filename = os.path.join(evo_dir, 'history_' + str(seed) + '.csv')

        # evolution info and checkpoint files
        contr_evo_filename = os.path.join(evo_dir, 'contr_evo_{}.json'.format(seed))
        controller_checkpoints = {int(f.split('_')[-1]): os.path.join(checkpoint_dir, f)
                                  for f in os.listdir(checkpoint_dir)
                                  if re.match(checkpoint_prefix + '[0-9]', f)}

        # if use_checkpoint option is True and a checkpoint file exists, load the population from it
        if args.use_checkpoint and \
                len(controller_checkpoints) != 0 and \
                os.path.exists(controller_checkpoints[max(controller_checkpoints.keys())]):
            checkpoint_used = True

            # get index of the last checkpoint
            last_checkpoint = max(controller_checkpoints.keys())

            # load population
            p = neat.Checkpointer.restore_checkpoint(os.path.join(checkpoint_dir,
                                                                  checkpoint_prefix + str(last_checkpoint)))
            p.generation += 1
            p.reproduction.genome_indexer = count(max(p.population.keys()) + 1)

            # set last gen checkpoint property
            checkpointer.last_generation_checkpoint = last_checkpoint

            # load stats checkpoint
            most_fit_genomes, generation_statistics = StatisticsReporterGR.restore_checkpoint(
                os.path.join(checkpoint_dir, stats_checkpoint_prefix + str(last_checkpoint)))

            # restore stats reporter properties
            stats_reporter.most_fit_genomes = most_fit_genomes
            stats_reporter.generation_statistics = generation_statistics
            stats_reporter.last_generation_checkpoint = last_checkpoint

            # restore history file
            history_file, num_sims = restore_history_file(history_filename, last_checkpoint)

            # restore controller evolution file
            contr_evo_file, contr_evo_obj = restore_evo_file(contr_evo_filename, last_checkpoint, num_sims)

            # restore best individual in population
            p.best_genome = restore_best_controller(contr_evo_obj, controllers_dir, seed)
        else:
            checkpoint_used = False

            # init random number generator
            random.seed(seed)

            # load NEAT configuration
            neat_config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                                      neat.DefaultSpeciesSet, neat.DefaultStagnation,
                                      args.neat_settings)

            # adapt NEAT configuration by replacing num_output value
            neat_config.genome_config.num_outputs = 2 * robot_config['robot']['num_modules']
            neat_config.genome_config.output_keys = [i for i in range(neat_config.genome_config.num_outputs)]

            # save config file
            neat_config.save(os.path.join(results_dir, 'neat_settings.txt'))

            # create the population
            p = neat.Population(neat_config)

            # open history file
            history_file = open_history_file(history_filename)

            # open controllers evolution file
            contr_evo_file = open_evo_file(contr_evo_filename)

        # add checkpointer
        p.add_reporter(checkpointer)

        # add stats reporter
        p.add_reporter(stats_reporter)

        # multiprocessing pool instantiation
        pool = multiprocessing.Pool(processes=24)

        # create progress bar
        pbar = tqdm(total=max_num_sims, postfix={})
        pbar.update(min(num_sims, max_num_sims))

        # run NEAT for the remaining generations
        best = p.run(eval_controllers, math.ceil(max_num_sims / p.config.pop_size) - p.generation)
        while num_sims < max_num_sims:
            best = p.run(eval_controllers, 1)

        # perform closing operations
        if not sim_end:
...
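Everything the script needs (paths, simulation budget, seeds, robot geometry) comes from the robot JSON file passed as the first positional argument. The sketch below is a hypothetical minimal configuration, assembled in Python so that the keys main_contr_evo.py actually reads are visible in one place; the values and value shapes are illustrative assumptions, not taken from the project.

# Hypothetical robot.json sketch for main_contr_evo.py; keys mirror the lookups
# in the script above, every value is a placeholder (shapes are guesses).
import json

robot_config = {
    "result_dir": "./results/goal_reaching",  # root for best/, checkpoints/, controllers/, evolution_info/
    "seeds": [42],                            # one evolutionary run per seed
    "max_num_sims": 10000,                    # total budget of controller simulations
    "checkpoint_freq": 10,                    # generations between NEAT checkpoints
    "sim_time": 30,                           # seconds simulated per evaluation
    "noise_type": 1,                          # optional: defaults applied via robot_config.get(...)
    "noise_level": 0.035,
    "simulation_path": "./bin/simulation",    # external simulator and helper executables
    "tracker_path": "./bin/tracker",
    "direction_path": "./bin/direction",
    "target_dist_bearing": [[1.0, 0.0]],      # goal(s); exact format depends on RobotGR
    "robot": {
        "id": 0,
        "num_faces": 5,
        "num_modules": 4,                     # NEAT num_outputs is set to 2 * num_modules
        "robot_tests": 1,
        "modules_conf": [[0, 1], [1, 2]]      # module wiring, placeholder
    }
}

with open("./experiments_gr/robot.json", "w") as conf_file:
    json.dump(robot_config, conf_file, indent=4)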
sb3_main.py
Source: sb3_main.py
import argparse
from pathlib import Path
import pprint
import random
import yaml
import uuid
import numpy as np
from mlagents_envs.environment import UnityEnvironment
from mlagents.trainers.stats import StatsReporter, TensorboardWriter
from mlagents.trainers.agent_processor import AgentSideChannel
from mlagents_envs.side_channel.stats_side_channel import (
    StatsAggregationMethod)
from mlagents_envs.side_channel.engine_configuration_channel import \
    EngineConfigurationChannel
from gym_unity.envs import UnityToGymWrapper
import torch
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.utils import configure_logger
from stable_baselines3 import PPO as PPOSB
# Added for the side channel
from mlagents_envs.side_channel.side_channel import (
    SideChannel,
    IncomingMessage
)

try:
    from mpi4py import MPI
except ImportError:
    MPI = None


class SB3StatsRecorder(SideChannel):
    """
    Side channel that receives (string, float) pairs from the environment, so
    that they can eventually be passed to a StatsReporter.
    """

    def __init__(self,
                 stats_reporter: StatsReporter,
                 summary_freq: int = 2000) -> None:
        # >>> uuid.uuid5(uuid.NAMESPACE_URL, "com.unity.ml-agents/StatsSideChannel")
        # UUID('a1d8f7b7-cec8-50f9-b78b-d3e165a78520')
        super().__init__(uuid.UUID("a1d8f7b7-cec8-50f9-b78b-d3e165a78520"))
        self._stats_reporter = stats_reporter
        self.summary_freq = summary_freq
        self.env_step = 0
        self.train_step = 0

    def on_message_received(self, msg: IncomingMessage) -> None:
        """
        Receive the message from the environment, and save it for later
        retrieval.
        :param msg:
        :return:
        """
        key = msg.read_string()
        val = msg.read_float32()
        agg_type = StatsAggregationMethod(msg.read_int32())

        if agg_type == StatsAggregationMethod.AVERAGE:
            self._stats_reporter.add_stat(key, val, agg_type)
        elif agg_type == StatsAggregationMethod.SUM:
            self._stats_reporter.add_stat(key, val, agg_type)
        elif agg_type == StatsAggregationMethod.HISTOGRAM:
            self._stats_reporter.add_stat(key, val, agg_type)
        elif agg_type == StatsAggregationMethod.MOST_RECENT:
            self._stats_reporter.set_stat(key, val)
        else:
            raise NotImplementedError(
                f"Unknown StatsAggregationMethod encountered. {agg_type}")

        if "task" in key or "Task" in key:  # Another hack, otherwise the mostRecent data will be lost
            self._stats_reporter.write_stats(self.train_step)
        if self.train_step % self.summary_freq == 0:
            self._stats_reporter.write_stats(self.train_step)
        if 'time_step' in key:  # nice hack to sync!
            self.train_step = val
        self.env_step = self.env_step + 1


def make_unity_env(unity_env_filename, task_name,
                   seed, base_port, env_args, no_graphics,
                   time_scale=20, summary_freq=2000, worker_id=0,
                   results_dir=None):
    # Side channels
    if results_dir is not None:
        tw = TensorboardWriter(results_dir, clear_past_data=True,
                               hidden_keys=["Is Training", "Step"])
        StatsReporter.add_writer(tw)
    stats_reporter = StatsReporter(task_name)
    stats_channel = SB3StatsRecorder(stats_reporter, summary_freq)
    engine_channel = EngineConfigurationChannel()
    engine_channel.set_configuration_parameters(time_scale=time_scale)
    agent_channel = AgentSideChannel()  # dummy just to silence the data
    side_channels = [engine_channel, stats_channel, agent_channel]

    unity_env = UnityEnvironment(file_name=unity_env_filename,
                                 seed=seed,
                                 no_graphics=no_graphics,
                                 side_channels=side_channels,
                                 additional_args=env_args,
                                 base_port=base_port,
                                 worker_id=worker_id)
    env = UnityToGymWrapper(unity_env)
    return env


def run_sb3(args):
    # set all the seeds
    torch.manual_seed(args.seed)
    np.random.seed(args.seed)
    random.seed(args.seed)

    # Model and training
    with open(args.ml_config_path) as file:
        sb_config = yaml.load(file, Loader=yaml.FullLoader)
    sb_args = sb_config[args.task_name]
    summary_freq = 2000
    time_scale = 20

    # Paths
    log_path = args.results_dir / args.run_id
    stats_path = log_path / "stats_reporter"
    gym_stats_path = log_path / "gym_training"

    # Create envs
    env_args = []
    env = make_unity_env(unity_env_filename=str(args.env),
                         task_name=args.task_name + "_train",
                         seed=args.seed,
                         base_port=args.initial_port,
                         env_args=env_args,
                         no_graphics=args.no_graphics,
                         time_scale=time_scale,
                         summary_freq=summary_freq,
                         results_dir=stats_path)
    test_env = make_unity_env(unity_env_filename=str(args.env),
                              task_name=args.task_name + "_test",
                              seed=args.seed,
                              base_port=args.initial_port + 1,
                              env_args=env_args,
                              no_graphics=args.no_graphics,
                              time_scale=time_scale,
                              summary_freq=summary_freq)

    model = PPOSB('MlpPolicy', env, **sb_args, verbose=1,
                  tensorboard_log=str(gym_stats_path))
    eval_callback = EvalCallback(test_env,
                                 best_model_save_path=log_path,
                                 log_path=gym_stats_path,
                                 n_eval_episodes=1,
                                 eval_freq=10000,
                                 deterministic=True, render=False)
    new_logger = configure_logger(tensorboard_log=gym_stats_path)
    model.set_logger(new_logger)
    model.learn(total_timesteps=args.n_timesteps, callback=eval_callback)
    env.close()


def main():
    parser = argparse.ArgumentParser(
        description="Train agents for FAST")
    # Common arguments between ml-agents and stable baselines 3
    parser.add_argument("--env",
                        type=Path,
                        help="Path in which unity env is located")
    parser.add_argument("-n", "--run_id",
                        type=Path,
                        help="directory name for results to be saved")
    parser.add_argument("-p", "--initial_port",
                        type=int,
                        default=5005,
                        help="From this number of port + # of experiments to "
                             "run_mlagents should be a free port")
    parser.add_argument("--ml_config_path",
                        type=Path,
                        default=Path("configs/config_sb3.yaml"),
                        help="Path to the ml-agents or sb3 config. "
                             "Ex: 'configs/fast_ppo_config_linear_lr.yaml'")
    parser.add_argument("--fast_config_path",
                        type=Path,
                        default=None,
                        help="Path to the FAST config located in "
                             "StreamingAssets. "
                             "Ex: 'global_custom_config.yaml'")
    parser.add_argument("--results_dir",
                        type=Path,
                        help="Path in which results of training are/will be "
                             "located")
    parser.add_argument("--seed",
                        type=int,
                        default=13,
                        help="Random seed to use. If None, different seeds "
                             "for each experiment will be used")
    parser.add_argument("--resume",
                        action='store_true',
                        help="Resume training or inference")
    parser.add_argument("--inference",
                        action='store_true',
                        help="Run inference")
    # Framework specific arguments
    parser.add_argument('--task_name', default='ImageCentering')
    parser.add_argument('--no_graphics', default=True, required=False,
                        action='store_false', help='no graphics')
    parser.add_argument('--n_envs', default=1, type=int,
                        help='number of parallel envs')
    parser.add_argument('--n_timesteps', default=30000000, type=int,
                        required=False, help='total number of steps')
    args = parser.parse_args()

    if args.n_envs > 1:
        raise NotImplementedError("Parallelization is not implemented")

    print(" Experiment parameters: ")
    print("-" * 100)
    pprint.pprint(vars(args), indent=5)
    print("-" * 100)

    run_sb3(args)


if __name__ == '__main__':
...
runtime.py
Source: runtime.py
import multiprocessing as mp
import multiprocessing.pool
import threading
from collections import defaultdict
from itertools import chain
from queue import SimpleQueue
from selectors import EVENT_READ, DefaultSelector
from statistics import mean
from time import time
from typing import Dict, NamedTuple, Optional

import torch
from prefetch_generator import BackgroundGenerator

from src.moe.server.expert_backend import ExpertBackend
from src.utils import get_logger

logger = get_logger(__name__)


class Runtime(threading.Thread):
    """
    A group of processes that processes incoming requests for multiple experts on a shared device.
    Runtime is usually created and managed by Server, humans need not apply.
    For debugging, you can start runtime manually with .start() or .run()

    >>> expert_backends = {'expert_name': ExpertBackend(**kwargs)}
    >>> runtime = Runtime(expert_backends)
    >>> runtime.start()  # start runtime in background thread. To start in current thread, use runtime.run()
    >>> runtime.ready.wait()  # await for runtime to load all experts on device and create request pools
    >>> future = runtime.expert_backends['expert_name'].forward_pool.submit_task(*expert_inputs)
    >>> print("Returned:", future.result())
    >>> runtime.shutdown()

    :param expert_backends: a dict [expert uid -> ExpertBackend]
    :param prefetch_batches: form up to this many batches in advance
    :param sender_threads: dispatches outputs from finished batches using this many asynchronous threads
    :param device: if specified, moves all experts and data to this device via .to(device=device).
      If you want to manually specify devices for each expert (in their forward pass), leave device=None (default)
    :param stats_report_interval: interval to collect and log statistics about runtime performance
    """

    SHUTDOWN_TRIGGER = "RUNTIME SHUTDOWN TRIGGERED"

    def __init__(
        self,
        expert_backends: Dict[str, ExpertBackend],
        prefetch_batches=64,
        sender_threads: int = 1,
        device: torch.device = None,
        stats_report_interval: Optional[int] = None,
    ):
        super().__init__()
        self.expert_backends = expert_backends
        self.pools = tuple(chain(*(expert.get_pools() for expert in expert_backends.values())))
        self.device, self.prefetch_batches, self.sender_threads = device, prefetch_batches, sender_threads
        self.shutdown_recv, self.shutdown_send = mp.Pipe(duplex=False)
        self.shutdown_trigger = mp.Event()
        self.ready = mp.Event()  # event is set iff server is currently running and ready to accept batches

        self.stats_report_interval = stats_report_interval
        if self.stats_report_interval is not None:
            self.stats_reporter = StatsReporter(self.stats_report_interval)

    def run(self):
        for pool in self.pools:
            if not pool.is_alive():
                pool.start()

        with mp.pool.ThreadPool(self.sender_threads) as output_sender_pool:
            try:
                self.ready.set()
                if self.stats_report_interval is not None:
                    self.stats_reporter.start()
                logger.info("Started")

                for pool, batch_index, batch in BackgroundGenerator(
                    self.iterate_minibatches_from_pools(), self.prefetch_batches
                ):
                    logger.debug(f"Processing batch {batch_index} from pool {pool.name}")

                    start = time()
                    outputs = pool.process_func(*batch)
                    batch_processing_time = time() - start

                    batch_size = outputs[0].size(0)
                    logger.debug(f"Pool {pool.name}: batch {batch_index} processed, size {batch_size}")

                    if self.stats_report_interval is not None:
                        self.stats_reporter.report_stats(pool.name, batch_size, batch_processing_time)

                    output_sender_pool.apply_async(pool.send_outputs_from_runtime, args=[batch_index, outputs])
            finally:
                if not self.shutdown_trigger.is_set():
                    self.shutdown()

    def shutdown(self):
        """Gracefully terminate a running runtime."""
        logger.info("Shutting down")
        self.ready.clear()

        if self.stats_report_interval is not None:
            self.stats_reporter.stop.set()
            self.stats_reporter.join()

        logger.debug("Terminating pools")
        for pool in self.pools:
            if pool.is_alive():
                pool.terminate()
                pool.join()
        logger.debug("Pools terminated")

        # trigger background thread to shutdown
        self.shutdown_send.send(self.SHUTDOWN_TRIGGER)
        self.shutdown_trigger.set()

    def iterate_minibatches_from_pools(self, timeout=None):
        """
        Chooses pool according to priority, then copies exposed batch and frees the buffer
        """
        with DefaultSelector() as selector:
            for pool in self.pools:
                selector.register(pool.batch_receiver, EVENT_READ, pool)
            selector.register(self.shutdown_recv, EVENT_READ, self.SHUTDOWN_TRIGGER)

            while True:
                # wait until at least one batch_receiver becomes available
                logger.debug("Waiting for inputs from task pools")
                ready_fds = selector.select()
                ready_objects = {key.data for (key, events) in ready_fds}
                if self.SHUTDOWN_TRIGGER in ready_objects:
                    break  # someone asked us to shutdown, break from the loop

                logger.debug("Choosing the pool with highest priority")
                pool = max(ready_objects, key=lambda pool: pool.priority)

                logger.debug(f"Loading batch from {pool.name}")
                batch_index, batch_tensors = pool.load_batch_to_runtime(timeout, self.device)
                logger.debug(f"Loaded batch from {pool.name}")
                yield pool, batch_index, batch_tensors


BatchStats = NamedTuple("BatchStats", (("batch_size", int), ("processing_time", float)))


class StatsReporter(threading.Thread):
    def __init__(self, report_interval: int):
        super().__init__()
        self.report_interval = report_interval
        self.stop = threading.Event()
        self.stats_queue = SimpleQueue()

    def run(self):
        while not self.stop.wait(self.report_interval):
            pool_batch_stats = defaultdict(list)
            while not self.stats_queue.empty():
                pool_uid, batch_stats = self.stats_queue.get()
                pool_batch_stats[pool_uid].append(batch_stats)

            total_processed_batches = sum(len(pool_stats) for pool_stats in pool_batch_stats.values())
            logger.info(f"Processed {total_processed_batches} batches in last {self.report_interval} seconds:")
            for pool_uid, pool_stats in pool_batch_stats.items():
                total_batches = len(pool_stats)
                total_examples = sum(batch_stats.batch_size for batch_stats in pool_stats)
                avg_batch_size = mean(batch_stats.batch_size for batch_stats in pool_stats)
                total_time = sum(batch_stats.processing_time for batch_stats in pool_stats)
                batches_to_time = total_batches / total_time
                batch_performance = f"{batches_to_time:.2f} " + ("batches/s" if batches_to_time > 1 else "s/batch")

                examples_to_time = total_examples / total_time
                example_performance = f"{examples_to_time:.2f} " + (
                    "examples/s" if examples_to_time > 1 else "s/example"
                )

                logger.info(
                    f"{pool_uid}: "
                    f"{total_batches} batches ({batch_performance}), "
                    f"{total_examples} examples ({example_performance}), "
                    f"avg batch size {avg_batch_size:.2f}"
                )

    def report_stats(self, pool_uid, batch_size, processing_time):
        batch_stats = BatchStats(batch_size, processing_time)
...
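iterate_minibatches_from_pools multiplexes over the pools' batch_receiver pipe ends with a selector, wakes up as soon as any pool (or the shutdown pipe) has data, and then serves the ready pool with the highest priority. The standalone sketch below illustrates that same wake-on-any-pipe, pick-by-priority pattern using only the standard library; it is not part of runtime.py, the pool names and priorities are made up, and it assumes a POSIX platform where pipe read-ends can be registered with a selector (as the code above also relies on).

# Self-contained illustration of the selector pattern used by Runtime:
# register several pipe read-ends, block until any has data, then serve
# the ready one with the highest priority. Names/priorities are made up.
import multiprocessing as mp
from selectors import DefaultSelector, EVENT_READ

def demo_selector_loop():
    recv_a, send_a = mp.Pipe(duplex=False)
    recv_b, send_b = mp.Pipe(duplex=False)
    send_a.send("batch from pool_a")   # pretend both pools exposed a batch
    send_b.send("batch from pool_b")

    with DefaultSelector() as selector:
        # the data attached to each key plays the role of (pool.name, pool.priority)
        selector.register(recv_a, EVENT_READ, data=("pool_a", 0))
        selector.register(recv_b, EVENT_READ, data=("pool_b", 10))

        ready = selector.select(timeout=1.0)                          # wait for any receiver
        key, _events = max(ready, key=lambda item: item[0].data[1])   # highest priority wins
        name, _priority = key.data
        print(f"serving {name}: {key.fileobj.recv()}")

if __name__ == "__main__":
    demo_selector_loop()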