How to use make_result method in stestr

Best Python code snippet using stestr_python

test_models.py

Source:test_models.py Github

copy

Full Screen

...21 self.env = Environment.objects.get(name='Walden Pond')22 self.exe = Executable.objects.get(name='walden')23 self.bench = Benchmark.objects.get(name='TestBench')24 def test_average_change_bad(self):25 self.make_result(12)26 s2 = self.make_result(15)27 rep = self.make_report(s2)28 self.assertEqual(rep.colorcode, 'red')29 def test_average_change_good(self):30 self.make_result(15)31 s2 = self.make_result(12)32 rep = self.make_report(s2)33 self.assertEqual(rep.colorcode, 'green')34 def test_within_threshold_none(self):35 self.make_result(15)36 s2 = self.make_result(15.2)37 rep = self.make_report(s2)38 self.assertEqual(rep.colorcode, 'none')39 def test_initial_revision_none(self):40 s2 = self.make_result(15)41 rep = self.make_report(s2)42 self.assertEqual(rep.colorcode, 'none')43 def test_bench_change_good(self):44 b1 = self.make_bench('b1')45 s1 = self.make_result(15)46 self.make_result(15, rev=s1, benchmark=b1)47 s2 = self.make_result(14.54)48 self.make_result(15, rev=s2, benchmark=b1)49 rep = self.make_report(s2)50 self.assertEqual(rep.colorcode, 'green')51 self.assertIn(self.bench.name, rep.summary)52 def test_bench_change_bad(self):53 b1 = self.make_bench('b1')54 s1 = self.make_result(15)55 self.make_result(15, rev=s1, benchmark=b1)56 s2 = self.make_result(15.46)57 self.make_result(15, rev=s2, benchmark=b1)58 rep = self.make_report(s2)59 self.assertEqual(rep.colorcode, 'red')60 self.assertIn(self.bench.name, rep.summary)61 # NOTE: Don't need to test with multiple projects since the calculation of62 # urgency doesn't take projects into account63 def test_average_change_beats_bench_change(self):64 b1 = self.make_bench('b1')65 s1 = self.make_result(15)66 self.make_result(15, rev=s1, benchmark=b1)67 s2 = self.make_result(14)68 self.make_result(15, rev=s2, benchmark=b1)69 rep = self.make_report(s2)70 self.assertIn('Average', rep.summary)71 def test_good_benchmark_change_beats_bad_average_trend(self):72 changes = self.make_bad_trend()73 b1 = self.make_bench('b1')74 
for x in changes:75 s1 = self.make_result(x)76 if x != changes[-1]:77 self.make_result(x, rev=s1, benchmark=b1)78 self.make_result(changes[-2] * .97, rev=s1, benchmark=b1)79 rep = self.make_report(s1)80 self.assertEquals('green', rep.colorcode)81 self.assertIn('b1', rep.summary)82 def test_good_average_change_beats_bad_average_trend(self):83 changes = self.make_bad_trend()84 b1 = self.make_bench('b1')85 for x in changes:86 s1 = self.make_result(x)87 if x != changes[-1]:88 self.make_result(x, rev=s1, benchmark=b1)89 self.make_result(changes[-2] * .92, rev=s1, benchmark=b1)90 rep = self.make_report(s1)91 self.assertEquals('green', rep.colorcode)92 self.assertIn('Average', rep.summary)93 def test_good_change_beats_good_trend(self):94 changes = self.make_good_trend()95 b1 = self.make_bench('b1')96 for x in changes:97 s1 = self.make_result(x)98 if x != changes[-1]:99 self.make_result(x, rev=s1, benchmark=b1)100 self.make_result(changes[-2] * .95, rev=s1, benchmark=b1)101 rep = self.make_report(s1)102 self.assertIn('b1', rep.summary)103 self.assertNotIn('trend', rep.summary)104 def test_bad_trend_beats_good_trend(self):105 good_changes = self.make_good_trend()106 bad_changes = self.make_bad_trend()107 b1 = self.make_bench('b1')108 for i in range(len(good_changes)):109 s1 = self.make_result(good_changes[i])110 self.make_result(bad_changes[i], rev=s1, benchmark=b1)111 rep = self.make_report(s1)112 self.assertIn('trend', rep.summary)113 self.assertIn('b1', rep.summary)114 self.assertIn('yellow', rep.colorcode)115 def test_bad_change_beats_good_trend(self):116 changes = self.make_good_trend()117 b1 = self.make_bench('b1')118 for x in changes:119 s1 = self.make_result(x)120 if x != changes[-1]:121 self.make_result(x, rev=s1, benchmark=b1)122 self.make_result(changes[-2] * 1.05, rev=s1, benchmark=b1)123 rep = self.make_report(s1)124 self.assertIn('b1', rep.summary)125 self.assertNotIn('trend', rep.summary)126 self.assertEquals('red', rep.colorcode)127 def 
test_bad_beats_good_change(self):128 b1 = self.make_bench('b1')129 s1 = self.make_result(12)130 self.make_result(12, rev=s1, benchmark=b1)131 s2 = self.make_result(15)132 self.make_result(9, rev=s2, benchmark=b1)133 rep = self.make_report(s2)134 self.assertEqual(rep.colorcode, 'red')135 def test_bigger_bad_beats_smaller_bad(self):136 b1 = self.make_bench('b1')137 b2 = self.make_bench('b2')138 s1 = self.make_result(1.0)139 self.make_result(1.0, rev=s1, benchmark=b1)140 self.make_result(1.0, rev=s1, benchmark=b2)141 s2 = self.make_result(1.0)142 self.make_result(1.04, rev=s2, benchmark=b1)143 self.make_result(1.03, rev=s2, benchmark=b2)144 rep = self.make_report(s2)145 self.assertIn('b1', rep.summary)146 self.assertEquals('red', rep.colorcode)147 def test_multiple_quantities(self):148 b1 = self.make_bench('b1', quantity='Space', units='bytes')149 s1 = self.make_result(1.0)150 self.make_result(1.0, rev=s1, benchmark=b1)151 s2 = self.make_result(1.4)152 self.make_result(1.5, rev=s2, benchmark=b1)153 rep = self.make_report(s2)154 self.assertRegexpMatches(rep.summary, '[sS]pace')155 self.assertEquals('red', rep.colorcode)156 def make_result(self, value, rev=None, benchmark=None):157 from uuid import uuid4158 if not benchmark:159 benchmark = self.bench160 if not rev:161 commitdate = self.starttime + timedelta(days=self.days)162 cid = str(uuid4())163 Revision(commitid=cid, date=commitdate, branch=self.b,164 project=self.pro).save()165 rev = Revision.objects.get(commitid=cid)166 Result(value=value, revision=rev, executable=self.exe,167 environment=self.env, benchmark=benchmark).save()168 self.days += 1169 return rev170 def make_report(self, revision):...

