Best Python code snippet using slash
libarubacentral.py
Source:libarubacentral.py
# NOTE: this is an excerpt of a class body. The class header and the start of
# get_aps() lie above the excerpt, so the visible tail of get_aps() is kept
# verbatim below as comments -- it cannot stand alone without its signature.
# ...
#         group: str = None, client_count: bool = None, label: str = None, swarm_id=None, mac_address=None,
#         timeout=None):
#     url = "/monitoring/v1/aps"
#     if limit:
#         url = self._add_arg(url, f"limit={str(limit)}")
#     if status:
#         url = self._add_arg(url, f"status={status}")
#     if swarm_id:
#         url = self._add_arg(url, f"swarm_id={swarm_id}")
#     elif vc:
#         url = self._add_arg(url, f"swarm_id={self.get_swarm_id(vc, access_token=access_token)}")
#     if group:
#         url = self._add_arg(url, f"group={group}")
#     if client_count:
#         url = self._add_arg(url, f"calculate_client_count=true")
#     if label:
#         url = self._add_arg(url, f"label={label}")
#     if mac_address:
#         url = self._add_arg(url, f"macaddr={mac_address}")
#     logging.debug(f"getting aps: {url}")
#     return self._get_api(url, access_token, timeout=timeout)['aps']


def get_swarm_id(self, name: str, access_token: dict = None) -> str:
    """Return the ``swarm_id`` of the first swarm whose name contains *name*.

    The comparison is a case-insensitive substring match.

    Raises:
        RuntimeError: if no swarm name contains *name*.
    """
    data = self._get_api('/monitoring/v1/swarms', access_token=access_token)
    wanted = name.lower()
    for swarm in data['swarms']:
        if wanted in swarm['name'].lower():
            return swarm['swarm_id']
    raise RuntimeError(f"No swarm found with name {name}")


def get_swarms(self, access_token: dict = None, group=None) -> list:
    """Return the ``swarms`` list, optionally filtered by *group*."""
    url = '/monitoring/v1/swarms'
    if group:
        url = self._add_arg(url, f"group={group}")
    return self._get_api(url, access_token=access_token)['swarms']


def get_ap(self, serial, access_token: dict = None):
    """Return the decoded response for the AP with serial number *serial*."""
    return self._get_api(f'/monitoring/v1/aps/{serial}', access_token=access_token)


def _get_api(self, url, access_token: dict = None, timeout=None) -> dict:
    """GET ``self.cfgdata['url'] + url`` and return the decoded JSON body.

    ``access_token`` precedes ``timeout`` -- the same order as ``_post_api``
    -- because callers in this class pass the token as the second positional
    argument (e.g. ``self._get_api(url, access_token, timeout=timeout)``).
    With the previous ``(url, timeout, access_token)`` order that call raised
    ``TypeError`` and the two-argument calls sent the token dict as the
    timeout.

    Args:
        url: Path (plus query string) appended to ``self.cfgdata['url']``.
        access_token: Dict holding an ``'access_token'`` entry. When falsy,
            ``self.authenticate()`` is called and ``self.access_token`` used.
        timeout: Request timeout; falls back to ``self.http_timeout`` when
            falsy.

    Raises:
        RuntimeError: if the response status code is not 200.
    """
    if not timeout and self.http_timeout:
        timeout = self.http_timeout
    if not access_token:
        self.authenticate()
        access_token = self.access_token
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + access_token['access_token']
    }
    # The original sends an empty JSON object as the request body; kept as-is.
    r = requests.get(self.cfgdata['url'] + url, headers=headers, data=json.dumps({}),
                     verify=True, timeout=timeout)
    if r.status_code != 200:
        raise RuntimeError(f"STATUS CODE: {str(r.status_code)} \nDetail: {str(r.text)}")
    return json.loads(r.text)


def _post_api(self, url, data: dict, access_token: dict = None, timeout=None) -> dict:
    """POST *data* as JSON to ``self.cfgdata['url'] + url``; return the decoded body.

    Args:
        url: Path appended to ``self.cfgdata['url']``.
        data: Payload, serialised with ``json.dumps``.
        access_token: Dict holding an ``'access_token'`` entry. When falsy,
            ``self.authenticate()`` is called and ``self.access_token`` used.
        timeout: Request timeout; ``self.http_timeout`` when falsy.

    Raises:
        RuntimeError: if the response status code is not 200.
    """
    if not timeout:
        timeout = self.http_timeout
    if not access_token:
        self.authenticate()
        access_token = self.access_token
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + access_token['access_token']
    }
    r = requests.post(self.cfgdata['url'] + url, headers=headers, data=json.dumps(data),
                      verify=True, timeout=timeout)
    if r.status_code != 200:
        raise RuntimeError(f"STATUS CODE: {str(r.status_code)} \nDetail: {str(r.text)}")
    return json.loads(r.text)


