How to use the worker_count method in Locust

Best Python code snippets using locust
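Before the snippets, a quick orientation: in recent Locust 2.x releases the distributed master runner (locust.runners.MasterRunner) exposes a worker_count property reporting how many workers are connected. A minimal sketch of reading it from a locustfile follows; the host and user class are illustrative placeholders.

```python
# Minimal sketch, assuming Locust 2.x where MasterRunner exposes a
# worker_count property; the host and user class are placeholders.
from locust import HttpUser, task, events
from locust.runners import MasterRunner

@events.test_start.add_listener
def log_worker_count(environment, **kwargs):
    # Only the master node tracks connected workers in distributed mode.
    if isinstance(environment.runner, MasterRunner):
        print("Connected workers:", environment.runner.worker_count)

class WebsiteUser(HttpUser):
    host = "https://example.com"  # hypothetical target

    @task
    def index(self):
        self.client.get("/")
```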

find_async_variance.py

Source: find_async_variance.py (GitHub)



```python
import csv
import json
import argparse
import os.path
import numpy as np
import matplotlib.pylab as plt
from parse_step_times import *

aggregation_start_dict = {'inception-v3': ['inception_v3/conv0/BatchNorm/moments/Squeeze'],
                          'resnet-200': ['resnet_v2_200/block4/unit_2/bottleneck_v2/conv1/BatchNorm/Const_2'],
                          'resnet-101': ['resnet_v2_101/block4/unit_3/bottleneck_v2/conv2/BatchNorm/FusedBatchNorm'],
                          'vgg16': ['vgg/conv0/BatchNorm/moments/Squeeze']
                          }
aggregation_end_dict = {'inception-v3': ['gradients/AddN_304'],
                        'resnet-200': ['gradients/AddN_269'],
                        'resnet-101': ['gradients/AddN_137'],
                        'vgg16': ['gradients/AddN_41'],
                        }
# Worker indicates to parameter servers that it is ready to receive the
# updated parameters. Same for all models.
trigger_variables = 'sync_rep_local_step/read'
# First step in the back propagation step
# Actually use the last squeeze from the forward propagation step
first_back_prop = {'inception-v3': ['gradients/inception_v3/mixed_8x8x2048b/branch_pool/Conv/BatchNorm/batchnorm/sub_grad/tuple/control_dependency'],
                   'resnet-200': ['gradients/resnet_v2_200/logits/BiasAdd_grad/tuple/control_dependency_1'],
                   'resnet-101': ['gradients/resnet_v2_101/logits/BiasAdd_grad/tuple/control_dependency_1'],
                   'vgg16': ['gradients/vgg/logits/xw_plus_b_grad/tuple/control_dependency_1', 'gradients/AddN'],
                   }
sync_local_step_time = 'sync_rep_local_step/read'


def plot_variances(all_variance_median, all_variance_90, all_variance_10, model_name):
    all_lines = []
    for edge_key in all_variance_median:
        lists = sorted(all_variance_median[edge_key].items())
        x, y = zip(*lists)
        line = plt.errorbar(x, y, yerr=[all_variance_10[edge_key], all_variance_90[edge_key]], label=edge_key)
        all_lines.append(line)
    plt.title('Send time variance between workers for {}'.format(model_name))
    plt.legend(handles=all_lines)
    plt.xlabel('Number of workers')
    plt.ylabel('Std deviation of event (msec)')
    plt.show()


# Plots a dict of worker -> a list of outputs
def plot_variable_distributions(worker_plot, model_name, title):
    base_dir = 'async_analysis/{}/'.format(model_name)
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    num_rows = 2
    num_columns = (len(worker_plot.keys()) / 2) + (len(worker_plot.keys()) % 2)
    fig, ax = plt.subplots(num_rows, num_columns)
    row_counter = 0
    col_counter = 0
    for num_workers in worker_plot:
        ax[row_counter, col_counter].hist(worker_plot[num_workers])
        ax[row_counter, col_counter].set_title('{} workers'.format(num_workers))
        col_counter += 1
        if col_counter == num_columns:
            row_counter += 1
            col_counter = 0
    plt.tight_layout()
    plt.suptitle('{} for {}'.format(title, model_name))
    plt.savefig('{}_{}_{}_ALL'.format(base_dir, model_name, title))
    plt.show()


# Plots a dict of worker -> a list of outputs
def plot_variable_distributions_by_worker(worker_plot, model_name, title):
    base_dir = 'async_analysis/{}/'.format(model_name)
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    '''
    worker_stack = {}
    for num_workers in worker_plot:
        worker_stack[num_workers] = []
        for worker_id in worker_plot[num_workers]:
            worker_plot = worker_plot[num_workers][worker_id]
            for result in worker_plot:
                worker_stack[num_workers][
    '''
    num_rows = 2
    num_columns = (len(worker_plot.keys()) / 2) + (len(worker_plot.keys()) % 2)
    fig, ax = plt.subplots(num_rows, num_columns)
    row_counter = 0
    col_counter = 0
    for num_workers in worker_plot:
        list_to_plot = []
        for wk_id in worker_plot[num_workers]:
            list_to_plot.append(worker_plot[num_workers][wk_id])
        ax[row_counter, col_counter].hist(list_to_plot, stacked=True)
        ax[row_counter, col_counter].set_title('{} workers'.format(num_workers))
        col_counter += 1
        if col_counter == num_columns:
            row_counter += 1
            col_counter = 0
    plt.tight_layout()
    plt.suptitle('{} for {}'.format(title, model_name))
    plt.savefig('{}_{}_{}_ALL'.format(base_dir, model_name, title))
    plt.show()


def get_mean_var_distr_by_worker(worker_result):
    for num_workers in worker_result:
        print 'Cluster of {}'.format(num_workers)
        for wk_id in worker_result[num_workers]:
            median = np.percentile(worker_result[num_workers][wk_id], 50)
            percentile90 = np.percentile(worker_result[num_workers][wk_id], 90)
            percentile10 = np.percentile(worker_result[num_workers][wk_id], 10)
            increase90 = 100 * (percentile90 - median) / float(median)
            increase10 = 100 * (percentile10 - median) / float(median)
            print 'Worker {}, median = {}, 10th = {}, 90th = {}'.format(wk_id, median, increase90, increase10)
        print '\n\n\n'


'''
Iterates through all the iterations in a particular run and determines the variance per variable per machine
'''
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--base_file')
    parser.add_argument('--num_ps')
    parser.add_argument('--num_wk')
    parser.add_argument('--model_name')
    args = parser.parse_args()
    # Collect the asynchrony per setup (i.e., varying the number of workers in the cluster)
    all_results_per_setup = {}
    variance_per_setup = {}
    for num_workers in range(int(args.num_wk) + 1):
        base_dir = args.base_file + args.model_name + '/{}_workers/'.format(num_workers)
        if os.path.isdir(base_dir) is False:
            continue

        wk_events = {}
        for wk_index in range(num_workers + 1):
            file_name = base_dir + 'gpu{}.txt'.format(wk_index)
            if os.path.isfile(file_name) is False:
                file_name_alternate = base_dir + 'wk{}.txt'.format(wk_index)
                if os.path.isfile(file_name_alternate) is False:
                    continue
                else:
                    file_name = file_name_alternate
            # Collect the events that happened by time
            wk_events[wk_index] = parse_result_file_iteration([file_name], args.model_name)
        # Collect all the events that are known to exist per iteration
        # Choose Worker 1
        for step_num in wk_events[1]:
            if step_num < 5:
                continue
            raw_edgename_list = wk_events[1][step_num]
        # Calculate the average variance from all the steps across all the workers
        # Also record the full logs for later consumption
        min_step_num = 16
        max_step_num = len(wk_events[1].keys()) - 1
        edgename_variance = {}
        edgename_raw = {}

        for edgename in raw_edgename_list:
            variance_list = []
            edgename_raw[edgename] = {}
            for step_num in range(min_step_num, max_step_num):
                time_occured = []
                edgename_raw[edgename][step_num] = {}
                for wk_id in wk_events:
                    # find the time of the event
                    if step_num in wk_events[wk_id]:
                        edgename_raw[edgename][step_num][wk_id] = wk_events[wk_id][step_num][edgename][0]
                        time_occured.append(wk_events[wk_id][step_num][edgename][0])
                    else:
                        continue
                variance_list.append(np.std(time_occured))
            edgename_variance[edgename] = variance_list
        variance_per_setup[num_workers] = edgename_variance
        all_results_per_setup[num_workers] = edgename_raw
    # Determine the edges for aggregation start and end
    aggregation_start = ''
    aggregation_end = ''
    try:
        aggregation_start_list = aggregation_start_dict[args.model_name]
        aggregation_end_list = aggregation_end_dict[args.model_name]
    except KeyError:
        print 'Invalid Model Name'
        exit()

    for candidate in aggregation_start_list:
        if candidate in variance_per_setup[2]:
            aggregation_start = candidate
            break
    for candidate in aggregation_end_list:
        if candidate in variance_per_setup[2]:
            aggregation_end = candidate
            break
    # Calculate the distribution of the time between
    # 1.) sync_local_step and first backpropagation step
    # 2.) first back_propagation step and last back_propagation step
    sync_local_step_time = 'sync_rep_local_step/read'
    first_back_prop_event = first_back_prop[args.model_name][0]
    start_to_first_grad = {}
    first_grad_to_last_grad = {}
    start_to_first_grad_wk = {}
    first_grad_to_last_grad_wk = {}
    for worker_count in all_results_per_setup:
        try:
            sync_local_step = all_results_per_setup[worker_count][sync_local_step_time]
            first_grad = all_results_per_setup[worker_count][first_back_prop_event]
            last_grad = all_results_per_setup[worker_count][aggregation_end]
        except KeyError:
            print worker_count
            continue
        start_to_first_grad[worker_count] = []
        first_grad_to_last_grad[worker_count] = []
        first_grad_to_last_grad_wk[worker_count] = {}
        start_to_first_grad_wk[worker_count] = {}

        for step_num in first_grad:
            for wk_id in first_grad[step_num]:
                start_to_first_grad[worker_count].append(first_grad[step_num][wk_id] - sync_local_step[step_num][wk_id])
                first_grad_to_last_grad[worker_count].append(last_grad[step_num][wk_id] - first_grad[step_num][wk_id])
                if wk_id not in first_grad_to_last_grad_wk[worker_count]:
                    start_to_first_grad_wk[worker_count][wk_id] = [first_grad[step_num][wk_id] - sync_local_step[step_num][wk_id]]
                    first_grad_to_last_grad_wk[worker_count][wk_id] = [last_grad[step_num][wk_id] - first_grad[step_num][wk_id]]
                else:
                    start_to_first_grad_wk[worker_count][wk_id].append(first_grad[step_num][wk_id] - sync_local_step[step_num][wk_id])
                    first_grad_to_last_grad_wk[worker_count][wk_id].append(last_grad[step_num][wk_id] - first_grad[step_num][wk_id])
    print 'Start to first grad data'
    get_mean_var_distr_by_worker(start_to_first_grad_wk)
    print '\n\n\n First to last grad'
    get_mean_var_distr_by_worker(first_grad_to_last_grad_wk)
    plot_variable_distributions_by_worker(start_to_first_grad_wk, args.model_name, 'Start to first grad stacked')
    plot_variable_distributions_by_worker(first_grad_to_last_grad_wk, args.model_name, 'First to Last grad stacked')
    plot_variable_distributions(start_to_first_grad, args.model_name, 'Start to first grad')
    plot_variable_distributions(first_grad_to_last_grad, args.model_name, 'First to last grad')
    print 'Exiting after last plot'
    exit()
    # Calculate the distribution of specific variables relative to a start
    start_latency = {}
    end_latency = {}
    end_start_diff = {}

    sync_local_step_time = 'sync_rep_local_step/read'
    for worker_count in all_results_per_setup:
        try:
            start_results = all_results_per_setup[worker_count][aggregation_start]
            end_results = all_results_per_setup[worker_count][aggregation_end]
            scale_down = all_results_per_setup[worker_count][sync_local_step_time]
        except KeyError:
            print worker_count
            continue
        start_latency[worker_count] = []
        end_latency[worker_count] = []
        end_start_diff[worker_count] = []
        for step_num in start_results:
            for wk_id in start_results[step_num]:
                start_latency[worker_count].append(start_results[step_num][wk_id] - scale_down[step_num][wk_id])
                end_latency[worker_count].append(end_results[step_num][wk_id] - scale_down[step_num][wk_id])
                end_start_diff[worker_count].append(end_results[step_num][wk_id] - start_results[step_num][wk_id])
    plot_variable_distributions(start_latency, args.model_name, 'Start latency')
    plot_variable_distributions(end_latency, args.model_name, 'End Latency')
    plot_variable_distributions(end_start_diff, args.model_name, 'Computation Latency')

    # Plot the coarse variances
    all_variance_median = {'agg_start': {}, 'agg_end': {}}
    all_variance_90 = {'agg_start': {}, 'agg_end': {}}
    all_variance_10 = {'agg_start': {}, 'agg_end': {}}
    for worker_count in variance_per_setup:
        try:
            all_variance_median['agg_start'][worker_count] = np.percentile(variance_per_setup[worker_count][aggregation_start], 50)
            all_variance_90['agg_start'][worker_count] = np.percentile(variance_per_setup[worker_count][aggregation_start], 90)
            all_variance_10['agg_start'][worker_count] = np.percentile(variance_per_setup[worker_count][aggregation_start], 10)
            all_variance_median['agg_end'][worker_count] = np.percentile(variance_per_setup[worker_count][aggregation_end], 50)
            all_variance_90['agg_end'][worker_count] = np.percentile(variance_per_setup[worker_count][aggregation_end], 90)
            all_variance_10['agg_end'][worker_count] = np.percentile(variance_per_setup[worker_count][aggregation_end], 10)
        except:
            continue
    plot_variances(all_variance_median, all_variance_90, all_variance_10, args.model_name)
    ...
```
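The heart of find_async_variance.py is the per-step spread computation: for each training step it collects the time at which the same graph event fired on every worker, keyed by worker count in the cluster, and takes the standard deviation across workers. A self-contained sketch of that pattern, with made-up worker IDs and timings, looks like this:

```python
# Illustrative sketch of the per-step variance pattern above; the
# worker IDs, step numbers, and timings are made up.
import numpy as np

# worker_id -> {step_num: event_time_in_msec}
wk_events = {
    0: {16: 120.1, 17: 118.4},
    1: {16: 124.9, 17: 119.0},
    2: {17: 121.7},  # worker 2 missed step 16, so it is skipped there
}

variance_list = []
for step_num in (16, 17):
    time_occured = [steps[step_num] for steps in wk_events.values()
                    if step_num in steps]
    variance_list.append(np.std(time_occured))

print(variance_list)  # per-step std deviation of the event across workers
```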


process.py

Source: process.py (GitHub)



```python
# -*- coding: utf-8 -*-
"""
Tests for the worker process _employer.
"""
# Future
from __future__ import absolute_import, division, print_function, \
    unicode_literals, with_statement
# Standard Library
from multiprocessing import Manager
# Third Party
import nose
from mock import Mock
from nose.tools.nontrivial import raises
# First Party
from metaopt.concurrent.employer.process import ProcessWorkerEmployer


class TestProcessWorkerEmployer(object):
    """
    Tests for the worker process _employer.
    """
    def __init__(self):
        self._queue_outcome = None
        self._queue_start = None
        self._queue_task = None
        self._employer = None

    def setup(self):
        """Nose will run this method before every test method."""
        manager = Manager()
        self._queue_task = manager.Queue()  # ignore error, this works
        self._queue_start = manager.Queue()  # ignore error, this works
        self._queue_outcome = manager.Queue()  # ignore error, this works
        self._status_db = Mock()
        self._status_db.get_running_call = Mock(return_value=None)
        self._employer = ProcessWorkerEmployer(queue_tasks=self._queue_task,
                                               queue_outcome=self._queue_outcome,
                                               queue_start=self._queue_start,
                                               status_db=self._status_db)

    def teardown(self):
        """Nose will run this method after every test method."""
        self._employer.abandon()

    def test_employ_once(self):
        """
        A worker process _employer can employ a worker process.
        """
        self._employer.employ()

    def test_employ_repeated(self):
        """
        A worker process _employer can employ multiple worker processes.
        """
        number_of_workers = 1
        for _ in range(1, 10):
            self._employer.employ(number_of_workers=number_of_workers)
            assert self._employer.worker_count == number_of_workers
            self._employer.abandon()

    @raises(IndexError)
    def test_employ_too_many(self):
        """
        A worker process employer can employ a limited number of workers.
        """
        # Try to employ a lot of workers.
        for number_of_workers in [_ ** _ for _ in range(0, 100)]:
            self._employer.employ(number_of_workers=number_of_workers)

    def test_employ_and_layoff_once(self):
        """
        A worker process employer can employ and layoff a worker process.
        """
        self._employer.employ()
        self._employer.abandon()

    def test_employ_layoff_twice(self):
        """
        A worker process employer can employ a worker process repeatedly.
        """
        # once
        self._employer.employ()
        self._employer.abandon()
        # and once more
        self._employer.employ()
        self._employer.abandon()

    def test_initialize_count(self):
        """A worker process _employer begins to count at 0."""
        assert self._employer.worker_count == 0

    def test_count_up(self):
        """A worker process _employer counts in increments of 1."""
        self._employer.employ(1)
        assert self._employer.worker_count == 1

    def test_count_up_down(self):
        """A worker process _employer counts up and down."""
        self._employer.employ(1)
        assert self._employer.worker_count == 1
        self._employer.abandon()
        assert self._employer.worker_count == 0

    def test_count_up_down_twice(self):
        """A worker process employer counts up and down repeatedly."""
        # once
        self._employer.employ(1)
        assert self._employer.worker_count == 1
        self._employer.abandon()
        assert self._employer.worker_count == 0
        # and once more
        self._employer.employ(1)
        assert self._employer.worker_count == 1
        self._employer.abandon()
        assert self._employer.worker_count == 0

    def test_is_borg(self):
        """There can only be one instance of a worker process _employer."""
        my_provider = ProcessWorkerEmployer(queue_tasks=self._queue_task,
                                            queue_outcome=self._queue_outcome,
                                            queue_start=self._queue_start,
                                            status_db=self._status_db)
        number_of_workers = 1
        # left
        self._employer.employ(number_of_workers=number_of_workers)
        assert self._employer.worker_count == number_of_workers
        assert self._employer.worker_count == my_provider.worker_count
        # right
        my_provider.employ(number_of_workers=number_of_workers)
        assert self._employer.worker_count == number_of_workers * 2
        assert self._employer.worker_count == my_provider.worker_count

    def test_employ_and_stop_multiple_workers(self):
        """Workers can be employed and laid off repeatedly."""
        worker_count = 2  # any number more than one proves the point
        # once
        self._employer.employ(number_of_workers=worker_count)
        self._employer.abandon()
        # and once more
        self._employer.employ(number_of_workers=worker_count)
        self._employer.abandon()


if __name__ == '__main__':
    ...
```
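The counting contract these tests pin down is straightforward: employ() raises worker_count by the requested number of workers, abandon() drops it back to zero, and the Borg-style sharing means two employer instances report the same count. A toy stand-in (illustrative only; metaopt's real ProcessWorkerEmployer manages actual worker processes) makes the counter semantics explicit:

```python
# Toy stand-in for the counter semantics asserted above; metaopt's
# real ProcessWorkerEmployer spawns and lays off actual processes.
class ToyEmployer(object):
    def __init__(self):
        self.worker_count = 0

    def employ(self, number_of_workers=1):
        self.worker_count += number_of_workers

    def abandon(self):
        self.worker_count = 0  # lay off every employed worker

employer = ToyEmployer()
employer.employ(1)
assert employer.worker_count == 1
employer.abandon()
assert employer.worker_count == 0
```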


gearman_worker.py

Source: gearman_worker.py (GitHub)



```python
from optparse import make_option
import os
import sys

from django.conf import settings
from django.core.management.base import NoArgsCommand
from django_gearman import GearmanWorker


class Command(NoArgsCommand):
    ALL_QUEUES = '*'
    help = "Start a Gearman worker serving all registered Gearman jobs"
    __doc__ = help
    option_list = NoArgsCommand.option_list + (
        make_option('-w', '--workers', action='store', dest='worker_count',
                    default='1', help='Number of workers to spawn.'),
        make_option('-q', '--queue', action='store', dest='queue',
                    default=ALL_QUEUES, help='Queue to register tasks from'),
    )
    children = []  # List of worker processes

    @staticmethod
    def get_gearman_enabled_modules():
        gm_modules = []
        for app in settings.INSTALLED_APPS:
            try:
                gm_modules.append(__import__("%s.gearman_jobs" % app))
            except ImportError:
                pass
        if not gm_modules:
            return None
        return gm_modules

    def handle_noargs(self, **options):
        queue = options["queue"]
        # find gearman modules
        gm_modules = Command.get_gearman_enabled_modules()
        if not gm_modules:
            self.stderr.write("No gearman modules found!\n")
            return
        # find all jobs
        jobs = []
        for gm_module in gm_modules:
            try:
                gm_module.gearman_job_list
            except AttributeError:
                continue
            if queue == Command.ALL_QUEUES:
                for _jobs in gm_module.gearman_job_list.itervalues():
                    jobs += _jobs
            else:
                jobs += gm_module.gearman_job_list.get(queue, [])
        if not jobs:
            self.stderr.write("No gearman jobs found!\n")
            return
        self.stdout.write("Available jobs:\n")
        for job in jobs:
            # determine right name to register function with
            self.stdout.write("* %s\n" % job.__name__)
        # spawn all workers and register all jobs
        try:
            worker_count = int(options['worker_count'])
            assert(worker_count > 0)
        except (ValueError, AssertionError):
            worker_count = 1
        self.spawn_workers(worker_count, jobs)
        # start working
        self.stdout.write("Starting to work... (press ^C to exit)\n")
        try:
            for child in self.children:
                os.waitpid(child, 0)
        except KeyboardInterrupt:
            sys.exit(0)

    def spawn_workers(self, worker_count, jobs):
        """
        Spawn as many workers as desired (at least 1).
        Accepts:
        - worker_count, positive int
        - jobs: list of gearman jobs
        """
        # no need for forking if there's only one worker
        if worker_count == 1:
            return self.work(jobs)
        self.stdout.write("Spawning %s worker(s)\n" % worker_count)
        # spawn children and make them work (hello, 19th century!)
        for i in range(worker_count):
            child = os.fork()
            if child:
                self.children.append(child)
                continue
            else:
                self.work(jobs)
                break

    def work(self, jobs):
        """Children only: register all jobs, start working."""
        worker = GearmanWorker()
        for job in jobs:
            worker.register_task(job.__name__, job)
        try:
            worker.work()
        except KeyboardInterrupt:
            ...
```
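Worth noting is how the command sanitizes the --workers option: any value that is not a positive integer silently falls back to a single worker. Extracted into a standalone helper (a hypothetical name, for illustration), the pattern is:

```python
# Hypothetical helper mirroring the worker_count validation above:
# anything that is not a positive integer falls back to one worker.
def parse_worker_count(raw_value):
    try:
        worker_count = int(raw_value)
        assert worker_count > 0
    except (ValueError, AssertionError):
        worker_count = 1
    return worker_count

assert parse_worker_count('4') == 4
assert parse_worker_count('0') == 1     # non-positive -> fallback
assert parse_worker_count('many') == 1  # non-numeric -> fallback
```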


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites to run your first automation test, through following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Locust automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