Full Screen

Full Screen

ftp_event.py

Source:ftp_event.py Github

copy

Full Screen

...11 @make_decorator12 def verify_get_ftp_event_list(event_name, status, page, limit):13 """获取文件事件列表"""14 if status < 0 or status > 3:15 abort(400, **make_result(status=400, msg='调度状态参数错误'))16 return Response(event_name=event_name, status=status, page=page, limit=limit)17 @staticmethod18 @make_decorator19 def verify_get_ftp_event_detail(ftp_event_id):20 """获取文件事件详情"""21 if not ftp_event_id:22 abort(400, **make_result(status=400, msg='调度id不存在'))23 return Response(ftp_event_id=ftp_event_id)24 @staticmethod25 @make_decorator26 def verify_update_ftp_event_detail(ftp_event_id, event_name, event_desc, ftp_id, data_path, file_name, interface_id,27 start_time, end_time, date_time, interval_value, old_status, new_status,28 user_id):29 """修改文件事件详情"""30 if not ftp_event_id:31 abort(400, **make_result(status=400, msg='事件id不存在'))32 if not event_name:33 abort(400, **make_result(status=400, msg='事件名称不存在'))34 if not ftp_id:35 abort(400, **make_result(status=400, msg='FTP配置id不存在'))36 if not data_path:37 abort(400, **make_result(status=400, msg='文件路径不存在'))38 if not file_name:39 abort(400, **make_result(status=400, msg='文件名称不存在'))40 if not interface_id:41 abort(400, **make_result(status=400, msg='任务流id不存在'))42 if not interval_value:43 abort(400, **make_result(status=400, msg='间隔值不得为空'))44 if interval_value < 1 or interval_value > 60:45 abort(400, **make_result(status=400, msg='间隔值应在1-60分钟之间'))46 if not date_time:47 now = time.strftime('%Y-%m-%d', time.localtime())48 date_time = date_add(now, -1)49 if start_time < 0 or start_time > 23:50 abort(400, **make_result(status=400, msg='开始时间错误'))51 if end_time < 1 or end_time > 24:52 abort(400, **make_result(status=400, msg='结束时间错误'))53 if end_time <= start_time:54 abort(400, **make_result(status=400, msg='结束时间不得小于等于开始时间'))55 if old_status < 0 or old_status > 2:56 abort(400, **make_result(status=400, msg='是否失效参数错误(旧)'))57 if new_status < 0 or new_status > 2:58 abort(400, **make_result(status=400, msg='是否失效参数错误(新)'))59 if not user_id:60 
abort(400, **make_result(status=400, msg='用户不存在'))61 return Response(ftp_event_id=ftp_event_id, event_name=event_name, event_desc=event_desc, ftp_id=ftp_id,62 data_path=data_path, file_name=file_name, interface_id=interface_id,63 interval_value=interval_value, start_time=start_time, end_time=end_time, old_status=old_status,64 date_time=date_time, new_status=new_status, user_id=user_id)65 @staticmethod66 @make_decorator67 def verify_add_ftp_event_detail(event_name, event_desc, ftp_id, data_path, file_name, interface_id, start_time,68 end_time, date_time, interval_value, user_id):69 """新增文件事件详情"""70 if not event_name:71 abort(400, **make_result(status=400, msg='事件名称不存在'))72 if not ftp_id:73 abort(400, **make_result(status=400, msg='FTP配置id不存在'))74 if not data_path:75 abort(400, **make_result(status=400, msg='文件路径不存在'))76 if not file_name:77 abort(400, **make_result(status=400, msg='文件名称不存在'))78 if not interface_id:79 abort(400, **make_result(status=400, msg='任务流id不存在'))80 if not interval_value:81 abort(400, **make_result(status=400, msg='间隔值不得为空'))82 if interval_value < 1 or interval_value > 60:83 abort(400, **make_result(status=400, msg='间隔值应在1-60分钟之间'))84 if not date_time:85 now = time.strftime('%Y-%m-%d', time.localtime())86 date_time = date_add(now, -1)87 if start_time < 0 or start_time > 23:88 abort(400, **make_result(status=400, msg='开始时间错误'))89 if end_time < 1 or end_time > 24:90 abort(400, **make_result(status=400, msg='结束时间错误'))91 if end_time <= start_time:92 abort(400, **make_result(status=400, msg='结束时间不得小于等于开始时间'))93 if not user_id:94 abort(400, **make_result(status=400, msg='用户不存在'))95 return Response(event_name=event_name, event_desc=event_desc, ftp_id=ftp_id, data_path=data_path,96 file_name=file_name, interface_id=interface_id, interval_value=interval_value,97 start_time=start_time, end_time=end_time, date_time=date_time, user_id=user_id)98 @staticmethod99 @make_decorator100 def verify_test_ftp_event_link(ftp_id, ftp_type, ftp_host, ftp_port, 
ftp_user, ftp_passwd, data_path):101 """测试FTP文件目录是否存在"""102 if not data_path:103 abort(400, **make_result(status=400, msg='文件路径不得为空'))104 if not ftp_id and (not ftp_type and not ftp_host and not ftp_port):105 abort(400, **make_result(status=400, msg='FTP配置项缺失'))106 return Response(ftp_id=ftp_id, ftp_type=ftp_type, ftp_host=ftp_host, ftp_port=ftp_port, ftp_user=ftp_user,107 ftp_passwd=ftp_passwd, data_path=data_path)108 @staticmethod109 @make_decorator110 def verify_action_ftp_event(ftp_event_id, action, user_id):111 """暂停/恢复调度事件"""112 if not ftp_event_id:113 abort(400, **make_result(status=400, msg='调度事件id不存在'))114 if action < 1 or action > 2:115 abort(400, **make_result(status=400, msg='请求参数错误'))116 return Response(ftp_event_id=ftp_event_id, action=action, user_id=user_id)117 @staticmethod118 @make_decorator119 def verify_delete_ftp_event(ftp_event_id, user_id):120 """删除调度详情"""121 if not ftp_event_id:122 abort(400, **make_result(status=400, msg='调度事件id不存在'))123 return Response(ftp_event_id=ftp_event_id, user_id=user_id)124 @staticmethod125 @make_decorator126 def verify_run_ftp_event(ftp_event_id, run_date, date_format):127 """立即执行调度事件"""128 if not ftp_event_id:129 abort(400, **make_result(status=400, msg='调度id不存在'))...