@staticmethod
def _add_arg(url: str, arg: str):
    """Append *arg* to *url*, using ``?`` for the first argument and ``&`` after.

    *arg* is appended unchanged; no URL-encoding is applied.
    """
    separator = '&' if '?' in url else '?'
    return url + separator + arg


def get_down_aps(self, vc=None, group=None, access_token=None, swarm_id=None):
    """Return the result of ``get_aps`` restricted to ``status='Down'``."""
    return self.get_aps(status='Down', vc=vc, group=group, access_token=access_token, swarm_id=swarm_id)


def get_client_count(self, vc=None, group=None, network=None, label=None, access_token=None, swarm_id=None,
                     timeout=None):
    """Return the ``count`` field of the client-count endpoint.

    An explicit *swarm_id* wins over *vc*; *vc* is resolved to a swarm id
    through ``get_swarm_id`` only when *swarm_id* is not given.
    """
    url = '/monitoring/v1/clients/count'
    if swarm_id:
        url = self._add_arg(url, f"swarm_id={swarm_id}")
    elif vc:
        url = self._add_arg(url, f"swarm_id={self.get_swarm_id(vc, access_token=access_token)}")
    if group:
        url = self._add_arg(url, f"group={group}")
    if label:
        url = self._add_arg(url, f"label={label}")
    if network:
        url = self._add_arg(url, f"network={network}")
    return self._get_api(url, access_token=access_token, timeout=timeout)['count']


def get_wifi_clients(self, vc=None, group=None, network=None, label=None, access_token=None, count_only=False,
                     limit=1000, band=None, offset=None, timeout=None):
    """Query the wireless-clients endpoint.

    Returns the ``count`` field when *count_only* is true, otherwise the
    ``clients`` list. ``calculate_total=true`` is always added to the query.
    """
    url = '/monitoring/v1/clients/wireless'
    if limit:
        url = self._add_arg(url, f"limit={str(limit)}")
    if vc:
        url = self._add_arg(url, f"swarm_id={self.get_swarm_id(vc, access_token=access_token)}")
    if group:
        url = self._add_arg(url, f"group={group}")
    if label:
        url = self._add_arg(url, f"label={label}")
    if network:
        url = self._add_arg(url, f"network={network}")
    if band:
        url = self._add_arg(url, f"band={band}")
    if offset:
        url = self._add_arg(url, f"offset={str(offset)}")
    url = self._add_arg(url, 'calculate_total=true')
    result_key = 'count' if count_only else 'clients'
    return self._get_api(url, access_token=access_token, timeout=timeout)[result_key]


def get_networks(self, access_token: dict = None, group=None):
    """Return the ``networks`` list, optionally filtered by *group*."""
    url = '/monitoring/v1/networks'
    if group:
        url = self._add_arg(url, f"group={group}")
    return self._get_api(url, access_token=access_token)['networks']


def get_vcs(self, access_token: dict = None, group=None):
    """Return the ``swarms`` list, optionally filtered by *group*."""
    url = '/monitoring/v1/swarms'
    if group:
        url = self._add_arg(url, f"group={group}")
    return self._get_api(url, access_token=access_token)['swarms']


def name_ap(self, name, serial=None, mac=None, access_token=None):
    # NOTE: the excerpt is cut off inside this method. Only the visible start
    # is reproduced, unchanged; the rest of the body is not shown on the page.
    if not serial:
        if not mac:
            raise RuntimeError("Need a Serial or Mac Address to rename an AP.")
        else:
            aps = self.get_aps(access_token=access_token,mac_address=mac)
            if len(aps) == 1:
                serial = aps[0]['serial']
            elif len(aps) > 1:
                for i in aps:
                    if mac.lower().replace(' ','').replace('-','').replace(':','') == i['macaddr'].replace(';',''):
                        serial = i['serial']
    # else:...   <- the excerpt ends here; which statement this `else` belongs
    #               to cannot be recovered from the collapsed text.
ttserver.py
Source:ttserver.py
# NOTE: this is an excerpt of a class body. The method that the next two
# statements belong to starts above the excerpt, so they are kept verbatim as
# comments.
# ...
#     self.cmds_allowed = cmds_allowed
#     self.exec_cmd = exec_cmd


