Best Python code snippet using yandex-tank
load_plan.py
Source:load_plan.py
# NOTE: this excerpt opens in the middle of a class whose header and earlier
# methods are not visible here. Its visible tail is preserved verbatim below,
# commented out because it cannot stand alone without the enclosing class.
#
#         return self.duration
#
#     def __len__(self):
#         '''Return total ammo count'''
#         return int(self.duration / 1000 * self.rps)
#
#     def get_rps_list(self):
#         return [(int(self.rps), self.duration / 1000)]
#
#     def __repr__(self):
#         return 'const(%s, %s)' % (self.rps, self.duration / 1000)


class Line(object):
    '''Load plan with linear load'''

    def __init__(self, minrps, maxrps, duration):
        """
        :param minrps: rps at the start of the step
        :param maxrps: rps at the end of the step
        :param duration: step length, milliseconds
        """
        self.minrps = float(minrps)
        self.maxrps = float(maxrps)
        # Stored in seconds; get_duration() converts back to milliseconds.
        self.duration = duration / 1000.0
        # rps gained per second over the step.
        self.slope = (self.maxrps - self.minrps) / self.duration

    def ts(self, n):
        """
        :param n: number of charge
        :return: when to shoot nth charge, milliseconds
        """
        try:
            # solve_quadratic is defined outside this excerpt; presumably it
            # takes the (a, b, c) coefficients and returns both roots, of
            # which the second is the one used here -- TODO confirm.
            _, root2 = solve_quadratic(self.slope / 2.0, self.minrps, -n)
        except ZeroDivisionError:
            # Fallback: charges evenly spaced at minrps.
            root2 = float(n) / self.minrps
        return int(root2 * 1000)

    def __iter__(self):
        """
        :return: timestamps for each charge
        """
        return (self.ts(n) for n in range(0, len(self)))

    def rps_at(self, t):
        '''Return rps for second t (0 outside of the step)'''
        if 0 <= t <= self.duration:
            return self.minrps + \
                float(self.maxrps - self.minrps) * t / self.duration
        else:
            return 0

    def get_duration(self):
        '''Return load duration in milliseconds'''
        return int(self.duration * 1000)

    def __len__(self):
        '''Return total ammo count'''
        # Average rps multiplied by the duration in seconds.
        return int((self.maxrps + self.minrps) / 2.0 * self.duration)

    def get_float_rps_list(self):
        '''
        get list of constant load parts (we have no constant load at all,
        but tank will think so), one part per integer rps value

        :returns: list of tuples (rps, part duration); the duration is the
            step duration split evenly between the parts and then truncated
            to a whole number by int()
        '''
        int_rps = range(int(self.minrps), int(self.maxrps) + 1)
        step_duration = float(self.duration) / len(int_rps)
        return [(rps, int(step_duration)) for rps in int_rps]

    def get_rps_list(self):
        """
        get list of each second's rps
        :returns: list of tuples (rps, duration of corresponding rps in seconds)
        :rtype: list
        """
        seconds = range(0, int(self.duration) + 1)
        # proper_round is defined outside this excerpt. Consecutive seconds
        # that round to the same rps are collapsed into one (rps, count) pair;
        # groupby without a key groups equal adjacent values.
        rps_groups = groupby(proper_round(self.rps_at(t)) for t in seconds)
        return [(rps, len(list(rpl))) for rps, rpl in rps_groups]


class Composite(object):
    '''Load plan with multiple steps'''

    def __init__(self, steps):
        """
        :param steps: load plans to run one after another; each must be
            iterable and provide get_duration(), __len__() and get_rps_list()
        """
        self.steps = steps

    def __iter__(self):
        """
        :return: timestamps of every charge, each step shifted by the total
            duration of the steps before it
        """
        base = 0
        for step in self.steps:
            for ts in step:
                yield ts + base
            base += step.get_duration()

    def get_duration(self):
        '''Return total duration'''
        return sum(step.get_duration() for step in self.steps)

    def __len__(self):
        '''Return total ammo count'''
        return int(sum(len(step) for step in self.steps))

    def get_rps_list(self):
        '''Return the rps lists of all steps concatenated in step order'''
        return list(
            chain.from_iterable(step.get_rps_list() for step in self.steps))