Full Screen

Full Screen

view.py

Source:view.py Github

copy

Full Screen

...25 if stu:26 result = []27 for s in stu.items:28 result.append(s.to_json())29 return make_result(data=result)30 else:31 return make_result(code=Code.FAIL)32 except:33 return make_result(code=Code.DATA_FAIL)34 else:35 try:36 students = Student.query.filter(Student.is_del == 0).order_by(Student.id).paginate(page=page,37 per_page=num)38 if students:39 student = []40 for s1 in students.items:41 student.append(s1.to_json())42 return make_result(data=student)43 else:44 return make_result(code=Code.NOT_DATA)45 except:46 return make_result(code=Code.DATA_FAIL)47class AddStudent(Resource):48 def __init__(self):49 self.reqparse = reqparse.RequestParser()50 self.reqparse.add_argument('name', type=str, required=True, help='name缺失!')51 self.reqparse.add_argument('sex', type=int, required=True, help='sex缺失!')52 self.reqparse.add_argument('age', type=int, required=True, help='age缺失!')53 self.reqparse.add_argument('grade', type=int, required=True, help='grade缺失!')54 self.reqparse.add_argument('source', type=int, required=True, help='source缺失!')55 self.reqparse.add_argument('face', type=str, required=True, help='face缺失!')56 self.args = self.reqparse.parse_args()57 def post(self):58 name = self.args.get('name', None)59 sex = self.args.get('sex', None)60 age = self.args.get('age', None)61 grade = self.args.get('grade', None)62 source = self.args.get('source', None)63 face = self.args.get('face', None)64 # 根据提交的数据添加用户,默认标记是否删除位为0。(0:未删除,1:已删除)65 stu = Student(name=name, sex=sex, age=age, grade=grade, source=source, face=face)66 if name and sex and age and grade and source and face:67 try:68 db.session.add(stu)69 db.session.commit()70 return make_result()71 except:72 return make_result(code=Code.DATA_FAIL)73 else:74 return make_result(code=Code.PARAM_FAIL)75class Upload(Resource):76 def __init__(self):77 self.reqparse = reqparse.RequestParser()78 self.reqparse.add_argument('file', type=FileStorage, location='files')79 self.args = self.reqparse.parse_args()80 def post(self):81 # 
上传文件,并保存到file文件夹下。返回重命名的文件名,以供添加接口face参数调用。82 file = self.args.get('file', None)83 file_name = f'{int(round(time.time()*1000))}.png'84 if file:85 file.save(f'./file/{file_name}')86 return make_result(data={'face': file_name})87 else:88 return make_result(code=Code.FILE_NULL)89class DelStudent(Resource):90 def __init__(self):91 self.reqparse = reqparse.RequestParser()92 self.reqparse.add_argument('name', type=str, required=True, help='name缺失!')93 self.args = self.reqparse.parse_args()94 def delete(self):95 name = self.args.get('name', None)96 # 根据名字查询出标志为0的数据,然后再把标志更改为1.(0:未删除,1:已删除)97 if name:98 stu = Student.query.filter(and_(Student.name == name, Student.is_del == 0)).first()99 if stu:100 stu.is_del = 1101 try:102 db.session.commit()103 return make_result()104 except:105 return make_result(code=Code.DATA_FAIL)106 else:107 return make_result(code=Code.NOT_DATA)108 else:109 return make_result(code=Code.PARAM_FAIL)110class UpdateStudent(Resource):111 def __init__(self):112 self.reqparse = reqparse.RequestParser()113 self.reqparse.add_argument('id', type=int, required=True, help='id缺失!')114 self.reqparse.add_argument('name', type=str)115 self.reqparse.add_argument('sex', type=int)116 self.reqparse.add_argument('age', type=int)117 self.reqparse.add_argument('grade', type=int)118 self.reqparse.add_argument('source', type=int)119 self.reqparse.add_argument('face', type=str)120 self.args = self.reqparse.parse_args()121 def post(self):122 id = self.args.get('id', None)123 name = self.args.get('name', None)124 sex = self.args.get('sex', None)125 age = self.args.get('age', None)126 grade = self.args.get('grade', None)127 source = self.args.get('source', None)128 face = self.args.get('face', None)129 # 根据提交的ID查询出数据,然后再根据提交的数据修改相应的字段,最后提交给数据库。130 stu = Student.query.filter(Student.id == id).first()131 if stu:132 if name or sex or age or grade or source or face:133 if name:134 stu.name = name135 if sex:136 stu.sex = sex137 if age:138 stu.age = age139 if grade:140 stu.grade = 
grade141 if source:142 stu.source = source143 if face:144 stu.face = face145 db.session.add(stu)146 try:147 db.session.commit()148 return make_result()149 except:150 return make_result(code=Code.DATA_FAIL)151 else:152 return make_result(code=Code.FAIL)153 else:...

