Best Python code snippet using yandex-tank
utils.py
Source:utils.py
""" Utility classes for phantom module """
# TODO: use separate answ log per benchmark
import copy
import logging
import multiprocessing
import string

from pkg_resources import resource_string

from yandextank.common.util import AddressWizard

from ...stepper import StepperWrapper
from ...stepper.util import parse_duration

logger = logging.getLogger(__name__)


def _load_template(resource_name):
    """ Return the text of a template resource packaged next to this module.

    ``resource_string`` hands back a byte string on Python 3, while
    ``string.Template.substitute`` can only work on text there, so anything
    that is not already a ``str`` is decoded as UTF-8 before being returned.
    """
    template_str = resource_string(__name__, resource_name)
    if not isinstance(template_str, str):
        template_str = template_str.decode('utf8')
    return template_str


class PhantomConfig:
    """ config file generator

    Reads the options of the ``phantom`` section (plus every section found
    by ``find_sections('phantom-')``), keeps one StreamConfig per section in
    ``self.streams`` and renders the final config file from
    ``config/phantom.conf.tpl``.
    """
    OPTION_PHOUT = "phout_file"
    SECTION = 'phantom'

    def __init__(self, core):
        """ Remember the core object; real values are set by read_config() """
        self.core = core
        self.streams = []
        # common
        # NOTE(review): presumably milliseconds (read_config warns about
        # "2 minutes" at 120000) -- confirm against parse_duration
        self.timeout = 11000
        self.answ_log = None
        self.answ_log_level = None
        self.phout_file = None
        self.stat_log = None
        self.phantom_log = None
        self.phantom_start_time = None
        self.phantom_modules_path = None
        self.threads = None
        self.additional_libs = None
        self.enum_ammo = False

    def get_option(self, opt_name, default=None):
        """ get option wrapper """
        return self.core.get_option(self.SECTION, opt_name, default)

    @staticmethod
    def get_available_options():
        """ List option names known to this class and to StreamConfig """
        opts = [
            "threads",
            "phantom_modules_path",
            "additional_libs",
            "writelog",
            "enum_ammo",
            "timeout",
        ]
        opts += StreamConfig.get_available_options()
        return opts

    def read_config(self):
        """ Read phantom tool specific options

        Besides reading options this creates the answ/phout/stat/phantom log
        files through ``core.mkstemp``, registers them as artifacts, builds
        one StreamConfig per section and lets every stream read its own
        options.
        """
        self.threads = self.get_option(
            "threads", str(multiprocessing.cpu_count() // 2 + 1))
        self.phantom_modules_path = self.get_option(
            "phantom_modules_path", "/usr/lib/phantom")
        self.additional_libs = self.get_option("additional_libs", "")

        # "writelog" accepts the shortcuts '0' and '1' for 'none' and 'all'
        self.answ_log_level = self.get_option("writelog", "none")
        if self.answ_log_level == '0':
            self.answ_log_level = 'none'
        elif self.answ_log_level == '1':
            self.answ_log_level = 'all'

        self.timeout = parse_duration(self.get_option("timeout", "11s"))
        if self.timeout > 120000:
            logger.warning(
                "You've set timeout over 2 minutes."
                " Are you a functional tester?")

        self.answ_log = self.core.mkstemp(".log", "answ_")
        self.core.add_artifact_file(self.answ_log)

        # phout may be given explicitly; otherwise generate a file and
        # store its name back into the config
        self.phout_file = self.core.get_option(
            self.SECTION, self.OPTION_PHOUT, '')
        if not self.phout_file:
            self.phout_file = self.core.mkstemp(".log", "phout_")
            self.core.set_option(
                self.SECTION, self.OPTION_PHOUT, self.phout_file)
        self.core.add_artifact_file(self.phout_file)

        self.stat_log = self.core.mkstemp(".log", "phantom_stat_")
        self.core.add_artifact_file(self.stat_log)
        self.phantom_log = self.core.mkstemp(".log", "phantom_")
        self.core.add_artifact_file(self.phantom_log)

        # the main section comes first, then every additional section;
        # the sequence number is the stream's index in self.streams
        main_stream = StreamConfig(
            self.core,
            len(self.streams), self.phout_file, self.answ_log,
            self.answ_log_level, self.timeout, self.SECTION)
        self.streams.append(main_stream)

        for section in self.core.config.find_sections(self.SECTION + '-'):
            self.streams.append(
                StreamConfig(
                    self.core,
                    len(self.streams), self.phout_file, self.answ_log,
                    self.answ_log_level, self.timeout, section))

        for stream in self.streams:
            stream.read_config()

        # any ssl-enabled stream needs the ssl libs listed in the config
        if any(stream.ssl for stream in self.streams):
            self.additional_libs += ' ssl io_benchmark_method_stream_transport_ssl'

    def compose_config(self):
        """ Generate phantom tool run config

        Renders every stream's benchmark block, substitutes everything into
        ``config/phantom.conf.tpl`` and writes the result to a new artifact
        file.

        Returns the name of the generated config file.
        """
        benchmark_blocks = []
        stat_benchmarks = []
        for stream in self.streams:
            benchmark_blocks.append(stream.compose_config())
            # only additional (non-main) sections are listed here
            if stream.section != self.SECTION:
                stat_benchmarks.append(
                    " " + "benchmark_io%s" % stream.sequence_no)

        kwargs = {
            'threads': self.threads,
            'phantom_log': self.phantom_log,
            'stat_log': self.stat_log,
            'benchmarks_block': ''.join(benchmark_blocks),
            'stat_benchmarks': ''.join(stat_benchmarks),
            'additional_libs': self.additional_libs,
            'phantom_modules_path': self.phantom_modules_path,
        }

        filename = self.core.mkstemp(".conf", "phantom_")
        self.core.add_artifact_file(filename)
        logger.debug("Generating phantom config: %s", filename)

        tpl = string.Template(_load_template("config/phantom.conf.tpl"))
        config = tpl.substitute(kwargs)

        # context manager closes the handle even when write() raises
        with open(filename, 'w') as handle:
            handle.write(config)
        return filename

    def set_timeout(self, timeout):
        """ pass timeout to all streams """
        for stream in self.streams:
            stream.timeout = timeout

    def get_info(self):
        """ get merged info about phantom conf

        Returns a shallow copy of the first stream whose load-related
        attributes are replaced by values merged over all streams.

        Raises ValueError when the total ammo count of all streams is zero.
        """
        result = copy.copy(self.streams[0])
        result.stat_log = self.stat_log
        result.steps = []
        result.ammo_file = ''
        result.rps_schedule = None
        result.ammo_count = 0
        result.duration = 0
        result.instances = 0
        result.loadscheme = []
        result.loop_count = 0

        for stream in self.streams:
            sec_no = 0
            logger.debug("Steps: %s", stream.stepper_wrapper.steps)
            # each item is expanded item[1] times; item[0] is summed into
            # the entry at the same position over all streams
            for item in stream.stepper_wrapper.steps:
                for _ in range(0, item[1]):
                    if len(result.steps) > sec_no:
                        result.steps[sec_no][0] += item[0]
                    else:
                        result.steps.append([item[0], 1])
                    sec_no += 1

            # a schedule taken from an earlier stream is reset to [] as soon
            # as another stream is merged in
            if result.rps_schedule:
                result.rps_schedule = []
            else:
                result.rps_schedule = stream.stepper_wrapper.loadscheme

            # FIXME: add formatted load scheme for server:
            # <step_size,step_type,first_rps,last_rps,original_step_params>
            # as a string
            result.loadscheme = ''

            # same reset logic as for rps_schedule above
            if result.loop_count:
                result.loop_count = u'0'
            else:
                result.loop_count = stream.stepper_wrapper.loop_count

            result.ammo_file += stream.stepper_wrapper.ammo_file + ' '
            result.ammo_count += stream.stepper_wrapper.ammo_count
            result.duration = max(
                result.duration, stream.stepper_wrapper.duration)
            result.instances += stream.instances

        if not result.ammo_count:
            raise ValueError("Total ammo count cannot be zero")
        return result


class StreamConfig:
    """ each test stream's config

    Holds the options of one config section and renders them into one
    benchmark block of the phantom config.
    """
    OPTION_INSTANCES_LIMIT = 'instances'

    def __init__(
            self, core, sequence, phout, answ, answ_level, timeout, section):
        """ Store shared settings; per-section values come from read_config() """
        self.core = core
        self.address_wizard = AddressWizard()
        self.sequence_no = sequence
        self.section = section
        self.stepper_wrapper = StepperWrapper(self.core, self.section)
        self.phout_file = phout
        self.answ_log = answ
        self.answ_log_level = answ_level
        self.timeout = timeout
        # per benchmark
        self.instances = None
        self.ipv6 = None
        self.ssl = None
        self.address = None
        self.port = None
        self.tank_type = None
        self.stpd = None
        self.gatling = None
        self.phantom_http_line = None
        self.phantom_http_field_num = None
        self.phantom_http_field = None
        self.phantom_http_entity = None
        self.resolved_ip = None
        self.method_prefix = None
        self.source_log_prefix = None
        self.method_options = None
        self.client_cipher_suites = None
        self.client_certificate = None
        self.client_key = None

    def get_option(self, option_ammofile, default=None):
        """ get option wrapper """
        return self.core.get_option(self.section, option_ammofile, default)

    @staticmethod
    def get_available_options():
        """ List option names known to a stream, including StepperWrapper's """
        opts = [
            "ssl", "tank_type", 'gatling_ip', "method_prefix",
            "source_log_prefix"
        ]
        opts += [
            "phantom_http_line", "phantom_http_field_num", "phantom_http_field",
            "phantom_http_entity"
        ]
        opts += ['address', "port", StreamConfig.OPTION_INSTANCES_LIMIT]
        opts += StepperWrapper.get_available_options()
        opts += ["connection_test"]
        return opts

    def read_config(self):
        """ reads config

        Loads this section's options, resolves the target address through
        the AddressWizard and lets the stepper wrapper read its own options.
        """
        # multi-options
        self.ssl = int(self.get_option("ssl", '0'))
        self.tank_type = self.get_option("tank_type", 'http')
        # TODO: refactor. Maybe we should decide how to interact with
        # StepperWrapper here.
        self.instances = int(
            self.get_option(self.OPTION_INSTANCES_LIMIT, '1000'))
        # a multi-line gatling_ip value is flattened into one line
        self.gatling = ' '.join(self.get_option('gatling_ip', '').split("\n"))
        self.method_prefix = self.get_option("method_prefix", 'method_stream')
        self.method_options = self.get_option("method_options", '')
        self.source_log_prefix = self.get_option("source_log_prefix", '')

        self.phantom_http_line = self.get_option("phantom_http_line", "")
        self.phantom_http_field_num = self.get_option(
            "phantom_http_field_num", "")
        self.phantom_http_field = self.get_option("phantom_http_field", "")
        self.phantom_http_entity = self.get_option("phantom_http_entity", "")

        self.address = self.get_option('address', '127.0.0.1')
        do_test_connect = int(self.get_option("connection_test", "1")) > 0
        explicit_port = self.get_option('port', '')
        # resolve() also returns the address, which replaces the configured
        # value, so the log line below shows the returned one
        self.ipv6, self.resolved_ip, self.port, self.address = \
            self.address_wizard.resolve(
                self.address, do_test_connect, explicit_port)
        logger.info(
            "Resolved %s into %s:%s", self.address, self.resolved_ip, self.port)

        self.client_cipher_suites = self.get_option("client_cipher_suites", "")
        self.client_certificate = self.get_option("client_certificate", "")
        self.client_key = self.get_option("client_key", "")
        self.stepper_wrapper.read_config()

    def compose_config(self):
        """ compose benchmark block

        Prepares the stepper, then substitutes this stream's settings into
        ``phantom_benchmark_main.tpl`` (main section) or
        ``phantom_benchmark_additional.tpl`` (any other section).

        Returns the rendered block as a string.
        Raises RuntimeError when the stepper produced no STPD file.
        """
        # step file
        self.stepper_wrapper.prepare_stepper()
        self.stpd = self.stepper_wrapper.stpd
        # an instances value from the stepper overrides the configured one
        if self.stepper_wrapper.instances:
            self.instances = self.stepper_wrapper.instances

        if not self.stpd:
            raise RuntimeError("Cannot proceed with no STPD file")

        kwargs = {}
        kwargs['sequence_no'] = self.sequence_no

        if self.ssl:
            _auth_section = ''
            _ciphers = ''
            ssl_template = "transport_t ssl_transport = transport_ssl_t {\n" \
                           " timeout = 1s\n" \
                           " %s\n" \
                           " %s}\n" \
                           " transport = ssl_transport"

            if self.client_certificate or self.client_key:
                _auth_section = 'auth_t def_auth = auth_t { key = "%s" cert = "%s"} auth = def_auth' \
                                % (self.client_key, self.client_certificate)
            if self.client_cipher_suites:
                _ciphers = 'ciphers = "%s"' % self.client_cipher_suites
            kwargs['ssl_transport'] = ssl_template % (_auth_section, _ciphers)
        else:
            kwargs['ssl_transport'] = ""

        # the method type name is the prefix plus an IP-version suffix
        if self.ipv6:
            kwargs['method_stream'] = self.method_prefix + "_ipv6_t"
        else:
            kwargs['method_stream'] = self.method_prefix + "_ipv4_t"

        kwargs['phout'] = self.phout_file
        kwargs['answ_log'] = self.answ_log
        kwargs['answ_log_level'] = self.answ_log_level
        # level 'none' puts a "# " prefix into the template placeholder
        kwargs['comment_answ'] = "# " if self.answ_log_level == 'none' else ''
        kwargs['stpd'] = self.stpd
        kwargs['source_log_prefix'] = self.source_log_prefix
        kwargs['method_options'] = self.method_options

        if self.tank_type:
            if self.tank_type == 'http':
                kwargs['proto'] = "proto=http_proto%s" % self.sequence_no
            else:
                kwargs['proto'] = "proto=none_proto"
            kwargs['comment_proto'] = ""
        else:
            kwargs['proto'] = ""
            kwargs['comment_proto'] = "#"

        if self.gatling:
            kwargs['bind'] = 'bind={ ' + self.gatling + ' }'
        else:
            kwargs['bind'] = ''

        kwargs['ip'] = self.resolved_ip
        kwargs['port'] = self.port
        kwargs['timeout'] = self.timeout
        kwargs['instances'] = self.instances

        # only the limits that were actually configured are emitted
        tune = ''
        if self.phantom_http_entity:
            tune += "entity = " + self.phantom_http_entity + "\n"
        if self.phantom_http_field:
            tune += "field = " + self.phantom_http_field + "\n"
        if self.phantom_http_field_num:
            tune += "field_num = " + self.phantom_http_field_num + "\n"
        if self.phantom_http_line:
            tune += "line = " + self.phantom_http_line + "\n"
        if tune:
            kwargs['reply_limits'] = 'reply_limits = {\n' + tune + "}"
        else:
            kwargs['reply_limits'] = ''

        if self.section == PhantomConfig.SECTION:
            fname = 'phantom_benchmark_main.tpl'
        else:
            fname = 'phantom_benchmark_additional.tpl'
        tpl = string.Template(_load_template("config/" + fname))
        config = tpl.substitute(kwargs)

        return config
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!