class Stairway(Composite):
    '''Load plan made of constant-load steps going from minrps to maxrps'''

    def __init__(self, minrps, maxrps, increment, step_duration):
        """
        :param minrps: rps of the first step
        :param maxrps: rps of the last step
        :param increment: rps difference between neighbouring steps; its
            sign is flipped when maxrps is below minrps
        :param step_duration: duration handed to every Const step
        """
        if maxrps < minrps:
            increment = -increment
        n_steps = int((maxrps - minrps) / increment)
        steps = [
            Const(minrps + i * increment, step_duration)
            for i in range(0, n_steps + 1)
        ]
        # When the regular steps stop short of maxrps, finish with one more
        # step at exactly maxrps.
        if increment > 0:
            if (minrps + n_steps * increment) < maxrps:
                steps.append(Const(maxrps, step_duration))
        elif increment < 0:
            if (minrps + n_steps * increment) > maxrps:
                steps.append(Const(maxrps, step_duration))
        super(Stairway, self).__init__(steps)


class StepFactory(object):
    '''Builds load plan steps from their textual configuration'''

    # Patterns are matched against the text that follows the opening
    # parenthesis, e.g. '1, 10, 10m)'. The duration part wraps its repetition
    # in a non-capturing group so that a compound value such as '1h30m' is
    # captured whole; a repeated capturing group keeps only its last
    # repetition ('30m').
    _LINE_RE = re.compile(
        r'([0-9.]+),\s*([0-9.]+),\s*((?:[0-9.]+[dhms]?)+)\)')
    _CONST_RE = re.compile(
        r'([0-9.]+),\s*((?:[0-9.]+[dhms]?)+)\)')
    _STAIRWAY_RE = re.compile(
        r'([0-9.]+),\s*([0-9.]+),\s*([0-9.]+),\s*((?:[0-9.]+[dhms]?)+)\)')

    @staticmethod
    def line(params):
        '''Build a Line from "minrps, maxrps, duration)"'''
        minrps, maxrps, duration = StepFactory._LINE_RE.search(params).groups()
        return Line(float(minrps), float(maxrps), parse_duration(duration))

    @staticmethod
    def const(params):
        '''Build a Const from "rps, duration)"'''
        rps, duration = StepFactory._CONST_RE.search(params).groups()
        return Const(float(rps), parse_duration(duration))

    @staticmethod
    def stairway(params):
        '''Build a Stairway from "minrps, maxrps, increment, duration)"'''
        minrps, maxrps, increment, duration = \
            StepFactory._STAIRWAY_RE.search(params).groups()
        return Stairway(
            float(minrps),
            float(maxrps), float(increment), parse_duration(duration))

    @staticmethod
    def produce(step_config):
        """
        Build the load plan step described by step_config.

        :param step_config: string of the form "<type>(<params>)" where
            <type> is one of line, const, step
        :raises NotImplementedError: when <type> is not a known load type
        """
        _plans = {
            'line': StepFactory.line,
            'const': StepFactory.const,
            'step': StepFactory.stairway,
        }
        load_type, params = step_config.split('(')
        load_type = load_type.strip()
        if load_type in _plans:
            return _plans[load_type](params)
        else:
            raise NotImplementedError(
                'No such load type implemented: "%s"' % load_type)


def create(rps_schedule):
    """
    Create Load Plan as defined in schedule. Publish info about its duration.

    :param rps_schedule: sequence of step configuration strings; several
        entries are combined into a Composite, a single entry is used as is
    """
    if len(rps_schedule) > 1:
        lp = Composite(
            [StepFactory.produce(step_config) for step_config in rps_schedule])
    else:
        lp = StepFactory.produce(rps_schedule[0])
    info.status.publish('duration', lp.get_duration() / 1000)
    info.status.publish('steps', lp.get_rps_list())
    info.status.lp_len = len(lp)
    # ... (the excerpt is truncated here; any remaining statements of
    # create() are not visible)
monitoring.py
Source:monitoring.py
...8 await ws.prepare(req)9 global last_tick10 async for msg in ws:11 if msg.type == web.WSMsgType.text:12 rps_list = get_rps_list()13 await ws.send_json(data=rps_list)14 while not ws.closed:15 rps_list = get_rps_list()16 requests_num = 0 if not len(rps_list) else rps_list[len(rps_list) - 1]17 seconds_passed = time() - last_tick18 if seconds_passed < 1:19 await asyncio.sleep(1 - seconds_passed)20 last_tick = time()21 print('REQ', rps_list)22 try:23 await ws.send_str(str(requests_num))24 except ConnectionResetError:25 break26 elif msg.type == web.WSMsgType.close:27 break...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!