Full Screen

Full Screen

RTM_statis_new.py

Source:RTM_statis_new.py Github

copy

Full Screen

...11 zapi = None12 url = 'http://localhost:51081/api_jsonrpc.php'13 user = 'Admin'14 password = 'NokiaRtm'15 def make_result(self, resultcode, result):16 return { 'resultcode': resultcode, 'result': result }17 def __init__(self):18 self.zapi = None19 def rtm_conn_win(self, url, user, password):20 try:21 self.url = url22 self.user = user23 self.password = password24 #self.zapi = ZabbixAPI(url=url, user=user, password=password)25 #print('rtm_conn: ', url, user, password)26 self.zapi = ZabbixAPI(url)27 self.zapi.login(user, password)28 return self.make_result(1, "zabbix initialize successfully")29 except Exception as e:30 return self.make_result(0, e)31 32 def rtm_conn_linux(self, url, user, password):33 try:34 self.url = url35 self.user = user36 self.password = password37 #self.zapi = ZabbixAPI(url=url, user=user, password=password)38 #print('rtm_conn: ', url, user, password)39 self.zapi = ZabbixAPI(url)40 self.zapi.login(user, password)41 return self.make_result(1, "zabbix initialize successfully")42 except Exception as e:43 return self.make_result(0, e)44 def rtm_get_host(self, hostname):45 try:46 apiresult = self.zapi.do_request('host.get', {47 'filter': {48 'host': [hostname]49 }50 })51 if (apiresult and len(apiresult['result'])>0):52 return self.make_result(1, apiresult['result'][0]['hostid'])53 else:54 return self.make_result(0, 'host query return null')55 except Exception as e:56 return self.make_result(0, e)57 def rtm_get_item(self, itemname, hostid):58 try:59 apiresult = self.zapi.do_request('item.get', {60 'output': 'extend',61 'hostids': hostid,62 'search': {63 'name': itemname64 }65 })66 if (apiresult and len(apiresult['result'])>0):67 return self.make_result(1, apiresult['result'][0]['itemid'])68 else:69 return self.make_result(0, 'item query return null')70 except Exception as e:71 return self.make_result(0, e)72 def rtm_get_trend(self, itemid, starttime, stoptime):73 try:74 apiresult = self.zapi.do_request('trend.get', {75 'output': [76 "itemid",77 
"clock",78 "num",79 "value_min",80 "value_avg",81 "value_max",82 ],83 'itemids': [itemid],84 'time_from': starttime,85 'time_till': stoptime86 })87 if (apiresult and len(apiresult['result'])>0):88 return self.make_result(1, apiresult['result'])89 else:90 return self.make_result(0, 'trend query return null')91 except Exception as e:92 return self.make_result(0, e)93 def rtm_get_value(self, hostnames, itemnames, starttime, stoptime):94 if (self.zapi == None):95 connresult = self.rtm_conn(url, user, password)96 if (connresult['resultcode'] == 1):97 self.zapi = connresult['result']98 else:99 return self.make_result(0, 'zabbix api connect fail')100 if type(hostnames) != list:101 hostnames = [hostnames]102 if type(itemnames) != list:103 itemnames = [itemnames]104 logging.info('starttime: ' + str(starttime))105 logging.info('stoptime: ' + str(stoptime))106 107 return_result = []108 for hostname in hostnames: 109 host_result = self.rtm_get_host(hostname)110 if (host_result['resultcode'] == 0):111 return self.make_result(0, 'hostname %s can not be found : %s' % (hostname, host_result['result']))112 hostid = host_result['result']113 logging.info('hostid find: ' + hostid)114 for itemname in itemnames:115 item_result = self.rtm_get_item(itemname, hostid)116 if (item_result['resultcode'] == 0):117 return self.make_result(0, 'itemname %s can not be found : %s' % (itemname, item_result['result']))118 itemid = item_result['result']119 logging.info('itemid find: ' + itemid)120 121 trend_result = self.rtm_get_trend(itemid, starttime, stoptime)122 logging.info('trend_result: ' , trend_result)123 if trend_result['resultcode'] != 0 and type(trend_result['result']) == list:124 for trend_result_item in trend_result['result']:125 trend_result_item['hostname'] = hostname126 trend_result_item['itemname'] = itemname127 return_result.append(trend_result_item)128 logging.info('rtm_get_value run over') 129 if len(return_result)>0:130 return self.make_result(1, return_result)131 else:132 return 
self.make_result(0, return_result)133if __name__ == '__main__':134 stopdatetime = datetime.datetime.now()135 startdatetime = stopdatetime + datetime.timedelta(hours = -1)136 #stoptime = int(time.mktime(stopdatetime.timetuple()))137 #starttime = int(time.mktime(startdatetime.timetuple()))138 stoptime = 1533171300 139 starttime = 1533162700140 url = 'http://localhost:51081/api_jsonrpc.php'141 user = 'Admin'142 password = 'NokiaRtm'143 rtm_statis = RTM_Statis()144 api_result = rtm_statis.rtm_conn(url=url, user=user, password=password)145 if (api_result['resultcode'] == 0):146 logging.info('Zabbix API init failed: %s' % api_result['result'])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run stestr automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful