Best Python code snippet using tempest_python
RC.py
Source:RC.py
...
class Resource:
    """Resource catalog persisted as JSON in ``logfile.json``.

    After every call to :meth:`show_resource` the instance exposes
    ``json_file`` (the whole catalog dict) and ``resources_list`` (the very
    same list object stored under ``json_file["list_of_RCs"]``), so mutating
    ``resources_list`` and then saving ``json_file`` persists the change.
    """

    # Serialises every read-modify-write cycle on logfile.json. It replaces
    # the former ``open_state`` boolean, which was not atomic and silently
    # dropped a write whenever the flag happened to be set. Re-entrant
    # because the mutating methods call show_resource() while holding it.
    _file_lock = threading.RLock()

    def __init__(self):
        # Legacy module-level flag: no longer used for synchronisation, but
        # still initialised in case code outside this excerpt reads it.
        global open_state
        open_state = False
        self.show_resource()

    def _save(self):
        """Write the in-memory catalog back to ``logfile.json``."""
        with self._file_lock, open('logfile.json', 'w') as logfile:
            json.dump(self.json_file, logfile)

    def add_resource(self, dict):
        """Append the resource description ``dict`` to the catalog and save."""
        with self._file_lock:
            self.show_resource()
            self.resources_list.append(dict)
            self._save()

    def show_resource(self):
        """Reload the catalog from disk, recreating it when unusable.

        A missing, unparsable or wrongly shaped file is replaced by an empty
        default catalog.

        :return: the catalog dict, also stored in ``self.json_file``
        """
        with self._file_lock:
            try:
                with open('logfile.json', 'r') as logfile:
                    catalog = json.load(logfile)
                resources = catalog["list_of_RCs"]
            except (OSError, ValueError, KeyError, TypeError):
                print('error in config file now resetting it...')
                catalog = {"outer_part": "hello", "list_of_RCs": [], "delete_duration": 25}
                resources = catalog["list_of_RCs"]
                with open('logfile.json', 'w') as logfile:
                    json.dump(catalog, logfile)
            # json.dump() returns None, so the attributes are always set from
            # the dict itself, never from the dump call.
            self.json_file = catalog
            self.resources_list = resources
            return catalog

    def update_resource(self, dict):
        """Replace every entry whose ``resource_name`` matches ``dict``'s.

        The replacement is stamped with the current time under ``'Updated'``.
        The catalog is saved even when nothing matched.
        """
        with self._file_lock:
            self.show_resource()
            for position, entry in enumerate(self.resources_list):
                if dict['resource_name'] == entry['resource_name']:
                    dict['Updated'] = time.time()
                    self.resources_list[position] = dict
            self._save()

    def delete_resource(self, name):
        """Remove the first entry named ``name`` (if any) and save."""
        with self._file_lock:
            self.show_resource()
            for position, entry in enumerate(self.resources_list):
                if entry['resource_name'] == name:
                    del self.resources_list[position]
                    # Stop here: only the first match is removed, and the
                    # list must not be iterated further after shrinking it.
                    break
            self._save()


class Resource_cat:
    """HTTP-verb handlers (GET/POST/PUT/DELETE) over a :class:`Resource`."""

    exposed = True

    def __init__(self):
        self.CAT = Resource()

    def GET(self):
        """Return the whole catalog as a JSON string."""
        self.CAT.show_resource()
        # maybe implement a filter using URI (future work)
        return json.dumps(self.CAT.json_file)

    def POST(self):
        """Add the JSON resource from the request body; return the catalog."""
        payload = json.loads(cherrypy.request.body.read().decode('utf-8'))
        payload['Updated'] = time.time()
        self.CAT.add_resource(payload)
        # Refresh in place; the result must not be assigned over json_file.
        self.CAT.show_resource()
        return json.dumps(self.CAT.json_file)

    def PUT(self):
        """Update the resource named in the request body; return the catalog."""
        payload = json.loads(cherrypy.request.body.read().decode('utf-8'))
        self.CAT.update_resource(payload)
        self.CAT.show_resource()
        return json.dumps(self.CAT.json_file)

    def DELETE(self, *uri):
        """Delete the resource named by the first URI segment; return the catalog."""
        name_to_be_removed = uri[0]
        self.CAT.delete_resource(name_to_be_removed)
        self.CAT.show_resource()
        return json.dumps(self.CAT.json_file)