def to_cmd(self):
    """Build the argument list used to start the server process.

    Starts with ``self.exec_cmd``, adds one ``-<name>`` option per configured
    attribute, and ends with ``self.db.to_cmd()``.
    """
    args = [self.exec_cmd]

    def _add_arg(attr, arg, flag=False):
        # ``None`` means "not configured": emit nothing at all.
        if attr is None:
            return
        arg = '-%s' % arg
        if flag:
            # Flags carry no value; they are emitted only when truthy.
            if attr:
                args.append(arg)
        else:
            args.extend((arg, str(attr)))

    _add_arg(self.hostname, 'host')
    _add_arg(self.port, 'port')
    _add_arg(self.numthreads, 'thnum')
    _add_arg(self.pidfile, 'pid')
    _add_arg(self.pidfile_kill, 'kl', True)
    _add_arg(self.ulog_path, 'ulog')
    _add_arg(self.ulog_limit, 'ulim')
    _add_arg(self.ulog_async, 'uas', True)
    _add_arg(self.repl_master_host, 'mhost')
    _add_arg(self.repl_master_port, 'mport')
    _add_arg(self.repl_ts_path, 'rts')
    _add_arg(self.lua_ext, 'ext')
    if self.lua_cron_cmd:
        _add_arg('%s %d' % (self.lua_cron_cmd, self.lua_cron_period), 'extpc')
    if self.cmds_forbidden:
        _add_arg(','.join(self.cmds_forbidden), 'mask')
    if self.cmds_allowed:
        _add_arg(','.join(self.cmds_allowed), 'unmask')
    # Above INFO adds ``-le``, below INFO adds ``-ld``; exactly INFO adds neither.
    if self.log_level > logging.INFO:
        _add_arg(True, 'le', True)
    elif self.log_level < logging.INFO:
        _add_arg(True, 'ld', True)
    args.append(self.db.to_cmd())
    return args


def run(self, wait=False):
    """Start the process built by ``to_cmd``; block on it when *wait* is true."""
    pargs = self.to_cmd()
    # Lazy %-args: the message is only formatted if DEBUG logging is enabled.
    log.debug('Starting process: %s', ' '.join(pargs))
    self._proc = subprocess.Popen(pargs, stdout=self.stdout, stderr=self.stderr)
    if wait:
        self.wait()


def run_wait(self):
    """Start the process and wait for it (``run(True)``)."""
    self.run(True)


def wait(self):
    # NOTE: the excerpt is cut off inside this handler; the handler body is
    # not shown on the page.
    try:
        self._proc.wait()
    except KeyboardInterrupt:
        ...
collect_stats_from_model.py
Source:collect_stats_from_model.py
# NOTE: the function that the following docstring tail and body belong to
# starts above this excerpt (its ``def`` line is not shown), so the visible
# part is kept verbatim as comments.
# ...
#     :param with_prefix: Add prefix (collect-stats).
#     :param with_required: Add required statements where necessary.
#     :return: The updated parser
#     """
#     def _add_arg(name, short_name, help_msg, **kwargs):
#         if with_prefix:
#             name_arg = "collect-stats-" + name
#             short_name_arg = "cs" + short_name
#         else:
#             name_arg = name
#             short_name_arg = short_name
#         name_arg = "--" + name_arg
#         if len(short_name_arg) > 1:
#             short_name_arg = "--" + short_name_arg
#         else:
#             short_name_arg = "-" + short_name_arg
#         required = False
#         if "required" in kwargs:
#             required = (required or kwargs["required"]) and with_required
#             del kwargs["required"]
#         parser.add_argument(name_arg, short_name_arg, help=help_msg, required=required, **kwargs)
#     _add_arg("log-path", "l", "Path to the log output folder.", type=str, required=True)
#     _add_arg("validation-set-path", "v", "Path to the validation set SMILES file.", type=str, required=True)
#     _add_arg("sample-size", "n", "Number of SMILES to sample from the model. [DEFAULT: 10000]", type=int, default=10000)
#     _add_arg("with-weights", "w", "Store the weight matrices each epoch [DEFAULT: False].",
#              action="store_true", default=False)
#     _add_arg("smiles-type", "st",
#              "SMILES type to converto to TYPES=(smiles, deepsmiles.[branches|rings|both]) [DEFAULT: smiles]",
#              type=str, default="smiles")
#     return parser


def main():
    """Main function.

    Parses the command line, loads the model in ``"sampling"`` mode, reads the
    training and validation sets, and runs ``CollectStatsFromModel`` against a
    summary writer created for ``args.log_path``.
    """
    args = parse_args()
    model = mm.Model.load_from_file(args.model_path, mode="sampling")
    training_set = list(uc.read_smi_file(args.training_set_path))
    validation_set = list(uc.read_smi_file(args.validation_set_path))
    writer = tbx.SummaryWriter(log_dir=args.log_path)
    try:
        ma.CollectStatsFromModel(model, args.epoch, training_set, validation_set, writer,
                                 sample_size=args.sample_size, with_weights=args.with_weights,
                                 to_mol_func=uc.get_mol_func(args.smiles_type), logger=LOG).run()
    finally:
        # Close the writer even when the stats run raises.
        writer.close()
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!