class sc_registration_thread(threading.Thread):
    """Thread that loads ``initialization.json`` into ``self.dicto`` on creation."""

    def __init__(self, thread_ID):
        threading.Thread.__init__(self)
        self.thread_ID = thread_ID
        with open('initialization.json') as config_file:
            self.dicto = json.load(config_file)

    def run(self):
        ...
RC_local.py
Source:RC_local.py
...
class Resource:
    """Resource catalog persisted as JSON in ``logfile.json``.

    After every call to :meth:`show_resource` the instance exposes
    ``json_file`` (the whole catalog dict) and ``resources_list`` (the very
    same list object stored under ``json_file["list_of_RCs"]``), so mutating
    ``resources_list`` and then saving ``json_file`` persists the change.
    """

    # Serialises every read-modify-write cycle on logfile.json. It replaces
    # the former ``open_state`` boolean, which was not atomic and silently
    # dropped a write whenever the flag happened to be set. Re-entrant
    # because the mutating methods call show_resource() while holding it.
    _file_lock = threading.RLock()

    def __init__(self):
        # Legacy module-level flag: no longer used for synchronisation, but
        # still initialised in case code outside this excerpt reads it.
        global open_state
        open_state = False
        self.show_resource()

    def _save(self):
        """Write the in-memory catalog back to ``logfile.json``."""
        with self._file_lock, open('logfile.json', 'w') as logfile:
            json.dump(self.json_file, logfile)

    def add_resource(self, dict):
        """Append the resource description ``dict`` to the catalog and save."""
        with self._file_lock:
            self.show_resource()
            self.resources_list.append(dict)
            self._save()

    def show_resource(self):
        """Reload the catalog from disk, recreating it when unusable.

        A missing, unparsable or wrongly shaped file is replaced by an empty
        default catalog.

        :return: the catalog dict, also stored in ``self.json_file``
        """
        with self._file_lock:
            try:
                with open('logfile.json', 'r') as logfile:
                    catalog = json.load(logfile)
                resources = catalog["list_of_RCs"]
            except (OSError, ValueError, KeyError, TypeError):
                print('error in config file now resetting it...')
                catalog = {"outer_part": "hello", "list_of_RCs": [], "delete_duration": 25}
                resources = catalog["list_of_RCs"]
                with open('logfile.json', 'w') as logfile:
                    json.dump(catalog, logfile)
            # json.dump() returns None, so the attributes are always set from
            # the dict itself, never from the dump call.
            self.json_file = catalog
            self.resources_list = resources
            return catalog

    def update_resource(self, dict):
        """Replace every entry whose ``resource_name`` matches ``dict``'s.

        The replacement is stamped with the current time under ``'Updated'``.
        The catalog is saved even when nothing matched.
        """
        with self._file_lock:
            self.show_resource()
            for position, entry in enumerate(self.resources_list):
                if dict['resource_name'] == entry['resource_name']:
                    dict['Updated'] = time.time()
                    self.resources_list[position] = dict
            self._save()

    def delete_resource(self, name):
        """Remove the first entry named ``name`` (if any) and save."""
        with self._file_lock:
            self.show_resource()
            for position, entry in enumerate(self.resources_list):
                if entry['resource_name'] == name:
                    del self.resources_list[position]
                    # Stop here: only the first match is removed, and the
                    # list must not be iterated further after shrinking it.
                    break
            self._save()


class Resource_cat:
    """HTTP-verb handlers (GET/POST/PUT/DELETE) over a :class:`Resource`."""

    exposed = True

    def __init__(self):
        self.CAT = Resource()

    def GET(self):
        """Return the whole catalog as a JSON string."""
        self.CAT.show_resource()
        # maybe implement a filter using URI (future work)
        return json.dumps(self.CAT.json_file)

    def POST(self):
        """Add the JSON resource from the request body; return the catalog."""
        payload = json.loads(cherrypy.request.body.read().decode('utf-8'))
        payload['Updated'] = time.time()
        self.CAT.add_resource(payload)
        # Refresh in place; the result must not be assigned over json_file.
        self.CAT.show_resource()
        return json.dumps(self.CAT.json_file)

    def PUT(self):
        """Update the resource named in the request body; return the catalog."""
        payload = json.loads(cherrypy.request.body.read().decode('utf-8'))
        self.CAT.update_resource(payload)
        self.CAT.show_resource()
        return json.dumps(self.CAT.json_file)

    def DELETE(self, *uri):
        """Delete the resource named by the first URI segment; return the catalog."""
        name_to_be_removed = uri[0]
        self.CAT.delete_resource(name_to_be_removed)
        self.CAT.show_resource()
        return json.dumps(self.CAT.json_file)


class sc_registration_thread(threading.Thread):
    """Thread whose ``run`` calls ``reg.registration`` for this catalog."""

    def __init__(self, thread_ID):
        threading.Thread.__init__(self)
        self.thread_ID = thread_ID

    def run(self):
        """Call ``reg.registration`` with the catalog description and the local URL."""
        # Use 'http://linksmart:8082/' instead when running in a docker container.
        url = 'http://localhost:8082/'
        reg.registration('Resource_CATALOG.json', url, 'RC')
...
yyets.py
Source:yyets.py
...
# YYeTs API
# Python 2 & 3
__author__ = 'Benny <benny@bennythink.com>'
import requests


def show_resource(resource_id):
    """
    show resource information.
    :param resource_id: unique id for each resource, get from `search_resource`
    :return: download link in json
    """
    # NOTE(review): the access key is hard-coded in the URL; consider moving
    # it to configuration.
    url = 'http://pc.zmzapi.com/index.php?g=api/pv2&m=index&a=resource&accesskey=519f9cab85c8059d17544947k361a827&id='
    r = requests.get(url + resource_id)
    return r.json()


def search_resource(name):
    """
    search resource
    :param name: tv/movie name
    :return: the whole search result
    """
    url = 'http://pc.zmzapi.com/index.php?g=api/pv2&m=index&a=search&accesskey=519f9cab85c8059d17544947k361a827&limit=200&k='
    r = requests.get(url + name)
    return r.json()


def query_resource(user_raw_message):
    """
    query resource name
    :param user_raw_message: `/query batman`
    :return: message to be sent: one numbered line per hit, or the
        "no resource" text when the search returned nothing
    """
    # Drop the leading `/query` command (6 characters) and surrounding spaces.
    name = user_raw_message[6:].strip(' ')
    hits = search_resource(name).get('data')
    # Missing, empty-string and empty-list results all mean "nothing found";
    # previously only None was handled, so an empty message could be returned.
    if not hits:
        return '没æ对åºçèµæºï¼ä½¿ç¨æ¹æ³/query éé¿å¯è»å´æç¨'
    # List every hit, including the case of exactly one result, which the
    # former `len(s) > 1` test skipped.
    return ''.join(
        '%d. %s %s\n' % (position, hit.get('channel'), hit.get('title'))
        for position, hit in enumerate(hits, 1)
    )


def get_season_count(tv_name):
    """
    if it was movie, get download link whenever possible; if it was tv, get seasons count
    :param tv_name: name, such as `avatar`, `Black Mirror`
    :return: download link, `0, err_msg` or `season count, dl link`
    """
    hits = search_resource(tv_name).get('data')
    # None, '' and [] all mean no result; indexing an empty list below would
    # raise IndexError.
    if not hits:
        return 0, '没æè¿ä¸ªèµæº'
    # return the first result!
    first = hits[0]
    download_link = show_resource(first.get('id'))
    if first.get('channel') == 'movie':
        # TODO: AccessKey status codes - 4002 appears to mean the resource is
        # closed and 4003 that no resource is available yet (the original
        # comment was mis-encoded; confirm against the API).
        if download_link.get('status') != 1:
            return 0, 'ç±äºaccesskeyçåå ï¼çµå½±æç´¢åºæ¬æ¯åºçï¼æ£å¨å°è¯ä¿®å¤ä¸â¦â¦'
        return 255, get_movie_link(download_link)
    if download_link.get('status') != 1:
        return 0, 'èµæºä¸å¯ç¨'
    season = download_link.get('data').get('list')[0].get('season')
    # Season code '101' is reported as a single season.
    return (1, download_link) if season == '101' else (int(season), download_link)


def get_movie_link(data):
    """
    get movie link
    :param data: link generate by `show_resource`
    :return: pass two params to iter_link to get actual link
    """
    files = data.get('data').get('list')[0].get('episodes')[0].get('files')
    dl_hr_mp4 = files.get('HR-HDTV') or files.get('MP4')
    dl_app = files.get('APP')
    return iter_link(dl_app, dl_hr_mp4)
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!