How to use results method in Slash

Best Python code snippet using slash

main.py

Source:main.py Github

copy

Full Screen

1#!/usr/bin/env python2# -*- coding: utf-8 -*-3# Name: main.py4# Author: xiooli <xioooli[at]yahoo.com.cn>5# Site: http://joolix.com6# Licence: GPLv37# Version: 0912168import web, sys, os, hashlib, zlib, re, time, pickle, base64, json9from settings import *10import libmymoldb as lm11from libmymoldb.functions import *12reload(sys)13sys.setdefaultencoding('utf-8')14# =====================================================BEGIN INITIALIZATION==============================================15# init the permission check function16is_permited = lambda user, user_group, actions = ('s',), involved_user = None, actions_rules = None: \17 permission_check(user, user_group, actions, involved_user, ACTIONS_RULES)18# init the web application19app = web.application(URL_MAP, globals())20# init session21session = SESSION(app)22# init class for translation23trans_class = trans(dic_file_path = TRANS_PATH)24# init dbdef object25dbd_objs = {}26for i in DATABASES:27 dbd_objs[i] = lm.dbdef(ENVS[i]['DEF_FILE'])28# results dic template29tpl_results_dic = {30 'links': NAV_LINKS,31 'logged_user': None,32 'result_list': [],33 'len_results': 0,34 'dbs': PUBLIC_DBS,35 'db': '1',36 'mol': '',37 'table_heads': [],38 'max_results_num': 300,39 'pages': 1,40 'page': 1,41 'search_mol': '',42 'trans_class': None,43 'db_selected': is_selected('1'),44 'mode_selected': is_selected('2'),45 'query_id': '',46 'time_consumed': 0,47 'last_query_id': '',48 'last_db': '',49 'adv_search_query': '',50 'results_search_type': '2',51 'query_mols_dic': {},52 'lang': 'zh_CN',53 'html_title': '',54 'info_head': 'info',55 'info': [],56 'min_simi': '0',57 'urls_dic': URLS_DIC,58 'current_page': ''59 }60# make html pages from templates61html_normal_search = lambda results: WEB_RENDER('header', results_dic = results) + \62 WEB_RENDER('normal_editor', results_dic = results) +\63 WEB_RENDER('last_query', results_dic = results) + \64 WEB_RENDER('display_results', results_dic = results) + \65 WEB_RENDER('footer', results_dic = results)66html_advanced_search = lambda results: WEB_RENDER('header', results_dic = results) + \67 WEB_RENDER('advanced_editor', results_dic = results) +\68 WEB_RENDER('last_query', results_dic = results) + \69 WEB_RENDER('display_results', results_dic = results) + \70 WEB_RENDER('footer', results_dic = results)71html_molinfo = lambda results: WEB_RENDER('header', results_dic = results) + \72 WEB_RENDER('molinfo', results_dic = results) +\73 WEB_RENDER('footer', results_dic = results)74html_info = lambda results: WEB_RENDER('header', results_dic = results) + \75 WEB_RENDER('info', results_dic = results) +\76 WEB_RENDER('footer', results_dic = results)77html_index = lambda results: WEB_RENDER('header', results_dic = results) + \78 WEB_RENDER('index', results_dic = results) +\79 WEB_RENDER('footer', results_dic = results)80html_login = lambda results: WEB_RENDER('header', results_dic = results) + \81 WEB_RENDER('login', results_dic = results) +\82 WEB_RENDER('footer', results_dic = results)83html_register = lambda results: WEB_RENDER('header', results_dic = results) + \84 WEB_RENDER('register', results_dic = results) +\85 WEB_RENDER('footer', results_dic = results)86html_edit = lambda results: WEB_RENDER('header', results_dic = results) + \87 WEB_RENDER('edit', results_dic = results) +\88 WEB_RENDER('footer', results_dic = results)89html_ucpanel = lambda results: WEB_RENDER('header', results_dic = results) + \90 WEB_RENDER('ucpanel', results_dic = results) +\91 WEB_RENDER('footer', results_dic = results)92def html_no_permision(results_dic):93 results_dic['html_title'] = 'permission denied'94 results_dic['info'].append('you have no permission to access here')95 return html_info(results_dic)96def html_query_imcomplete(results_dic):97 results_dic['info'].append('query imcomplete')98 results_dic['html_title'] = 'query err'99 return html_info(results_dic)100def html_query_illegal(results_dic):101 results_dic['info'].append('contains illegal words')102 results_dic['html_title'] = 'query err'103 return html_info(results_dic)104def html_wrong_db(results_dic):105 results_dic['info'].append('wrong db name')106 results_dic['html_title'] = 'wrong db name'107 return html_info(results_dic)108#================================================END INITIALIZATION==============================================109#=======================================================MAIN=====================================================110# The main script of the web site mymoldb111# web site classes112class index:113 def __init__(self):114 # set the target language to session.lang115 trans_class.lang = session.lang116 self.results_dic = tpl_results_dic.copy()117 self.results_dic.update( {118 'logged_user': session.nickname,119 'trans_class': trans_class,120 'mode_selected': is_selected(str(session.search_mode)),121 'html_title': 'home',122 'info': [],123 'current_page': 'home',124 'lang': session.lang125 } )126 def GET(self, name = ''):127 results_dic = self.results_dic128 return html_info(results_dic)129class register:130 def __init__(self):131 # set the target language to session.lang132 trans_class.lang = session.lang133 self.results_dic = tpl_results_dic.copy()134 self.results_dic.update( {135 'logged_user': session.nickname,136 'trans_class': trans_class,137 'mode_selected': is_selected(str(session.search_mode)),138 'html_title': 'register',139 'ref': URLS_DIC["home_url"],140 'info': [],141 'lang': session.lang142 } )143 def GET(self, name = ''):144 if not session.authorized:145 return html_register(self.results_dic)146 else:147 web.seeother(self.results_dic['ref'])148 def POST(self, name = ''):149 input = web.input()150 results_dic = self.results_dic151 for i in ('nick', 'passwd', 'cfm_pw', 'username'):152 if not ( input.has_key(i) and input.get(i) ):153 results_dic['info'].append('info not complete')154 return html_register(results_dic)155 nick = query_preprocessing(input.get('nick'))156 if not 2 <= len(nick) <= 20:157 results_dic['info'].append('nick length not fit')158 return html_register(results_dic)159 # username is email160 username = query_preprocessing(input.get('username'))161 if not re.match(r'[._0-9a-zA-Z]+@[._0-9a-zA-Z]+', username):162 results_dic['info'].append('not valid username')163 return html_register(results_dic)164 # passwd is about to hash, so, no need to preprocess165 passwd = input.get('passwd')166 if passwd != input.get('cfm_pw'):167 results_dic['info'].append('pass word not well confirmed')168 return html_register(results_dic)169 # check if the username available170 env_db = ENVS[USERSDB]171 userdb_obj = lm.users_db(env_db)172 if userdb_obj.select(['*'], '%s = "%s"' %(env_db['USER_EMAIL_FIELD'], username)):173 results_dic['info'] += ['user already registered', ': ', username]174 userdb_obj.close()175 return html_register(results_dic)176 userdb_obj.close()177 if APPROVE_ALL_REGISTERS:178 # to add auto register179 userdb_obj = lm.users_db(env_db)180 userdb_obj.insert_into_usersdb('', nick, DEFAULT_USER_GROUP, md5(passwd), username, time.strftime('%Y-%m-%d %H:%M:%S'), 1)181 results_dic['info'] += ['register finished', 'you can login now']182 userdb_obj.close()183 else:184 tmp_info_dic = {'username': username, 'group': DEFAULT_USER_GROUP, 'nick': nick, 'passwd': md5(passwd)}185 id = md5(str(tmp_info_dic))186 info_dic = { id: tmp_info_dic }187 try:188 register_info_file = open(REGISTER_INFO_FILE, 'r')189 info_dic_from_file = pickle.load(register_info_file)190 register_info_file.close()191 if not info_dic_from_file.has_key(id):192 info_dic_from_file.update(info_dic)193 else:194 results_dic['info'] += ['you hava already submited your info', ',', 'please waiting for been approved']195 return html_register(results_dic)196 register_info_file = open(REGISTER_INFO_FILE, 'w')197 pickle.dump(info_dic_from_file, register_info_file)198 register_info_file.close()199 except:200 register_info_file = open(REGISTER_INFO_FILE, 'w')201 pickle.dump(info_dic, register_info_file)202 register_info_file.close()203 results_dic['info'] += ['register finished', ',', 'please waiting for been approved']204 return html_register(results_dic)205class login:206 '''class for user login/logout'''207 def __init__(self):208 # set the target language to session.lang209 trans_class.lang = session.lang210 self.last_login_time = session.last_login_time211 self.login_try_times = session.login_try_times212 self.results_dic = tpl_results_dic.copy()213 self.results_dic.update( {214 'logged_user': session.nickname,215 'trans_class': trans_class,216 'mode_selected': is_selected(str(session.search_mode)),217 'html_title': 'login',218 'ref': URLS_DIC["home_url"],219 'info': [],220 'lang': session.lang221 } )222 def GET(self, name = ''):223 results_dic = self.results_dic224 ref = URLS_DIC['home_url']225 input = web.input()226 if input.has_key('ref') and input.get('ref'):227 if input.get('ref') != web.ctx.path:228 ref = input.get('ref')229 # logout230 if input.has_key('action') and input.get('action'):231 if input.get('action') == 'logout':232 session.kill()233 return 'logout OK'234 if session.authorized:235 return web.seeother(ref)236 results_dic['ref'] = ref237 if self.login_try_times >= MAX_LOGIN_TRY_TIMES:238 if time.time() - self.last_login_time < LOGIN_INTERVAL:239 results_dic["info"].append('try again after a while')240 return html_info(self.results_dic)241 else:242 session.login_try_times = 0243 else:244 session.login_try_times += 1245 return html_login(results_dic)246 def POST(self, name = ''):247 input = web.input()248 env_db = ENVS[USERSDB]249 userdb_obj = lm.users_db(env_db)250 results_dic = self.results_dic251 ref = URLS_DIC['home_url']252 if input.has_key('ref') and input.get('ref'):253 ref = base64.b64decode(input.get('ref'))254 if ref != web.ctx.path:255 results_dic['ref'] = ref256 if session.authorized:257 return web.seeother(ref)258 if ( input.has_key("username") and input.get("username") ) \259 and ( input.has_key("password") and input.get("password") ):260 # user password261 pw = ''262 # default user group is 3 (viewer)263 ug = session.usergroup264 # user name265 nm = query_preprocessing(input.get("username"))266 # username should be email267 if not re.match(r'[0-9_.a-zA-Z]+@[0-9_.a-zA-Z]+', nm):268 results_dic['info'].append("not valid user name")269 return html_login(results_dic)270 # try to connect to the user accounts db to verify the login info of the user271 user_info = userdb_obj.select(['*'], '%s = "%s" LIMIT 1' %(env_db['USER_EMAIL_FIELD'], nm))272 if user_info:273 user_info_dic = user_info[0]274 status = user_info_dic.get(env_db['STATUS_FIELD'])275 if status != 1:276 results_dic['info'].append("user had been deactived")277 return html_login(results_dic)278 else:279 pw = user_info_dic.get(env_db['PASSWORD_FIELD'])280 ug = user_info_dic.get(env_db['USER_GROUP_FIELD'])281 ui = user_info_dic.get(env_db['USER_ID_FIELD'])282 nk = user_info_dic.get(env_db['NICK_FIELD'])283 if pw == md5(input.get("password")):284 session.authorized = True285 # if no nick name, then use user name (the email) as nick name286 if not nk:287 session.nickname = nm288 else:289 session.nickname = nk290 session.usergroup = ug291 session.userid = ui292 session.username = nm293 else:294 results_dic['info'].append("pass word incorrect")295 return html_login(results_dic)296 else:297 results_dic['info'].append("user not valid")298 return html_login(results_dic)299 return web.seeother(ref)300class man_users:301 def __init__(self):302 trans_class.lang = session.lang303 self.results_dic = tpl_results_dic.copy()304 self.results_dic.update( {305 'logged_user': session.nickname,306 'trans_class': trans_class,307 'lang': session.lang,308 'groups': ', '.join([ str(i) for i in USER_GROUPS ]),309 'info': [],310 'current_page': ''311 } )312 def GET(self, name = ''):313 input = web.input()314 results_dic = self.results_dic315 env_db = ENVS[USERSDB]316 if ( not session.authorized ) or session.usergroup != 1:317 results_dic['info'].append('you have no permission to access here')318 return WEB_RENDER('info', results = results_dic)319 userdb_obj = lm.users_db(env_db)320 # select out the users in group 1 ( administrators ), in case no administrator left321 # after removment or deactivation322 admins = []323 for a in userdb_obj.select([env_db['USER_ID_FIELD']], '%s = "%s"' %(env_db['USER_GROUP_FIELD'], '1')):324 if a.has_key(env_db['USER_ID_FIELD']):325 admins.append(a.get(env_db['USER_ID_FIELD']))326 if input.has_key('approve_id') and input.get('approve_id'):327 try:328 register_info_file = open(REGISTER_INFO_FILE, 'r')329 info_dic_from_file = pickle.load(register_info_file)330 register_info_file.close()331 info = info_dic_from_file.get(input.get('approve_id'))332 # send email to notify user333 sendmail(SENDMAIL_SETTINGS_DICT,334 info['username'],335 'MyMolDB register successed notify',336 '''337Dear %s,338Your application on MyMolDB as a contributor and user has been approved, you can login the site with the user name %s.339The URL of our site is http://xxxxxxxxxx.xxx/db/.340Welcome!341Sincerely yours''' %(info['nick'], info['username']) )342 userdb_obj.insert_into_usersdb('', info['nick'], DEFAULT_USER_GROUP, info['passwd'], info['username'], time.strftime('%Y-%m-%d %H:%M:%S'), 1)343 # delete the info in REGISTER_INFO_FILE after the register procedure finished344 info_dic_from_file.pop(input.get('approve_id'))345 register_info_file = open(REGISTER_INFO_FILE, 'w')346 pickle.dump(info_dic_from_file, register_info_file)347 register_info_file.close()348 results_dic['info'] += ['approve user', info['username'], 'successed']349 except:350 results_dic['info'] += ['approve user', info['username'], 'failed']351 if input.has_key('active_id') and input.get('active_id'):352 active_id = input.get('active_id')353 userdb_obj.update_usersdb(354 {env_db['STATUS_FIELD']: '1'},355 '%s = "%s"' %(env_db['USER_ID_FIELD'], active_id) )356 results_dic['info'] += ['active', 'user', active_id, '(id)', 'successed']357 if input.has_key('deactive_id') and input.get('deactive_id'):358 deactive_id = input.get('deactive_id')359 if len(admins) <= 1 and int(deactive_id) in admins:360 results_dic['info'] += ['at least one admin is needed', ', ', 'you can not', 'deactive', 'it']361 else:362 userdb_obj.update_usersdb(363 {env_db['STATUS_FIELD']: '0'},364 '%s = "%s"' %(env_db['USER_ID_FIELD'], input.get('deactive_id')) )365 results_dic['info'] += ['deactive', 'user', deactive_id, '(id)', 'successed']366 if input.has_key('remove_id') and input.get('remove_id'):367 remove_id = input.get('remove_id')368 if len(admins) <= 1 and int(remove_id) in admins:369 results_dic['info'] += ['at least one admin is needed', ', ', 'you can not', ' ', 'remove', 'it']370 else:371 userdb_obj.delete('%s = "%s"' %(env_db['USER_ID_FIELD'], input.get('remove_id')))372 results_dic['info'] += ['remove', 'user', remove_id, '(id)', 'successed']373 if ( input.has_key('chgroup_id') and input.get('chgroup_id') ) and \374 ( input.has_key('group') and int(input.get('group')) in USER_GROUPS):375 id = input.get('chgroup_id')376 group = input.get('group')377 userdb_obj.update_usersdb(378 {env_db['USER_GROUP_FIELD']: group},379 "%s = '%s'" %(env_db['USER_ID_FIELD'], id) )380 results_dic['info'] += [chgroup_id, '(id)', 'change group', 'to', group, 'successed']381 approved_users = []382 unapproved_users = []383 user_info = userdb_obj.select(['*'], '1')384 userdb_obj.close()385 if user_info:386 for info in user_info:387 approved_users.append(388 (str(info[env_db['USER_ID_FIELD']]),389 info[env_db['USER_GROUP_FIELD']],390 info[env_db['USER_EMAIL_FIELD']],391 info[env_db['NICK_FIELD']],392 str(info[env_db['STATUS_FIELD']])) )393 try:394 register_info_file = open(REGISTER_INFO_FILE, 'r')395 info_dic_from_file = pickle.load(register_info_file)396 register_info_file.close()397 for id, info in info_dic_from_file.items():398 # '-1' means still not approved, '0' means inactive while '1' is active399 unapproved_users.append( (id, info['group'], info['username'], info['nick'], '-1') )400 except:401 pass402 results_dic['approved_users'] = approved_users403 results_dic['unapproved_users'] = unapproved_users404 #return approved_users, unapproved_users405 return WEB_RENDER('man_users', results_dic = results_dic)406class ucpanel:407 def __init__(self):408 trans_class.lang = session.lang409 self.results_dic = tpl_results_dic.copy()410 self.userid = session.userid411 self.nickname = session.nickname412 self.usergroup = session.usergroup413 self.results_dic.update( {414 'logged_user': self.nickname,415 'trans_class': trans_class,416 'lang': session.lang,417 'html_title': 'user control panel',418 'ucp_urls': [],419 'info': [],420 'current_page': ''421 } )422 def GET(self, name = ''):423 results_dic = self.results_dic424 if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s')) ):425 return html_no_permision(results_dic)426 results_dic['ucp_urls'] += UCP_URLS427 # users in group 1 are administrators, so, show them the user management option428 if session.usergroup == 1:429 user_man_url = ('user management', URLS_DIC['manusers_url'], '')430 if not user_man_url in results_dic['ucp_urls']:431 results_dic['ucp_urls'].append(user_man_url)432 return html_ucpanel(results_dic)433class chusersettings:434 def __init__(self):435 trans_class.lang = session.lang436 self.results_dic = tpl_results_dic.copy()437 self.results_dic.update( {438 'logged_user': session.nickname,439 'trans_class': trans_class,440 'lang': session.lang,441 'reload_parent': False,442 'info': [],443 'nick_name': session.nickname,444 'email': session.username445 } )446 def GET(self, name = ''):447 # check basic permission448 if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s')) ):449 return html_no_permision(self.results_dic)450 return WEB_RENDER('chusersettings', results_dic = self.results_dic)451 def POST(self, name = ''):452 # check basic permission453 if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s')) ):454 return html_no_permision(self.results_dic)455 input = web.input()456 results_dic = self.results_dic457 partial_sql = ''458 sql_string = ''459 new_nick = ''460 new_email = ''461 env_db = ENVS[USERSDB]462 old_pw = ''463 quit = False464 if input.has_key('nick') and input.get('nick'):465 new_nick = input.get('nick')466 if new_nick != session.nickname:467 if 2 <= len(new_nick) <= 20:468 partial_sql += ' %s = "%s", ' %(env_db['NICK_FIELD'], new_nick)469 if ( input.has_key('new_pw') and input.get('new_pw') ) and ( input.has_key('cfm_pw') and input.get('cfm_pw') ):470 new_pw = input.get('new_pw')471 if new_pw == input.get('cfm_pw'):472 if input.has_key('old_pw') and input.get('old_pw'):473 old_pw = input.get('old_pw')474 userdb_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])475 user_info = userdb_obj.execute('SELECT %s FROM %s WHERE %s = "%s" LIMIT 1;' %(476 env_db['PASSWORD_FIELD'], env_db['USERS_TABLE'], env_db['NICK_FIELD'], session.nickname))477 userdb_obj.close()478 if user_info:479 user_info = user_info[0]480 if user_info.has_key(env_db['PASSWORD_FIELD']):481 if user_info.get(env_db['PASSWORD_FIELD']) == md5(old_pw):482 partial_sql += ' %s = "%s", ' %(env_db['PASSWORD_FIELD'], md5(new_pw))483 else:484 results_dic['info'].append('wrong old pass word')485 quit = True486 else:487 results_dic['info'].append('failed to get user info')488 quit = True489 else:490 results_dic['info'] += ['no such user', ': ', session.nickname]491 quit = True492 else:493 results_dic['info'].append('old pass word is needed')494 quit = True495 else:496 results_dic['info'].append('pass word not well confirmed')497 quit = True498 if quit:499 return WEB_RENDER('chusersettings', results = results_dic)500 elif partial_sql:501 userdb_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])502 sql_string = 'UPDATE %s SET %s WHERE %s = "%s";' %(503 env_db['USERS_TABLE'], partial_sql.rstrip(', '), env_db['NICK_FIELD'], session.nickname)504 userdb_obj.execute(sql_string)505 userdb_obj.close()506 if new_nick:507 session.nickname = new_nick508 if new_email:509 session.username = new_email510 results_dic['reload_parent'] = True511 results_dic['info'].append('settings updated successfully')512 return WEB_RENDER('chusersettings', results_dic = results_dic)513 else:514 results_dic['info'].append('nothing to change')515 return WEB_RENDER('chusersettings', results_dic = results_dic)516class search:517 '''for structure search'''518 def __init__(self):519 # set default results to show per page520 self.results_per_page = int(session.results_per_page)521 # set max results number if it's not yet set522 self.max_results_num = int(session.max_results_num)523 # set the target language to session.lang524 trans_class.lang = session.lang525 self.results_dic = tpl_results_dic.copy()526 self.results_dic.update( {527 'logged_user': session.nickname,528 'max_results_num': self.max_results_num,529 'trans_class': trans_class,530 'db_selected': is_selected('1'),531 'mode_selected': is_selected('2'),532 'query_id': '',533 'part_fields': [],534 'pri_and_struc_fields': {'pri_field': '', 'struc_field': ''},535 'results_search_type': '2',536 'lang': session.lang,537 'prequery': '',538 'html_title': 'normal search',539 'info': [],540 'current_page': 'search'541 } )542 def GET(self, name = ''):543 input = web.input()544 # results_mol_ids stores the mol ids found by the previous query545 results_of_get = []546 simi_values = []547 results_mol_ids = []548 results = []549 results_dic = self.results_dic550 search_mol = session.search_mol551 stored_results_of_post = ''552 stored_results_of_get = {}553 results_mol_ids = []554 query_info = {}555 query_mols_dic = {}556 adv_search_query = ''557 search_mol = ''558 md5_query = ''559 available_dbs = PUBLIC_DBS560 if session.authorized:561 available_dbs = DATABASES562 self.results_dic.update({'dbs': available_dbs})563 # check basic permission564 if not is_permited(session.userid, session.usergroup, ('s')):565 return html_no_permision(results_dic)566 try:567 if input.has_key('db') and input.get('db'):568 db_name = input.get('db')569 if db_name not in available_dbs:570 return html_wrong_db(results_dic)571 session.db = db_name572 else:573 db_name = session.db574 if input.has_key('prequery') and input.get('prequery'):575 results_dic['prequery'] = input.get('prequery')576 env_db = ENVS[db_name]577 dbd_obj = dbd_objs[db_name]578 tables = dbd_obj.tables579 pri_field = env_db['PRI_FIELD']580 struc_field = env_db['2D_STRUC_FIELD']581 sql_fields_dic = env_db['FIELDS_TO_SHOW_DIC']582 fields_to_show_list_all = sql_fields_dic['all']583 fields_to_show_list_part = sql_fields_dic['part']584 part_fields = fields_to_show_list_part['fields']585 part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]586 table_heads = fields_to_show_list_part['comments']587 if input.has_key('query_id') and input.get('query_id'):588 md5_query = str(input.get('query_id'))589 if input.has_key('results_per_page'):590 self.results_per_page = int(input.get('results_per_page'))591 session.results_per_page = int(input.get('results_per_page'))592 if input.has_key('max_results_num'):593 self.max_results_num = int(input.get('max_results_num'))594 session.max_results_num = int(input.get('max_results_num'))595 search_type = session.search_mode596 if input.has_key('mode') and input.get('mode'):597 search_type = input.get('mode')598 else:599 search_type = session.search_mode600 results_dic['mode_selected'] = is_selected(search_type)601 if input.has_key('page'):602 # check if the results_of_get_file already exists603 # results_of_get_file is the results from the GET method of search class604 results_of_get_file = CACHE_PATH + '/' + md5_query + '.get'605 if os.path.exists(results_of_get_file):606 if time.time() - os.path.getmtime(results_of_get_file) <= RESULTS_FILE_TIME_OUT:607 f = open(results_of_get_file)608 stored_results_of_get = pickle.load(f)609 query_db = stored_results_of_get['query_info']['query_db']610 if( not query_db in available_dbs ) or query_db != db_name:611 return html_wrong_db(results_dic)612 f.close()613 else:614 os.remove(results_of_get_file)615 else:616 # post_results_file is the results from the POST method of search class617 post_results_file = CACHE_PATH + '/' + md5_query + '.post'618 if os.path.exists(post_results_file):619 f = open(post_results_file)620 stored_results_of_post = pickle.load(f)621 f.close()622 results_mol_ids = stored_results_of_post['query_results_ids']623 simi_values = stored_results_of_post['simi_values']624 query_info = stored_results_of_post['query_info']625 else:626 return web.seeother(URLS_DIC['search_url'])627 if results_mol_ids:628 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])629 sql_obj = lm.sql(env_db)630 if not pri_field in part_fields:631 select_cols = tables[0] + '.' + pri_field + ', ' + ', '.join(fields_to_show_list_part['fields'])632 else:633 select_cols = ', '.join(fields_to_show_list_part['fields'])634 query_string = 'SELECT %s FROM %s WHERE ( %s IN ( %s ) );' % (635 select_cols,636 sql_obj.gen_join_part_sql(tables, pri_field),637 tables[0] + '.' + pri_field, ', '.join(results_mol_ids) )638 time_before_search = time.time()639 # this step is always fast, so no lock is set640 results = db_obj.execute(query_string)641 time_consumed = time.time() - time_before_search642 query_info['time_consumed'] += time_consumed643 db_obj.close()644 simi_field = env_db['SIMI_FIELD']645 for r in results:646 tmpd = {}647 mol_id = ''648 if not pri_field in part_fields:649 tmpd[pri_field] = r[pri_field]650 mol_id = r[pri_field]651 for j in fields_to_show_list_part['fields']:652 j = re.sub(r'^[^\.]+\.', '', j)653 if j in [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]:654 # uncompress compressed entries655 if j == dbd_obj.get_field(env_db['2D_STRUC_KEY']):656 if session.removeh:657 mol = lm.mymol('mol', zlib.decompress(r[j])).removeh()658 else:659 mol = zlib.decompress(r[j])660 tmpd[j] = mol661 else:662 tmpd[j] = zlib.decompress(r[j])663 elif j == pri_field:664 mol_id = r[j]665 tmpd[j] = mol_id666 else:667 tmpd[j] = r[j]668 if mol_id and simi_values and search_type == "3":669 tmpd[simi_field] = simi_values[mol_id]670 # l contains the mol info, each mol in a sublist: [ [...], [...] ]671 results_of_get.append(tmpd)672 if search_type == '3':673 table_heads = fields_to_show_list_part['comments'] + ['simi value']674 part_fields = fields_to_show_list_part['fields'] + [simi_field]675 part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]676 # sort the results on similarity677 if simi_values:678 results_of_get.sort( lambda e1, e2: - cmp(e1[simi_field], e2[simi_field]) )679 for i in results_of_get:680 i.update({simi_field: str(round(i[simi_field]*100, 2)) + '%'})681 stored_results_of_get = {682 'results_of_get': results_of_get,683 'part_fields': part_fields,684 'pri_and_struc_fields': {'pri_field': pri_field, 'struc_field': struc_field},685 'table_heads': table_heads,686 'query_info': query_info687 }688 # store the results689 f = open(results_of_get_file, 'w')690 pickle.dump(stored_results_of_get, f)691 f.close()692 # results about to display693 query_info = stored_results_of_get['query_info']694 db_name = query_info['query_db']695 query_mols_dic = query_info['query_mols_dic']696 page = int(input.get('page'))697 results_of_get = stored_results_of_get['results_of_get']698 len_results = len(results_of_get)699 # calculate the page thing700 pages = ( lambda x, y: x % y and x / y + 1 or x / y ) (len_results, self.results_per_page)701 show_range_left = self.results_per_page * (page - 1)702 if show_range_left > len_results:703 show_range_left = len_results - ( len_results % self.results_per_page )704 show_range_right = self.results_per_page * page705 # store the results in a dict706 results_dic.update({707 'result_list': results_of_get[show_range_left:show_range_right],708 'len_results': len_results,709 'db': db_name,710 'table_heads': stored_results_of_get['table_heads'],711 'part_fields': stored_results_of_get['part_fields'],712 'pri_and_struc_fields': stored_results_of_get['pri_and_struc_fields'],713 'max_results_num': query_info['max_results_num'],714 'pages': pages,715 'page': page,716 'min_simi': str(round(query_info['min_simi'], 2)),717 'search_mol': query_info['query_mol'],718 'query_mols_dic': query_info['query_mols_dic'],719 'query_id': md5_query,720 'time_consumed': round(query_info['time_consumed'], 2),721 'adv_search_query': query_info['adv_search_query'],722 'db_selected': is_selected(db_name),723 'results_search_type': query_info['query_mode'],724 'last_query_id': query_info['last_query_id'],725 'last_db': query_info['last_db']726 })727 except Exception, e:728 results_dic['info'].append('check your query')729 results_dic['info'].append(e)730 results_dic['html_title'] = 'query err'731 return html_info(results_dic)732 # set the title of the html page and render the results733 if search_type == "4":734 results_dic['html_title'] = 'advanced search'735 return html_advanced_search(results_dic)736 else:737 results_dic['html_title'] = 'normal search'738 return html_normal_search(results_dic)739 def POST(self, name = ''):740 # search types: 1: exact search, 2: substructure search,741 # 3: similarity search, 4: advanced search, 5: superstructure search742 input = web.input()743 results_dic = tpl_results_dic.copy()744 # check basic permission745 if not is_permited(session.userid, session.usergroup, ('s')):746 return html_no_permision(results_dic)747 query_smiles = ''748 adv_search_query = ''749 query_smiles_dic = {}750 query_mols_dic = {}751 max_results_num_from_query = 0752 # for similarity search753 min_simi = 0754 simi_values_dic = {}755 regex0 = ''756 regex1 = ''757 regex2 = ''758 last_mol_ids = []759 last_query_id = ''760 last_db = ''761 available_dbs = PUBLIC_DBS762 if session.authorized:763 available_dbs = DATABASES764 self.results_dic.update({'dbs': available_dbs})765 try:766 if input.has_key('results_per_page'):767 self.results_per_page = int(input.get('results_per_page'))768 session.results_per_page = int(input.get('results_per_page'))769 if input.has_key('max_results_num'):770 self.max_results_num = int(input.get('max_results_num'))771 session.max_results_num = int(input.get('max_results_num'))772 if input.has_key('mol'):773 search_mol = str(input.get('mol'))774 session.search_mol = str(input.get('mol'))775 else:776 search_mol = ''777 search_type = session.search_mode778 if input.has_key('mode') and input.get('mode'):779 if input.get('mode') in SEARCH_MODES:780 search_type = input.get('mode')781 session.search_mode = input.get('mode')782 else:783 results_dic['info'] += ['invalid mode', input.get('mode')]784 return html_info(results_dic)785 # chose which database to use786 if input.has_key('db') and input.get('db'):787 db_name = input.get('db')788 if db_name not in available_dbs:789 return html_wrong_db(results_dic)790 session.db = input.get('db')791 else:792 db_name = session.db793 env_db = ENVS[db_name]794 sql_obj = lm.sql(env_db)795 dbd_obj = dbd_objs[db_name]796 tables = dbd_obj.tables797 pri_field = env_db['PRI_FIELD']798 smi_field = dbd_obj.get_field(env_db['SMILES_KEY'])799 simi_field = env_db['SIMI_FIELD']800 # in advanced mode, there could be more than one smiles and mols separated with "|".801 if search_type == "4":802 if input.has_key('query') and input.get('query'):803 adv_search_query = query_preprocessing(str(input.get('query'))) + ' ' # add a space at the end for regex match804 # recover escaped ' and ", for ' and " are legal in mode "4"805 adv_search_query = re.sub(r'\\[\'\"]', '"', adv_search_query)806 else:807 return html_query_imcomplete(results_dic)808 # get the smiless and mols from the input of advanced mode809 query_smiles_dic = {}810 query_mols_dic = {}811 if input.has_key('smiless') and input.get('smiless'):812 for j in [ i for i in query_preprocessing(str(input.get('smiless'))).split('|') if i and i != '|' ]:813 tmpl = j.split(':')814 if len(tmpl) == 2:815 query_smiles_dic[tmpl[0]] = tmpl[1]816 if input.has_key('mols') and input.get('mols'):817 for j in [ i for i in query_preprocessing(str(input.get('mols'))).split('|') if i and i != '|' ]:818 tmpl = j.split(':')819 if len(tmpl) == 2:820 query_mols_dic[tmpl[0]] = tmpl[1]821 # store in session822 if query_smiles_dic:823 session.query_smiles_dic = query_smiles_dic824 if query_mols_dic:825 session.query_mols_dic = query_mols_dic826 # check if the query legal827 # first check if there are key words what are not in the abbr_dic828 regex0 = re.compile(r'([><=!~]+ *[^ )(]+[ )(]*)|([)(])|([sS][uU][bBpP])|([mM][aA][xX])|([aA][nN][dD])|([oO][rR])|([nN][oR][tT])')829 key_words = []830 key_words = list(set(regex0.sub(' ', adv_search_query).split()))831 for k in key_words:832 if not k in env_db['ABBR_DIC'].keys():833 results_dic = self.results_dic834 results_dic['info'].append('contains illegal words')835 results_dic['info'].append(': ' + k)836 results_dic['html_title'] = 'query err'837 return html_info(results_dic)838 # second check if the mol buffer contains all needed molecules839 regex1 = re.compile(r'[sS][uU][bBpP] *[!=]+ *[^ )(]*(?=[ )(]+)')840 mol_key = ''841 for i in regex1.findall(adv_search_query):842 mol_key = i.split('=')[-1].strip(' ')843 if not ( query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key] ):844 results_dic = self.results_dic845 results_dic['info'].append('mol buffer imcomplete')846 results_dic['html_title'] = 'query err'847 return html_info(results_dic)848 # replace some words (~ to like) and abstract the max (limit) value if it has849 new_adv_search_query = adv_search_query.replace('~', ' LIKE ')850 regex2 = re.compile(r'[mM][aA][xX] *=+ *[^ )(]*(?=[ )(]+)')851 tmpl = regex2.findall(new_adv_search_query)852 if len(tmpl) == 1:853 try:854 max_results_num_from_query = int(tmpl[0].split('=')[-1].strip(' '))855 except:856 pass857 new_adv_search_query = regex2.sub('', new_adv_search_query)858 try:859 query_sql = sql_obj.gen_adv_search_sql(new_adv_search_query, query_smiles_dic, env_db['ABBR_DIC'])860 except Exception, e:861 results_dic['info'] += ['query err', e]862 return html_info(results_dic)863 elif input.has_key('smiles') and input.get('smiles'):864 query_smiles = query_preprocessing(str(input.get('smiles')))865 if search_type == "3":866 if input.has_key('min_simi') and input.get("min_simi"):867 try:868 min_simi = float(input.get("min_simi"))869 except:870 results_dic = self.results_dic871 results_dic['info'].append('min simi contains illegal char')872 results_dic['html_title'] = 'query err'873 return html_info(results_dic)874 else:875 return html_query_imcomplete(results_dic)876 query_sql = sql_obj.gen_simi_search_sql(query_smiles, min_simi)877 else:878 if search_type == '1':879 type = '1'880 elif search_type == '2':881 type = '2'882 elif search_type == '5':883 type = '4'884 query_sql = sql_obj.gen_search_sql(query_smiles, type)885 else:886 return html_query_imcomplete(results_dic)887 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])888 results = []889 results_tmp = True890 # search in results891 if input.has_key('search_in_results') and input.get('search_in_results'):892 last_results_of_post_file = CACHE_PATH + '/' + input.get('search_in_results') + '.post'893 if os.path.exists(last_results_of_post_file):894 f = open(last_results_of_post_file)895 last_results_of_post = pickle.load(f)896 f.close()897 last_mol_ids = last_results_of_post['query_results_ids']898 last_query_info = last_results_of_post['query_info']899 last_query_id = last_query_info['query_id']900 last_db = last_query_info['query_db']901 # generates the sql string for query902 if last_mol_ids:903 query_string = 'SELECT %s, %s %s AND %s IN (%s)' % (904 tables[0] + '.' + pri_field,905 smi_field,906 query_sql,907 tables[0] + '.' + pri_field,908 ', '.join(last_mol_ids) )909 else:910 query_string = 'SELECT %s, %s %s' % (911 tables[0] + '.' + pri_field,912 smi_field,913 query_sql)914 # check if there's already a results file915 md5_query = md5(query_string + db_name + search_type)916 session.md5_query = md5_query917 results_of_post_file = CACHE_PATH + '/' + md5_query + '.post'918 lock_file = results_of_post_file + '.lock'919 if os.path.exists(results_of_post_file):920 if time.time() - os.path.getmtime(results_of_post_file) >= RESULTS_FILE_TIME_OUT:921 os.remove(results_of_post_file)922 else:923 return web.seeother(URLS_DIC['search_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query))924 # check if the lock file exists, if exists then wait, else continue.925 while os.path.exists(lock_file):926 # check if the life span of lock_file reached927 if time.time() - os.path.getmtime(lock_file) >= LOCK_FILE_LIFE_SPAN:928 os.remove(lock_file)929 else:930 time.sleep(5)931 # define filters932 filter = None933 if search_type == "1":934 def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):935 if results_dict.has_key(smiles_field):936 return mol_obj.gen_openbabel_can_smiles() == results_dict[smiles_field]937 return False938 elif search_type == "2":939 def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):940 if results_dict.has_key(smiles_field):941 return mol_obj.sub_match('smi', results_dict[smiles_field])942 return False943 elif search_type == "3":944 # similarity search actually needs no filter.945 pass946 elif search_type == '4':947 sub_smiles = []948 sup_smiles = []949 sub_m_objs = []950 sup_m_objs = []951 re_sub = re.compile(r'[sS][uU][bB] *[!=]+ *[^ )(]*(?=[ )(]+)')952 re_sup = re.compile(r'[sS][uU][pP] *[!=]+ *[^ )(]*(?=[ )(]+)')953 for i in re_sub.findall(adv_search_query):954 # for querying for a molecule contains no a paticular substructure always955 # filters out some positive ones, so it's no need to filter here any more,956 # hence we exclude those '!=' ones here.957 if re.findall(r'!=', i):958 continue959 elif re.findall(r'[^!]=', i):960 mol_key = i.split('=')[-1].strip(' ')961 if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:962 sub_smiles.append(query_smiles_dic[mol_key])963 for i in re_sup.findall(adv_search_query):964 # for querying for the superstructure of a paticular molecule always965 # filters out some positive ones, so it's no need to filter here any more,966 # hence we exclude those '!=' ones here.967 if re.findall(r'!=', i):968 continue969 elif re.findall(r'[^!]=', i):970 mol_key = i.split('=')[-1].strip(' ')971 if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:972 sup_smiles.append(query_smiles_dic[mol_key])973 sub_m_objs = [ lm.mymol('smi', m) for m in sub_smiles ]974 sup_m_objs = [ lm.mymol('smi', m) for m in sup_smiles ]975 # filter is only needed to define when the m_objs list is not empty.976 if sub_m_objs or sup_m_objs:977 def filter(results_dict,978 sub_mol_objs = sub_m_objs,979 sup_mol_objs = sup_m_objs,980 smiles_field = smi_field):981 if results_dict.has_key(smiles_field):982 for i in sub_mol_objs:983 if not i.sub_match('smi', results_dict[smiles_field]):984 return False985 for i in sup_mol_objs:986 if not i.sup_match('smi', results_dict[smiles_field]):987 return False988 return True989 return False990 elif search_type == '5':991 def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):992 if results_dict.has_key(smiles_field):993 return mol_obj.sup_match('smi', results_dict[smiles_field])994 return False995 # limit the results996 if max_results_num_from_query:997 num_per_select = 150998 max_results_num = max_results_num_from_query999 elif search_type == '3':1000 max_results_num = 101001 else:1002 num_per_select = 1501003 max_results_num = self.max_results_num1004 # search in database and filter the reuslts1005 # record time consuming1006 time_before_search = time.time()1007 if search_type in ('1', '3'):1008 if search_type == '1':1009 limit = '' #' LIMIT 1'1010 elif search_type == '3':1011 limit = ' LIMIT %s ' %(max_results_num,)1012 # set lock to avoid duplocated search1013 open(lock_file, 'w')1014 results = db_obj.execute(query_string + limit + ';')1015 if os.path.exists(lock_file):1016 os.remove(lock_file)1017 db_obj.close()1018 elif search_type in ('2', '4', '5'):1019 # set lock to avoid duplocated search1020 open(lock_file, 'w')1021 results = db_obj.query(1022 query_string,1023 filter,1024 tables[0] + '.' + pri_field,1025 max_results_num,1026 num_per_select)1027 if os.path.exists(lock_file):1028 os.remove(lock_file)1029 db_obj.close()1030 time_consumed = time.time() - time_before_search1031 # preprocessing the results to store1032 # cut of the extra results1033 results = results[:max_results_num]1034 results_dic_to_store = {}1035 mol_id_to_store = []1036 query_info = {}1037 for r in results:1038 if r.has_key(pri_field):1039 id = r[pri_field]1040 mol_id_to_store.append(str(id))1041 if r.has_key(simi_field):1042 simi_values_dic[id] = r[simi_field]1043 query_info = {1044 'query_mols_dic': query_mols_dic,1045 'query_smiles_dic': query_smiles_dic,1046 'query_smiles': query_smiles,1047 'query_mol': search_mol,1048 'query_string': query_string,1049 'query_db': db_name,1050 'max_results_num': max_results_num,1051 'adv_search_query': adv_search_query,1052 'query_mode': search_type,1053 'min_simi': min_simi,1054 'time_consumed': time_consumed,1055 'last_query_id': last_query_id,1056 'last_db': last_db,1057 'query_id': md5_query1058 }1059 results_dic_to_store = {1060 'query_results_ids': mol_id_to_store,1061 'simi_values': simi_values_dic,1062 'query_info': query_info1063 }1064 # store search results1065 f = open(results_of_post_file, 'w')1066 pickle.dump(results_dic_to_store, f)1067 f.close()1068 return web.seeother(URLS_DIC['search_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query))1069 except Exception, e:1070 results_dic = self.results_dic1071 results_dic['info'].append('check your query')1072 results_dic['info'].append(e)1073 results_dic['html_title'] = 'query err'1074 return html_info(results_dic)1075class webapi:1076 '''web api for structure search'''1077 def __init__(self):1078 # set default results to show per page1079 self.results_per_page = int(session.results_per_page)1080 # set max results number if it's not yet set1081 self.max_results_num = int(session.max_results_num)1082 # set the target language to session.lang1083 self.results_dic = tpl_results_dic.copy()1084 self.results_dic.update( {1085 'logged_user': session.nickname,1086 'max_results_num': self.max_results_num,1087 'query_id': '',1088 'part_fields': [],1089 'pri_and_struc_fields': {'pri_field': '', 'struc_field': ''},1090 'results_search_type': '2',1091 'prequery': '',1092 'info': [],1093 } )1094 def GET(self, name = ''):1095 input = web.input()1096 # results_mol_ids stores the mol ids found by the previous query1097 results_of_get = []1098 simi_values = []1099 results_mol_ids = []1100 results = []1101 results_dic = self.results_dic1102 search_mol = session.search_mol1103 stored_results_of_post = ''1104 stored_results_of_get = {}1105 results_mol_ids = []1106 query_info = {}1107 query_mols_dic = {}1108 adv_search_query = ''1109 search_mol = ''1110 md5_query = ''1111 available_dbs = PUBLIC_DBS1112 if session.authorized:1113 available_dbs = DATABASES1114 self.results_dic.update({'dbs': available_dbs})1115 # check basic permission1116 if not is_permited(session.userid, session.usergroup, ('s')):1117 return json.dumps({'status': 'Permition denied'})1118 try:1119 if input.has_key('db') and input.get('db'):1120 db_name = input.get('db')1121 if db_name not in available_dbs:1122 return json.dumps({'status': 'DB not accessible!'})1123 session.db = db_name1124 else:1125 db_name = session.db1126 if input.has_key('prequery') and input.get('prequery'):1127 results_dic['prequery'] = input.get('prequery')1128 env_db = ENVS[db_name]1129 dbd_obj = dbd_objs[db_name]1130 tables = dbd_obj.tables1131 pri_field = env_db['PRI_FIELD']1132 struc_field = env_db['2D_STRUC_FIELD']1133 sql_fields_dic = env_db['FIELDS_TO_SHOW_DIC']1134 fields_to_show_list_all = sql_fields_dic['all']1135 fields_to_show_list_part = sql_fields_dic['part']1136 part_fields = fields_to_show_list_part['fields']1137 part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]1138 table_heads = fields_to_show_list_part['comments']1139 if input.has_key('query_id') and input.get('query_id'):1140 md5_query = str(input.get('query_id'))1141 if input.has_key('results_per_page'):1142 self.results_per_page = int(input.get('results_per_page'))1143 session.results_per_page = int(input.get('results_per_page'))1144 if input.has_key('max_results_num'):1145 self.max_results_num = int(input.get('max_results_num'))1146 session.max_results_num = int(input.get('max_results_num'))1147 search_type = session.search_mode1148 if input.has_key('mode') and input.get('mode'):1149 search_type = input.get('mode')1150 else:1151 search_type = session.search_mode1152 if input.has_key('page'):1153 # check if the results_of_get_file already exists1154 # results_of_get_file is the results from the GET method of search class1155 results_of_get_file = CACHE_PATH + '/' + md5_query + '.get'1156 if os.path.exists(results_of_get_file):1157 if time.time() - os.path.getmtime(results_of_get_file) <= RESULTS_FILE_TIME_OUT:1158 f = open(results_of_get_file)1159 stored_results_of_get = pickle.load(f)1160 query_db = stored_results_of_get['query_info']['query_db']1161 if( not query_db in available_dbs ) or query_db != db_name:1162 return json.dumps({'status': 'DB not accessible!'})1163 f.close()1164 else:1165 os.remove(results_of_get_file)1166 else:1167 # post_results_file is the results from the POST method of search class1168 post_results_file = CACHE_PATH + '/' + md5_query + '.post'1169 if os.path.exists(post_results_file):1170 f = open(post_results_file)1171 stored_results_of_post = pickle.load(f)1172 f.close()1173 results_mol_ids = stored_results_of_post['query_results_ids']1174 simi_values = stored_results_of_post['simi_values']1175 query_info = stored_results_of_post['query_info']1176 else:1177 return json.dumps({'status': 'Not yet posted!'})1178 if results_mol_ids:1179 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1180 sql_obj = lm.sql(env_db)1181 if not pri_field in part_fields:1182 select_cols = tables[0] + '.' + pri_field + ', ' + ', '.join(fields_to_show_list_part['fields'])1183 else:1184 select_cols = ', '.join(fields_to_show_list_part['fields'])1185 query_string = 'SELECT %s FROM %s WHERE ( %s IN ( %s ) );' % (1186 select_cols,1187 sql_obj.gen_join_part_sql(tables, pri_field),1188 tables[0] + '.' + pri_field, ', '.join(results_mol_ids) )1189 time_before_search = time.time()1190 # this step is always fast, so no lock is set1191 results = db_obj.execute(query_string)1192 time_consumed = time.time() - time_before_search1193 query_info['time_consumed'] += time_consumed1194 db_obj.close()1195 simi_field = env_db['SIMI_FIELD']1196 for r in results:1197 tmpd = {}1198 mol_id = ''1199 if not pri_field in part_fields:1200 tmpd[pri_field] = r[pri_field]1201 mol_id = r[pri_field]1202 for j in fields_to_show_list_part['fields']:1203 j = re.sub(r'^[^\.]+\.', '', j)1204 if j in [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]:1205 # uncompress compressed entries1206 if j == dbd_obj.get_field(env_db['2D_STRUC_KEY']):1207 if session.removeh:1208 mol = lm.mymol('mol', zlib.decompress(r[j])).removeh()1209 else:1210 mol = zlib.decompress(r[j])1211 tmpd[j] = mol1212 else:1213 tmpd[j] = zlib.decompress(r[j])1214 elif j == pri_field:1215 mol_id = r[j]1216 tmpd[j] = mol_id1217 else:1218 tmpd[j] = r[j]1219 if mol_id and simi_values and search_type == "3":1220 tmpd[simi_field] = simi_values[mol_id]1221 # l contains the mol info, each mol in a sublist: [ [...], [...] ]1222 results_of_get.append(tmpd)1223 if search_type == '3':1224 table_heads = fields_to_show_list_part['comments'] + ['simi value']1225 part_fields = fields_to_show_list_part['fields'] + [simi_field]1226 part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]1227 # sort the results on similarity1228 if simi_values:1229 results_of_get.sort( lambda e1, e2: - cmp(e1[simi_field], e2[simi_field]) )1230 for i in results_of_get:1231 i.update({simi_field: str(round(i[simi_field]*100, 2)) + '%'})1232 stored_results_of_get = {1233 'results_of_get': results_of_get,1234 'part_fields': part_fields,1235 'pri_and_struc_fields': {'pri_field': pri_field, 'struc_field': struc_field},1236 'table_heads': table_heads,1237 'query_info': query_info1238 }1239 # store the results1240 f = open(results_of_get_file, 'w')1241 pickle.dump(stored_results_of_get, f)1242 f.close()1243 # results about to display1244 query_info = stored_results_of_get['query_info']1245 db_name = query_info['query_db']1246 query_mols_dic = query_info['query_mols_dic']1247 page = int(input.get('page'))1248 results_of_get = stored_results_of_get['results_of_get']1249 len_results = len(results_of_get)1250 # calculate the page thing1251 pages = ( lambda x, y: x % y and x / y + 1 or x / y ) (len_results, self.results_per_page)1252 show_range_left = self.results_per_page * (page - 1)1253 if show_range_left > len_results:1254 show_range_left = len_results - ( len_results % self.results_per_page )1255 show_range_right = self.results_per_page * page1256 # store the results in a dict1257 results_dic.update({1258 'result_list': results_of_get[show_range_left:show_range_right],1259 'len_results': len_results,1260 'db': db_name,1261 'table_heads': stored_results_of_get['table_heads'],1262 'part_fields': stored_results_of_get['part_fields'],1263 'pri_and_struc_fields': stored_results_of_get['pri_and_struc_fields'],1264 'max_results_num': query_info['max_results_num'],1265 'pages': pages,1266 'page': page,1267 'min_simi': str(round(query_info['min_simi'], 2)),1268 'search_mol': query_info['query_mol'],1269 'query_mols_dic': query_info['query_mols_dic'],1270 'query_id': md5_query,1271 'time_consumed': round(query_info['time_consumed'], 2),1272 'adv_search_query': query_info['adv_search_query'],1273 'results_search_type': query_info['query_mode'],1274 'last_query_id': query_info['last_query_id'],1275 'last_db': query_info['last_db']1276 })1277 except Exception, e:1278 results_dic['info'].append('check your query')1279 results_dic['info'].append(e)1280 return json.dumps(results_dic)1281 for key in ('mode_selected', 'db_selected',1282 'links', 'urls_dic', 'table_heads',1283 'last_db', 'part_fields', 'pri_and_struc_fields',1284 'lang', 'trans_class', 'info_head',1285 'html_title', 'current_page'):1286 self.results_dic.pop(key)1287 return json.dumps(results_dic)1288 def POST(self, name = ''):1289 # search types: 1: exact search, 2: substructure search,1290 # 3: similarity search, 4: advanced search, 5: superstructure search1291 input_raw = web.input()1292 if input_raw.has_key('query_format'):1293 qfmt = input_raw.get('query_format')1294 if qfmt == 'json' and input_raw.has_key('json'):1295 input = json.loads(input_raw.get('json'))1296 else:1297 input = input_raw1298 else:1299 input = input_raw1300 results_dic = tpl_results_dic.copy()1301 # check basic permission1302 if not is_permited(session.userid, session.usergroup, ('s')):1303 return json.dumps({'status': 'Permition denied!'})1304 query_smiles = ''1305 adv_search_query = ''1306 query_smiles_dic = {}1307 query_mols_dic = {}1308 max_results_num_from_query = 01309 # for similarity search1310 min_simi = 01311 simi_values_dic = {}1312 regex0 = ''1313 regex1 = ''1314 regex2 = ''1315 last_mol_ids = []1316 last_query_id = ''1317 last_db = ''1318 available_dbs = PUBLIC_DBS1319 if session.authorized:1320 available_dbs = DATABASES1321 self.results_dic.update({'dbs': available_dbs})1322 try:1323 if input.has_key('results_per_page'):1324 self.results_per_page = int(input.get('results_per_page'))1325 session.results_per_page = int(input.get('results_per_page'))1326 if input.has_key('max_results_num'):1327 self.max_results_num = int(input.get('max_results_num'))1328 session.max_results_num = int(input.get('max_results_num'))1329 if input.has_key('mol'):1330 search_mol = str(input.get('mol'))1331 session.search_mol = str(input.get('mol'))1332 else:1333 search_mol = ''1334 search_type = session.search_mode1335 if input.has_key('mode') and input.get('mode'):1336 if input.get('mode') in SEARCH_MODES:1337 search_type = input.get('mode')1338 session.search_mode = input.get('mode')1339 else:1340 return json.dumps({'status': 'Invalid mode: %s' %input.get('mode')})1341 # chose which database to use1342 if input.has_key('db') and input.get('db'):1343 db_name = input.get('db')1344 if db_name not in available_dbs:1345 return json.dumps({'status': 'DB not accessible!'})1346 session.db = input.get('db')1347 else:1348 db_name = session.db1349 env_db = ENVS[db_name]1350 sql_obj = lm.sql(env_db)1351 dbd_obj = dbd_objs[db_name]1352 tables = dbd_obj.tables1353 pri_field = env_db['PRI_FIELD']1354 smi_field = dbd_obj.get_field(env_db['SMILES_KEY'])1355 simi_field = env_db['SIMI_FIELD']1356 # in advanced mode, there could be more than one smiles and mols separated with "|".1357 if search_type == "4":1358 if input.has_key('query') and input.get('query'):1359 adv_search_query = query_preprocessing(str(input.get('query'))) + ' ' # add a space at the end for regex match1360 # recover escaped ' and ", for ' and " are legal in mode "4"1361 adv_search_query = re.sub(r'\\[\'\"]', '"', adv_search_query)1362 else:1363 return json.dumps({'status': 'Query imcomplete!'})1364 # get the smiless and mols from the input of advanced mode1365 query_smiles_dic = {}1366 query_mols_dic = {}1367 if input.has_key('smiless') and input.get('smiless'):1368 for j in [ i for i in query_preprocessing(str(input.get('smiless'))).split('|') if i and i != '|' ]:1369 tmpl = j.split(':')1370 if len(tmpl) == 2:1371 query_smiles_dic[tmpl[0]] = tmpl[1]1372 elif input.has_key('mols') and input.get('mols'):1373 for j in [ i for i in query_preprocessing(str(input.get('mols'))).split('|') if i and i != '|' ]:1374 tmpl = j.split(':')1375 if len(tmpl) == 2:1376 query_mols_dic[tmpl[0]] = tmpl[1]1377 # store in session1378 if query_smiles_dic:1379 session.query_smiles_dic = query_smiles_dic1380 elif query_mols_dic:1381 for k, v in query_smiles_dic.items():1382 query_smiles_dic[k] = lm.mymol('mol', v).mol.write('smi')1383 if query_smiles_dic:1384 session.query_smiles_dic = query_smiles_dic1385 if query_mols_dic:1386 session.query_mols_dic = query_mols_dic1387 # check if the query legal1388 # first check if there are key words what are not in the abbr_dic1389 regex0 = re.compile(r'([><=!~]+ *[^ )(]+[ )(]*)|([)(])|([sS][uU][bBpP])|([mM][aA][xX])|([aA][nN][dD])|([oO][rR])|([nN][oR][tT])')1390 key_words = []1391 key_words = list(set(regex0.sub(' ', adv_search_query).split()))1392 for k in key_words:1393 if not k in env_db['ABBR_DIC'].keys():1394 return json.dumps({'status': 'Contains illegal words!'})1395 # second check if the mol buffer contains all needed molecules1396 regex1 = re.compile(r'[sS][uU][bBpP] *[!=]+ *[^ )(]*(?=[ )(]+)')1397 mol_key = ''1398 for i in regex1.findall(adv_search_query):1399 mol_key = i.split('=')[-1].strip(' ')1400 if not ( query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key] ):1401 return json.dumps({'status': 'Mol buffer imcomplete!'})1402 # replace some words (~ to like) and abstract the max (limit) value if it has1403 new_adv_search_query = adv_search_query.replace('~', ' LIKE ')1404 regex2 = re.compile(r'[mM][aA][xX] *=+ *[^ )(]*(?=[ )(]+)')1405 tmpl = regex2.findall(new_adv_search_query)1406 if len(tmpl) == 1:1407 try:1408 max_results_num_from_query = int(tmpl[0].split('=')[-1].strip(' '))1409 except:1410 pass1411 new_adv_search_query = regex2.sub('', new_adv_search_query)1412 try:1413 query_sql = sql_obj.gen_adv_search_sql(new_adv_search_query, query_smiles_dic, env_db['ABBR_DIC'])1414 except Exception, e:1415 return json.dumps({'status': 'Query error: %s' %str(e)})1416 elif (input.has_key('mol') and input.get('mol')) or (input.has_key('smiles') and input.get('smiles')):1417 if input.has_key('smiles') and input.get('smiles'):1418 query_smiles = input.get('smiles')1419 else:1420 query_smiles = query_preprocessing(lm.mymol('mol', str(input.get('mol'))).mol.write('smi'))1421 if search_type == "3":1422 if input.has_key('min_simi') and input.get("min_simi"):1423 try:1424 min_simi = float(input.get("min_simi"))1425 except:1426 return json.dumps({'status': 'Min simi contains illegal char!'})1427 else:1428 return json.dumps({'status': 'Query imcomplete!'})1429 query_sql = sql_obj.gen_simi_search_sql(query_smiles, min_simi)1430 else:1431 if search_type == '1':1432 type = '1'1433 elif search_type == '2':1434 type = '2'1435 elif search_type == '5':1436 type = '4'1437 query_sql = sql_obj.gen_search_sql(query_smiles, type)1438 else:1439 return json.dumps({'status': 'Query imcomplete!'})1440 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1441 results = []1442 results_tmp = True1443 # search in results1444 if input.has_key('search_in_results') and input.get('search_in_results'):1445 last_results_of_post_file = CACHE_PATH + '/' + input.get('search_in_results') + '.post'1446 if os.path.exists(last_results_of_post_file):1447 f = open(last_results_of_post_file)1448 last_results_of_post = pickle.load(f)1449 f.close()1450 last_mol_ids = last_results_of_post['query_results_ids']1451 last_query_info = last_results_of_post['query_info']1452 last_query_id = last_query_info['query_id']1453 last_db = last_query_info['query_db']1454 # generates the sql string for query1455 if last_mol_ids:1456 query_string = 'SELECT %s, %s %s AND %s IN (%s)' % (1457 tables[0] + '.' + pri_field,1458 smi_field,1459 query_sql,1460 tables[0] + '.' + pri_field,1461 ', '.join(last_mol_ids) )1462 else:1463 query_string = 'SELECT %s, %s %s' % (1464 tables[0] + '.' + pri_field,1465 smi_field,1466 query_sql)1467 # check if there's already a results file1468 md5_query = md5(query_string + db_name + search_type)1469 session.md5_query = md5_query1470 results_of_post_file = CACHE_PATH + '/' + md5_query + '.post'1471 lock_file = results_of_post_file + '.lock'1472 if os.path.exists(results_of_post_file):1473 if time.time() - os.path.getmtime(results_of_post_file) >= RESULTS_FILE_TIME_OUT:1474 os.remove(results_of_post_file)1475 else:1476 return json.dumps({'status': 'OK',1477 'result_url': URLS_DIC['webapi_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query)})1478 # check if the lock file exists, if exists then wait, else continue.1479 while os.path.exists(lock_file):1480 # check if the life span of lock_file reached1481 if time.time() - os.path.getmtime(lock_file) >= LOCK_FILE_LIFE_SPAN:1482 os.remove(lock_file)1483 else:1484 time.sleep(5)1485 # define filters1486 filter = None1487 if search_type == "1":1488 def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):1489 if results_dict.has_key(smiles_field):1490 return mol_obj.gen_openbabel_can_smiles() == results_dict[smiles_field]1491 return False1492 elif search_type == "2":1493 def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):1494 if results_dict.has_key(smiles_field):1495 return mol_obj.sub_match('smi', results_dict[smiles_field])1496 return False1497 elif search_type == "3":1498 # similarity search actually needs no filter.1499 pass1500 elif search_type == '4':1501 sub_smiles = []1502 sup_smiles = []1503 sub_m_objs = []1504 sup_m_objs = []1505 re_sub = re.compile(r'[sS][uU][bB] *[!=]+ *[^ )(]*(?=[ )(]+)')1506 re_sup = re.compile(r'[sS][uU][pP] *[!=]+ *[^ )(]*(?=[ )(]+)')1507 for i in re_sub.findall(adv_search_query):1508 # for querying for a molecule contains no a paticular substructure always1509 # filters out some positive ones, so it's no need to filter here any more,1510 # hence we exclude those '!=' ones here.1511 if re.findall(r'!=', i):1512 continue1513 elif re.findall(r'[^!]=', i):1514 mol_key = i.split('=')[-1].strip(' ')1515 if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:1516 sub_smiles.append(query_smiles_dic[mol_key])1517 for i in re_sup.findall(adv_search_query):1518 # for querying for the superstructure of a paticular molecule always1519 # filters out some positive ones, so it's no need to filter here any more,1520 # hence we exclude those '!=' ones here.1521 if re.findall(r'!=', i):1522 continue1523 elif re.findall(r'[^!]=', i):1524 mol_key = i.split('=')[-1].strip(' ')1525 if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:1526 sup_smiles.append(query_smiles_dic[mol_key])1527 sub_m_objs = [ lm.mymol('smi', m) for m in sub_smiles ]1528 sup_m_objs = [ lm.mymol('smi', m) for m in sup_smiles ]1529 # filter is only needed to define when the m_objs list is not empty.1530 if sub_m_objs or sup_m_objs:1531 def filter(results_dict,1532 sub_mol_objs = sub_m_objs,1533 sup_mol_objs = sup_m_objs,1534 smiles_field = smi_field):1535 if results_dict.has_key(smiles_field):1536 for i in sub_mol_objs:1537 if not i.sub_match('smi', results_dict[smiles_field]):1538 return False1539 for i in sup_mol_objs:1540 if not i.sup_match('smi', results_dict[smiles_field]):1541 return False1542 return True1543 return False1544 elif search_type == '5':1545 def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):1546 if results_dict.has_key(smiles_field):1547 return mol_obj.sup_match('smi', results_dict[smiles_field])1548 return False1549 # limit the results1550 if max_results_num_from_query:1551 num_per_select = 1501552 max_results_num = max_results_num_from_query1553 elif search_type == '3':1554 max_results_num = 101555 else:1556 num_per_select = 1501557 max_results_num = self.max_results_num1558 # search in database and filter the reuslts1559 # record time consuming1560 time_before_search = time.time()1561 if search_type in ('1', '3'):1562 if search_type == '1':1563 limit = '' #' LIMIT 1'1564 elif search_type == '3':1565 limit = ' LIMIT %s ' %(max_results_num,)1566 # set lock to avoid duplocated search1567 open(lock_file, 'w')1568 results = db_obj.execute(query_string + limit + ';')1569 if os.path.exists(lock_file):1570 os.remove(lock_file)1571 db_obj.close()1572 elif search_type in ('2', '4', '5'):1573 # set lock to avoid duplocated search1574 open(lock_file, 'w')1575 results = db_obj.query(1576 query_string,1577 filter,1578 tables[0] + '.' + pri_field,1579 max_results_num,1580 num_per_select)1581 if os.path.exists(lock_file):1582 os.remove(lock_file)1583 db_obj.close()1584 time_consumed = time.time() - time_before_search1585 # preprocessing the results to store1586 # cut of the extra results1587 results = results[:max_results_num]1588 results_dic_to_store = {}1589 mol_id_to_store = []1590 query_info = {}1591 for r in results:1592 if r.has_key(pri_field):1593 id = r[pri_field]1594 mol_id_to_store.append(str(id))1595 if r.has_key(simi_field):1596 simi_values_dic[id] = r[simi_field]1597 query_info = {1598 'query_mols_dic': query_mols_dic,1599 'query_smiles_dic': query_smiles_dic,1600 'query_smiles': query_smiles,1601 'query_mol': search_mol,1602 'query_string': query_string,1603 'query_db': db_name,1604 'max_results_num': max_results_num,1605 'adv_search_query': adv_search_query,1606 'query_mode': search_type,1607 'min_simi': min_simi,1608 'time_consumed': time_consumed,1609 'last_query_id': last_query_id,1610 'last_db': last_db,1611 'query_id': md5_query1612 }1613 results_dic_to_store = {1614 'query_results_ids': mol_id_to_store,1615 'simi_values': simi_values_dic,1616 'query_info': query_info1617 }1618 # store search results1619 f = open(results_of_post_file, 'w')1620 pickle.dump(results_dic_to_store, f)1621 f.close()1622 return json.dumps({'status': 'OK',1623 'result_url': URLS_DIC['webapi_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query)})1624 except Exception, e:1625 return json.dumps({'status': 'Query error: %s' %str(e)})1626class molinfo:1627 def __init__(self):1628 # set the target language to session.lang1629 trans_class.lang = session.lang1630 self.results_dic = tpl_results_dic.copy()1631 self.results_dic.update( {1632 'logged_user': session.nickname,1633 'trans_class': trans_class,1634 'mode_selected': is_selected(str(session.search_mode)),1635 'html_title': 'mol info',1636 'info': [],1637 'show_edit': False,1638 'current_page': 'search',1639 'lang': session.lang1640 } )1641 def GET(self, name = ''):1642 input = web.input()1643 results_dic = self.results_dic1644 available_dbs = PUBLIC_DBS1645 if session.authorized:1646 available_dbs = DATABASES1647 self.results_dic.update({'dbs': available_dbs})1648 # check basic permission1649 if not is_permited(session.userid, session.usergroup, ('s')):1650 return html_no_permision(results_dic)1651 # chose which database to use1652 if input.has_key('db') and input.get('db'):1653 db_name = input.get('db')1654 if db_name not in available_dbs:1655 return html_wrong_db(results_dic)1656 session.db = db_name1657 else:1658 db_name = session.db1659 env_db = ENVS[db_name]1660 sql_obj = lm.sql(env_db)1661 sql_fields_dic = env_db['FIELDS_TO_SHOW_DIC']1662 fields_to_show_list_all = sql_fields_dic['all']1663 fields_to_show_list_part = sql_fields_dic['part']1664 dbd_obj = dbd_objs[db_name]1665 tables = dbd_obj.tables1666 pri_field = env_db['PRI_FIELD']1667 smi_field = dbd_obj.get_field(env_db['SMILES_KEY'])1668 simi_field = env_db['SIMI_FIELD']1669 try:1670 if input.has_key('mol_id'):1671 mol_id = query_preprocessing(str(input.get('mol_id')))1672 if not re.match('^[0-9]+$', mol_id):1673 return html_query_illegal(results_dic)1674 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1675 query_string = 'SELECT %s FROM %s WHERE ( %s = %s ) LIMIT 1;' % (1676 ', '.join(fields_to_show_list_all['fields']),1677 sql_obj.gen_join_part_sql(tables, pri_field),1678 tables[0] + '.' + pri_field,1679 mol_id )1680 results = db_obj.execute(query_string)1681 db_obj.close()1682 if not results:1683 results_dic['info'].append('no mol to show')1684 return html_info(results_dic)1685 else:1686 results = results[0]1687 # change the decimal submiter id to nick name1688 submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1689 if results.has_key(submiter_field):1690 submiter_id = results.get(submiter_field)1691 if not submiter_id:1692 submiter_id = "''"1693 env_dbuser = ENVS[USERSDB]1694 dbuser_obj = lm.database(env_dbuser['HOST'], env_dbuser['USER'], env_dbuser['PASSWORD'], env_dbuser['DBNAME'])1695 results_user = dbuser_obj.execute('SELECT %s FROM %s WHERE (%s = %s) LIMIT 1;' % (1696 env_dbuser['NICK_FIELD'],1697 env_dbuser['USERS_TABLE'],1698 env_dbuser['USER_ID_FIELD'],1699 submiter_id) )1700 dbuser_obj.close()1701 if results_user and results_user[0].has_key(env_dbuser['NICK_FIELD']):1702 results[submiter_field] = results_user[0][env_dbuser['NICK_FIELD']]1703 # uncompress those compressed entries and add comments to the results_dic for display1704 tmpl = []1705 mol = ''1706 fields = fields_to_show_list_all['fields']1707 comments = fields_to_show_list_all['comments']1708 for j in xrange(len(fields)):1709 i = fields[j]1710 i = re.sub(r'^[^\.]+\.', '', i)1711 if i in [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]:1712 # uncompress compressed entries1713 if i == dbd_obj.get_field(env_db['2D_STRUC_KEY']):1714 if session.removeh:1715 mol = lm.mymol('mol', zlib.decompress(results[i])).removeh()1716 else:1717 mol = zlib.decompress(results[i])1718 else:1719 tmpl.append([comments[j], zlib.decompress(results[i])])1720 else:1721 tmpl.append([comments[j], results[i]])1722 # show different interface to different groups1723 if db_name in EDITABLE_DBS and session.authorized and \1724 is_permited(session.userid, session.usergroup, ('u'), submiter_id):1725 results_dic['show_edit'] = True1726 results_dic['result_list'] = tmpl1727 results_dic['mol'] = mol1728 results_dic['mol_id'] = mol_id1729 results_dic['db'] = db_name1730 return html_molinfo(results_dic)1731 else:1732 return html_query_imcomplete(results_dic)1733 except Exception, e:1734 results_dic['info'].append('check your query')1735 results_dic['info'].append(e)1736 results_dic['html_title'] = 'query err'1737 return html_info(results_dic)1738class edit:1739 def __init__(self):1740 # set the target language to session.lang1741 trans_class.lang = session.lang1742 self.results_dic = tpl_results_dic.copy()1743 self.results_dic.update( {1744 'logged_user': session.nickname,1745 'trans_class': trans_class,1746 'html_title': 'edit',1747 'dbs': EDITABLE_DBS,1748 'mol_id': None,1749 'show_del': False,1750 'current_page': 'edit',1751 'calculable_keys': [],1752 'submiter_nick': '',1753 'needed_keys': [],1754 'info': [],1755 'lang': session.lang1756 } )1757 def GET(self, name = ''):1758 input = web.input()1759 results_dic = self.results_dic1760 # check basic permission1761 if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):1762 return html_no_permision(results_dic)1763 # chose which database to use1764 if input.has_key('db') and input.get('db'):1765 db_name = input.get('db')1766 if ( db_name not in EDITABLE_DBS ) or ( not db_name in EDITABLE_DBS ):1767 return html_wrong_db(results_dic)1768 session.db = db_name1769 else:1770 db_name = EDITABLE_DBS[0]1771 submiter_id = session.userid1772 results_dic = self.results_dic1773 needed_keys = []1774 env_db = ENVS[db_name]1775 dbd_obj = dbd_objs[db_name]1776 sql_obj = lm.sql(env_db)1777 pri_field = env_db['PRI_FIELD']1778 tables = dbd_obj.tables1779 # keys of the molecular properties that can be calculated (with out fingerprints keys)1780 calculable_keys = lm.mymol.mol_data_keys() + lm.mymol.mol_stat_keys()1781 for k in dbd_obj.comments_and_keys_list(1782 [env_db['FP_TABLE']],1783 [env_db['2D_STRUC_KEY'],1784 env_db['SUBMITER_KEY']] + env_db['PRI_KEYS']):1785 if not k[1] in calculable_keys:1786 if not k[0]:1787 k[0] = k[1]1788 needed_keys.append(k)1789 # uniq needed_keys1790 tmpl = []1791 for i in needed_keys:1792 if not i in tmpl:1793 tmpl.append(i)1794 needed_keys = tmpl1795 # edit an exist molecule1796 if input.has_key('mol_id'):1797 mol_id = query_preprocessing(str(input.get('mol_id')))1798 if not re.match('^[0-9]+$', mol_id):1799 return html_query_illegal(results_dic)1800 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1801 query_string = 'SELECT * FROM %s WHERE ( %s = %s ) LIMIT 1;' % (1802 sql_obj.gen_join_part_sql(tables, pri_field),1803 tables[0] + '.' + pri_field,1804 mol_id )1805 results = db_obj.execute(query_string)1806 db_obj.close()1807 # check if the molecule exists and if the user has permission to edit it.1808 if results:1809 results = results[0]1810 results_dic['mol_id'] = mol_id1811 submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1812 if results.has_key(submiter_field):1813 submiter_id = results.get(submiter_field)1814 # check the edit molecule permission (mast has 'u' in his permission list)1815 if not is_permited(session.userid, session.usergroup, ('u'), submiter_id):1816 results_dic['info'] += ['can not edit mol', ': ' + mol_id]1817 results_dic['mol_id'] = ''1818 else:1819 # congrats, you can edit the molecule, and we then prepare the mol info for display.1820 # if you can edit the molecule, then you can also delete it1821 results_dic['show_del'] = True1822 # change the decimal submiter id to nick name1823 env_dbuser = ENVS[USERSDB]1824 dbuser_obj = lm.database(env_dbuser['HOST'], env_dbuser['USER'], env_dbuser['PASSWORD'], env_dbuser['DBNAME'])1825 results_user = dbuser_obj.execute('SELECT %s FROM %s WHERE (%s = %s) LIMIT 1;' % (1826 env_dbuser['NICK_FIELD'],1827 env_dbuser['USERS_TABLE'],1828 env_dbuser['USER_ID_FIELD'],1829 submiter_id) )1830 dbuser_obj.close()1831 if results_user and results_user[0].has_key(env_dbuser['NICK_FIELD']):1832 results_dic['submiter_nick'] = results_user[0][env_dbuser['NICK_FIELD']]1833 # uncompress compressed entries1834 compressed_fields = [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]1835 for i in compressed_fields:1836 if i == dbd_obj.get_field(env_db['2D_STRUC_KEY']):1837 if session.removeh:1838 mol = results[i] = lm.mymol('mol', zlib.decompress(results[i])).removeh()1839 else:1840 mol = results[i] = zlib.decompress(results[i])1841 results_dic['mol'] = mol1842 else:1843 results[i] = zlib.decompress(results[i])1844 # store the mol info into needed_keys1845 for i in needed_keys:1846 field = dbd_obj.get_field(i[1])1847 if field:1848 for k, v in results.items():1849 if field == k:1850 i.append(v)1851 continue1852 else:1853 # molecule with the id of the given mol_id dose not exist, change mode to "add mol"1854 results_dic['info'] += ['no mol to show', ': ' + mol_id]1855 results_dic['mol_id'] = ''1856 results_dic['needed_tuple'] = needed_keys1857 results_dic['db'] = db_name1858 results_dic['db_selected'] = is_selected(db_name)1859 return html_edit(results_dic)1860 def POST(self, name = ''):1861 input = web.input()1862 values_dict = input.copy()1863 results_dic = self.results_dic1864 # check basic permission, 'i' permits add new molecule, while 'u' edit an exist molecule.1865 if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):1866 return html_no_permision(results_dic)1867 # chose which database to use1868 if input.has_key('db') and input.get('db'):1869 db_name = input.get('db')1870 session.db = db_name1871 else:1872 db_name = EDITABLE_DBS[0]1873 if ( db_name not in EDITABLE_DBS ) or ( not db_name in EDITABLE_DBS ):1874 return html_wrong_db(results_dic)1875 env_db = ENVS[db_name]1876 sql_obj = lm.sql(env_db)1877 dbd_obj = dbd_objs[db_name]1878 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1879 pri_field = env_db['PRI_FIELD']1880 tables = dbd_obj.tables1881 mol_id = ''1882 smiles = ''1883 add_mol = False1884 submiter_id = session.userid1885 submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1886 try:1887 # smiles from the web input is needed1888 if not (input.has_key('SMILES') and input.get('SMILES')):1889 results_dic['info'].append('needed entry imcomplete')1890 results_dic['html_title'] = 'needed entry imcomplete'1891 return html_info(results_dic)1892 else:1893 smiles = query_preprocessing(str(input.get('SMILES')))1894 if input.has_key('mol_id') and input.get('mol_id'):1895 mol_id = query_preprocessing(input.get('mol_id'))1896 if not re.match('^[0-9]+$', mol_id):1897 return html_query_illegal(results_dic)1898 else:1899 try:1900 # the id of new added mol is the max id plus one.1901 mol_id = db_obj.execute('SELECT max(%s) AS max_mol_id FROM %s;' %(pri_field, tables[0]))[0]['max_mol_id']1902 mol_id = str(int(mol_id) + 1)1903 # add new molecule. those has permission of executing 'i' action can do this.1904 add_mol = True1905 except:1906 pass1907 if not (mol_id and smiles):1908 results_dic['info'].append('needed entry imcomplete')1909 results_dic['html_title'] = 'needed entry imcomplete'1910 return html_info(results_dic)1911 else:1912 # check if one has the permission to edit the molecule ( user not in group 1 can only1913 # insert new ones or update those submited by himself)1914 if not add_mol:1915 # if you are here, then you are editing an exist molecule, only those1916 # has the permission of executing 'u' action can do this.1917 # find the submiter of the molecule with the id of mol_id1918 query_string = 'SELECT %s FROM %s WHERE ( %s = %s ) LIMIT 1;' % (1919 submiter_field,1920 dbd_obj.get_table_name(env_db['SUBMITER_KEY']),1921 pri_field,1922 mol_id )1923 results = db_obj.execute(query_string)1924 if results:1925 results = results[0]1926 if results.has_key(submiter_field):1927 submiter_id = results.get(submiter_field)1928 # check the edit exist molecule permission1929 if not is_permited(session.userid, session.usergroup, ('u'), submiter_id):1930 results_dic['html_title'] = 'permission denied'1931 results_dic['info'] += ['<br/>', 'can not edit mol', ': ' + mol_id]1932 return html_info(results_dic)1933 for id_key in env_db['PRI_KEYS']:1934 values_dict[id_key] = mol_id1935 # the submiter id should keep still1936 # caculates the caculable properties1937 tmp_dict = {}1938 mol = lm.mymol('smi', smiles)1939 mol.get_mol_data(tmp_dict)1940 mol.get_fps(tmp_dict)1941 mol.get_mol_stat(tmp_dict)1942 values_dict.update(tmp_dict)1943 # store the new info of the mol1944 for sql_string in sql_obj.gen_insert_sqls(values_dict, 'REPLACE').values():1945 db_obj.execute(sql_string + ';')1946 db_obj.close()1947 results_dic['html_title'] = 'edit finished'1948 results_dic['info'] += ['edit mol finished',1949 '<a href="%s?db=%s&mol_id=%s">%s</a>' %(URLS_DIC['molinfo_url'], db_name, mol_id, mol_id)]1950 return html_info(results_dic)1951 except Exception, e:1952 results_dic['info'].append('check your query')1953 results_dic['info'].append(e)1954 results_dic['html_title'] = 'query err'1955 return html_info(results_dic)1956class delmol:1957 '''1958 this class deletes the molecule from the database1959 '''1960 def __init__(self):1961 trans_class.lang = session.lang1962 self.results_dic = tpl_results_dic.copy()1963 self.results_dic.update( {1964 'logged_user': session.nickname,1965 'trans_class': trans_class,1966 'html_title': 'del mol',1967 'info': [],1968 'current_page': 'search',1969 'lang': session.lang1970 } )1971 def GET(self, name = ''):1972 input = web.input()1973 results_dic = self.results_dic1974 available_dbs = PUBLIC_DBS1975 if session.authorized:1976 available_dbs = DATABASES1977 self.results_dic.update({'dbs': available_dbs})1978 # check basic permission1979 if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):1980 return html_no_permision(results_dic)1981 # chose which database to use1982 if input.has_key('db') and input.get('db'):1983 db_name = input.get('db')1984 if ( db_name not in available_dbs ) or ( not db_name in EDITABLE_DBS ):1985 return html_wrong_db(results_dic)1986 session.db = db_name1987 else:1988 db_name = session.db1989 env_db = ENVS[db_name]1990 sql_obj = lm.sql(env_db)1991 dbd_obj = dbd_objs[db_name]1992 pri_field = env_db['PRI_FIELD']1993 tables = dbd_obj.tables1994 submiter_id = session.userid1995 submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1996 empty_mol_list = []1997 del_failed_mol_list = []1998 del_finished_mol_list = []1999 if input.has_key('mol_id') and input.get('mol_id'):2000 mol_id_list = [ id for id in query_preprocessing(input.get('mol_id')).split('|') if id ]2001 for id in mol_id_list:2002 if not re.match('^[0-9]+$', id):2003 return html_query_illegal(results_dic)2004 db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])2005 for mol_id in mol_id_list:2006 submiter_id = ''2007 # find the submiter of the molecule with the id of mol_id2008 query_string = 'SELECT %s FROM %s WHERE ( %s = %s ) LIMIT 1;' % (2009 submiter_field,2010 dbd_obj.get_table_name(env_db['SUBMITER_KEY']),2011 pri_field,2012 mol_id )2013 results = db_obj.execute(query_string)2014 if results:2015 results = results[0]2016 if results.has_key(submiter_field):2017 submiter_id = results.get(submiter_field)2018 else:2019 empty_mol_list.append(mol_id)2020 continue2021 # check if you can delete a molecule2022 if not is_permited(session.userid, session.usergroup, ('u'), submiter_id):2023 del_failed_mol_list.append(mol_id)2024 continue2025 else:2026 for t in tables:2027 db_obj.execute("DELETE FROM %s WHERE (%s = %s);" %(t, pri_field, mol_id))2028 del_finished_mol_list.append(mol_id)2029 db_obj.close()2030 results_dic['html_title'] = 'del report'2031 if del_failed_mol_list:2032 # report the deleting failed molecules2033 mol_url_list = []2034 for id in del_failed_mol_list:2035 mol_url_list.append('<br/><a href="%s?db=%s&mol_id=%s">%s</a>' %(URLS_DIC['molinfo_url'], db_name, id, id))2036 results_dic['info'] += ['del mol failed', ':', '\n'.join(mol_url_list)]2037 if empty_mol_list:2038 for id in empty_mol_list:2039 results_dic['info'] += ['<br/>', 'empty mol entry', '<br/>' + id]2040 if del_finished_mol_list:2041 mol_url_list = []2042 for id in del_finished_mol_list:2043 results_dic['info'] += ['<br/>', 'del mol', id, 'completed']2044 return html_info(results_dic)2045 else:2046 results_dic['info'].append('mol id not post')2047 results_dic['html_title'] = 'needed entry imcomplete'2048 return html_info(results_dic)2049class mymols:2050 def __init__(self):2051 trans_class.lang = session.lang2052 self.results_dic = tpl_results_dic.copy()2053 self.results_dic.update( {2054 'logged_user': session.nickname,2055 'trans_class': trans_class,2056 'info': [],2057 'lang': session.lang2058 } )2059 def GET(self, name = ''):2060 input = web.input()2061 results_dic = self.results_dic2062 available_dbs = PUBLIC_DBS2063 if session.authorized:2064 available_dbs = DATABASES2065 self.results_dic.update({'dbs': available_dbs})2066 # check basic permission2067 if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):2068 return html_no_permision(results_dic)2069 # chose which database to use2070 if input.has_key('db') and input.get('db'):2071 db_name = input.get('db')2072 if db_name not in available_dbs:2073 return html_wrong_db(results_dic)2074 session.db = db_name2075 else:2076 db_name = session.db2077 env_db = ENVS[db_name]2078 submiter_abbr = env_db['SUBMITER_ABBR']2079 return web.seeother('%s?mode=%s&db=%s&prequery=%s' %(2080 URLS_DIC['search_url'], '4', db_name, submiter_abbr + '+%3D+' + str(session.userid)))2081class doc:2082 def __init__(self):2083 # set the target language to session.lang2084 trans_class.lang = session.lang2085 # docments2086 self.results_dic = tpl_results_dic.copy()2087 self.results_dic.update( {2088 'logged_user': session.nickname,2089 'trans_class': trans_class,2090 'html_title': 'mol info',2091 'info': [],2092 'current_page': 'help',2093 'lang': session.lang2094 } )2095 def doc_render(lang, tpl_name, **kwargs):2096 return DOC_RENDER(lang, tpl_name, **kwargs)2097 self.doc_render = doc_render2098 def GET(self, name = ''):2099 input = web.input()2100 results_dic = self.results_dic2101 results_dic['html_title'] = 'help'2102 info_head = ''2103 doc_id = ''2104 # check basic permission2105 if not is_permited(session.userid, session.usergroup, ('s')):2106 return html_no_permision(results_dic)2107 if input.has_key('doc_id') and input.get('doc_id'):2108 doc_id = input.get('doc_id')2109 if doc_id == 'about':2110 results_dic['current_page'] = 'about'2111 results_dic['html_title'] = 'about'2112 f = DOCUMENTS_PATH + '/' + session.lang + '/' + '.html'2113 if not os.path.exists(f):2114 f = DOCUMENTS_PATH + '/' + DEFAULT_LANG + '/' + doc_id + '.html'2115 if os.path.exists(f):2116 info_head = doc_id2117 tmp_line = open(f).readline().rstrip('\n')2118 if re.findall(r'^ *##', tmp_line):2119 info_head = re.sub(r'^ *##', '', tmp_line)2120 results_dic['info_head'] = info_head2121 else:2122 results_dic['info'].append('failed to open document')2123 results_dic['html_title'] = 'query err'2124 return html_info(results_dic)2125 try:2126 return WEB_RENDER('header', results_dic = results_dic) + \2127 WEB_RENDER('info', results_dic = results_dic) + \2128 self.doc_render(session.lang, doc_id) + \2129 WEB_RENDER('footer', results_dic = results_dic)2130 except SyntaxError:2131 results_dic['info'].append('failed to open document')2132 results_dic['html_title'] = 'query err'2133 return html_info(results_dic)2134 except:2135 try:2136 return WEB_RENDER('header', results_dic = results_dic) + \2137 WEB_RENDER('info', results_dic = results_dic) + \2138 self.doc_render(DEFAULT_LANG, doc_id) + \2139 WEB_RENDER('footer', results_dic = results_dic)2140 except AttributeError:2141 results_dic['info'].append('check your query')2142 results_dic['html_title'] = 'query err'2143 return html_info(results_dic)2144 else:2145 results_dic['info_head'] = 'documents'2146 help_urls = []2147 title = ''2148 for i in os.listdir(DOCUMENTS_PATH + '/' + session.lang):2149 try:2150 f = DOCUMENTS_PATH + '/' + session.lang + '/' + i2151 title = re.sub('.html$', '', i)2152 tmp_line = open(f).readline().rstrip('\n')2153 if re.findall(r'^ *##', tmp_line):2154 title = re.sub(r'^ *##', '', tmp_line)2155 help_urls.append((title, URLS_DIC['doc_url'] + '?doc_id=' + re.sub('.html$', '', i)))2156 except:2157 pass2158 return WEB_RENDER('header', results_dic = results_dic) + \2159 WEB_RENDER('info', results_dic = results_dic) + \2160 WEB_RENDER('doc_links', links = help_urls) + \2161 WEB_RENDER('footer', results_dic = results_dic)2162class chsettings:2163 '''change setting such as language, search mode and database'''2164 def GET(self, name = ''):2165 input = web.input()2166 info = ''2167 if input.has_key('lang') and input.get('lang'):2168 lang = input.get('lang')2169 if lang in LANGS:2170 session.lang = lang2171 info += 'change language OK.\n'2172 if input.has_key('mode') and input.get('mode'):2173 mode = input.get('mode')2174 if mode in SEARCH_MODES:2175 session.search_mode = mode2176 info += 'change mode OK.\n'2177 if input.has_key('db') and input.get('db'):2178 db = input.get('db')2179 if db in available_dbs:2180 session.db = db2181 info += 'change database OK.\n'2182 return info2183# run site2184if __name__ == "__main__":...

Full Screen

Full Screen

test_compare_pop_report.py

Source:test_compare_pop_report.py Github

copy

Full Screen

...267 results = get_comparing_populations_report(testParam)268 self.assertEqual(len(results['records']), 4)269 self.assertEqual(results['records'][1]['results']['subject1']['total'], 11)270 self.assertEqual(results['records'][1]['results']['subject2']['total'], -1)271 def test_filters_with_no_results(self):272 testParam = {}273 testParam[Constants.STATECODE] = 'NC'274 testParam[Constants.ASMTYEAR] = 2016275 testParam[filters.FILTERS_PROGRAM_504] = ['NS']276 testParam[filters.FILTERS_PROGRAM_IEP] = ['NS']277 results = get_comparing_populations_report(testParam)278 self.assertEqual(len(results['records']), 2)279 def test_district_view_with_grades(self):280 testParam = {}281 testParam[Constants.STATECODE] = 'NC'282 testParam[Constants.DISTRICTGUID] = '229'283 testParam[Constants.ASMTYEAR] = 2016284 testParam[filters.FILTERS_GRADE] = ['03']285 results = get_comparing_populations_report(testParam)286 self.assertEqual(results['records'][0]['results']['subject1']['total'], 0)287 self.assertEqual(len(results['records']), 1)288 def test_view_with_multi_grades(self):289 testParam = {}290 testParam[Constants.STATECODE] = 'NC'291 testParam[Constants.DISTRICTGUID] = '229'292 testParam[Constants.ASMTYEAR] = 2016293 testParam[filters.FILTERS_GRADE] = ['03', '06', '07', '11']294 results = get_comparing_populations_report(testParam)295 self.assertEqual(results['records'][0]['results']['subject1']['total'], 0)296 self.assertEqual(results['records'][1]['results']['subject1']['total'], 7)297 self.assertEqual(len(results['records']), 3)298 def test_view_with_lep_yes(self):299 testParam = {}300 testParam[Constants.STATECODE] = 'NC'301 testParam[Constants.DISTRICTGUID] = '0513ba44-e8ec-4186-9a0e-8481e9c16206'302 testParam[filters.FILTERS_PROGRAM_LEP] = ['Y']303 testParam[Constants.ASMTYEAR] = 2015304 results = get_comparing_populations_report(testParam)305 self.assertEqual(len(results['records']), 4)306 self.assertEqual(results['records'][0]['results']['subject1']['total'], 3)307 def test_view_with_lep_no(self):308 testParam = {}309 testParam[Constants.STATECODE] = 'NC'310 testParam[Constants.DISTRICTGUID] = '0513ba44-e8ec-4186-9a0e-8481e9c16206'311 testParam[filters.FILTERS_PROGRAM_LEP] = ['N']312 testParam[Constants.ASMTYEAR] = 2015313 results = get_comparing_populations_report(testParam)314 self.assertEqual(len(results['records']), 4)315 self.assertEqual(results['records'][1]['results']['subject1']['total'], 53)316 def test_view_with_lep_multi(self):317 testParam = {}318 testParam[Constants.STATECODE] = 'NC'319 testParam[Constants.DISTRICTGUID] = '0513ba44-e8ec-4186-9a0e-8481e9c16206'320 testParam[filters.FILTERS_PROGRAM_LEP] = ['N', 'Y', 'NS']321 testParam[Constants.ASMTYEAR] = 2015322 results = get_comparing_populations_report(testParam)323 self.assertEqual(len(results['records']), 4)324 self.assertEqual(results['records'][1]['results']['subject1']['total'], 55)325 def test_comparing_populations_min_cell_size(self):326 testParam = {}327 testParam[Constants.STATECODE] = 'NC'328 testParam[Constants.DISTRICTGUID] = '229'329 testParam[filters.FILTERS_ETHNICITY] = [filters.FILTERS_ETHNICITY_HISPANIC]330 # TODO: Fix this when metadata has the correct value set331 # We probably don't need to set the default min cell size after we set a value in csv332# set_default_min_cell_size(5)333# results = get_comparing_populations_report(testParam)334# self.assertEqual(len(results['records']), 3)335# # total must be filtered out336# self.assertEqual(results['records'][0]['results']['subject1']['total'], -1)337# self.assertEqual(results['records'][0]['results']['subject1']['intervals'][0]['percentage'], -1)338# self.assertEqual(results['records'][0]['results']['subject1']['intervals'][1]['percentage'], -1)339# self.assertEqual(results['records'][0]['results']['subject1']['intervals'][2]['percentage'], -1)340# self.assertEqual(results['records'][0]['results']['subject1']['intervals'][3]['percentage'], -1)341# set_default_min_cell_size(0)342 def test_comparing_populations_with_sex(self):343 testParam = {}344 testParam[Constants.STATECODE] = 'NC'345 testParam[Constants.DISTRICTGUID] = '229'346 testParam[Constants.ASMTYEAR] = 2016347 testParam[filters.FILTERS_SEX] = [filters.FILTERS_SEX_MALE]348 results = get_comparing_populations_report(testParam)349 self.assertEqual(len(results['records']), 2)350 self.assertEqual(results['records'][0]['results']['subject1']['total'], -1)351 self.assertEqual(results['records'][0]['results']['subject2']['total'], 5)352 self.assertEqual(results['records'][1]['results']['subject1']['total'], -1)353 self.assertEqual(results['records'][1]['results']['subject2']['total'], 0)354 def test_comparing_populations_with_sex_not_stated(self):355 testParam = {}356 testParam[Constants.STATECODE] = 'NC'357 testParam[Constants.DISTRICTGUID] = '229'358 testParam[filters.FILTERS_SEX] = [filters.FILTERS_SEX_NOT_STATED]359 results = get_comparing_populations_report(testParam)360 self.assertEqual(len(results['records']), 1)361 self.assertEqual(results['records'][0]['results']['subject1']['total'], -1)362 self.assertEqual(results['records'][0]['results']['subject2']['total'], 0)363 def test_comparing_populations_with_not_stated_count(self):364 testParam = {}365 testParam[Constants.STATECODE] = 'NC'366 testParam[Constants.DISTRICTGUID] = '229'367 results = get_comparing_populations_report(testParam)368 self.assertEqual(results['not_stated']['total'], 29)369 self.assertEqual(results['not_stated']['dmgPrg504'], 2)370 self.assertEqual(results['not_stated']['dmgPrgIep'], 2)371 self.assertEqual(results['not_stated']['dmgPrgLep'], 0)372 self.assertEqual(results['not_stated']['dmgStsMig'], 1)373 self.assertEqual(results['not_stated']['ethnicity'], 0)374 self.assertEqual(results['not_stated']['sex'], 1)375 def test_filter_with_unfiltered_results(self):376 testParam = {}377 testParam[Constants.STATECODE] = 'NC'378 testParam[Constants.DISTRICTGUID] = '229'379 testParam[Constants.ASMTYEAR] = 2016380 testParam[filters.FILTERS_SEX] = [filters.FILTERS_SEX_MALE]381 results = get_comparing_populations_report(testParam)382 self.assertEqual(len(results['records']), 2)383 self.assertEqual(results['records'][0]['results']['subject1']['unfilteredTotal'], 9)384 self.assertEqual(results['records'][0]['results']['subject2']['unfilteredTotal'], 9)385 self.assertEqual(results['records'][1]['results']['subject1']['unfilteredTotal'], 3)386 self.assertEqual(results['records'][1]['results']['subject2']['unfilteredTotal'], -1)387 self.assertEqual(results['summary'][0]['results']['subject1']['unfilteredTotal'], 14)388 self.assertEqual(results['summary'][0]['results']['subject2']['unfilteredTotal'], 14)389 def test_get_merged_report_records(self):390 summative = {'records': [{'id': 'a', 'name': 'a', 'type': 'sum',391 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},392 {'id': 'b', 'name': 'b', 'type': 'sum',393 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],394 'subjects': {'a': 'a'}, 'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}395 interim = {'records': [{'id': 'a', 'name': 'a', 'type': 'int',396 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},397 {'id': 'b', 'name': 'b', 'type': 'int',398 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],399 'subjects': {'a': 'a'}}400 results = get_merged_report_records(summative, interim)401 self.assertEqual(len(results), 2)402 self.assertEqual(results[0]['type'], 'sum')403 self.assertEqual(results[0]['name'], 'a')404 self.assertEqual(results[1]['type'], 'sum')405 self.assertEqual(results[1]['name'], 'b')406 def test_get_merged_report_records_with_no_summative(self):407 summative = {'records': [],408 'subjects': {'a': 'a'},409 'summary': [{'results': {'a': {'intervals': [{'percentage': 10}]}}}]}410 interim = {'records': [{'id': 'a', 'name': 'a', 'type': 'int',411 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},412 {'id': 'b', 'name': 'b', 'type': 'int',413 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],414 'subjects': {'a': 'a'}}415 results = get_merged_report_records(summative, interim)416 self.assertEqual(len(results), 2)417 self.assertEqual(results[0]['type'], 'int')418 self.assertEqual(results[0]['name'], 'a')419 self.assertEqual(results[1]['type'], 'int')420 self.assertEqual(results[1]['name'], 'b')421 self.assertEqual(results[0]['results']['a']['intervals'][0]['percentage'], -1)422 self.assertEqual(results[0]['results']['a']['intervals'][0]['count'], -1)423 def test_get_merged_report_records_with_mixed_asmt_types(self):424 summative = {'records': [{'id': 'b', 'name': 'b', 'type': 'sum',425 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],426 'subjects': {'a': 'a'},427 'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}428 interim = {'records': [{'id': 'a', 'name': 'a', 'type': 'int',429 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},430 {'id': 'b', 'name': 'b', 'type': 'int',431 'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],432 'subjects': {'a': 'a'}}433 results = get_merged_report_records(summative, interim)434 self.assertEqual(len(results), 2)435 self.assertEqual(results[0]['type'], 'int')436 self.assertEqual(results[0]['name'], 'a')437 self.assertEqual(results[1]['type'], 'sum')438 self.assertEqual(results[1]['name'], 'b')439 def test_get_merged_report_records_with_interim(self):440 summative = {'records': [{'id': 'b', 'name': 'b', 'type': 'sum',441 'results': {'a': {'total': 0, 'intervals': [{'percentage': 0}]}}}],442 'subjects': {'a': 'a'},443 'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}444 interim = {'records': [{'id': 'b', 'name': 'b', 'type': 'int',445 'results': {'a': {'total': -1, 'hasInterim': True, 'intervals': [{'percentage': -1}]}}}],446 'subjects': {'a': 'a'}}447 results = get_merged_report_records(summative, interim)448 self.assertEqual(len(results), 1)449 self.assertEqual(results[0]['type'], 'sum')450 self.assertEqual(results[0]['name'], 'b')451 self.assertEqual(results[0]['results']['a']['hasInterim'], True)452 def test_get_merged_report_records_with_no_results(self):453 summative = {'records': [{'id': 'b', 'name': 'b', 'type': 'sum',454 'results': {'a': {'total': 0, 'intervals': [{'percentage': 0}]}}}],455 'subjects': {'a': 'a'},456 'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}457 interim = {'records': [{'id': 'b', 'name': 'b', 'type': 'int',458 'results': {'a': {'total': 3, 'hasInterim': True, 'intervals': [{'percentage': 100}]}}}],459 'subjects': {'a': 'a'}}460 results = get_merged_report_records(summative, interim)461 self.assertEqual(len(results), 1)462 self.assertEqual(results[0]['type'], 'sum')463 self.assertEqual(results[0]['name'], 'b')464 self.assertEqual(results[0]['results']['a']['hasInterim'], True)465if __name__ == "__main__":466 # import sys;sys.argv = ['', 'Test.testReport']...

Full Screen

Full Screen

json_results_generator.py

Source:json_results_generator.py Github

copy

Full Screen

1# Copyright 2014 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4#5# Most of this file was ported over from Blink's6# Tools/Scripts/webkitpy/layout_tests/layout_package/json_results_generator.py7# Tools/Scripts/webkitpy/common/net/file_uploader.py8#9import json10import logging11import mimetypes12import os13import time14import urllib215_log = logging.getLogger(__name__)16_JSON_PREFIX = 'ADD_RESULTS('17_JSON_SUFFIX = ');'18def HasJSONWrapper(string):19 return string.startswith(_JSON_PREFIX) and string.endswith(_JSON_SUFFIX)20def StripJSONWrapper(json_content):21 # FIXME: Kill this code once the server returns json instead of jsonp.22 if HasJSONWrapper(json_content):23 return json_content[len(_JSON_PREFIX):len(json_content) - len(_JSON_SUFFIX)]24 return json_content25def WriteJSON(json_object, file_path, callback=None):26 # Specify separators in order to get compact encoding.27 json_string = json.dumps(json_object, separators=(',', ':'))28 if callback:29 json_string = callback + '(' + json_string + ');'30 with open(file_path, 'w') as fp:31 fp.write(json_string)32def ConvertTrieToFlatPaths(trie, prefix=None):33 """Flattens the trie of paths, prepending a prefix to each."""34 result = {}35 for name, data in trie.iteritems():36 if prefix:37 name = prefix + '/' + name38 if len(data) and not 'results' in data:39 result.update(ConvertTrieToFlatPaths(data, name))40 else:41 result[name] = data42 return result43def AddPathToTrie(path, value, trie):44 """Inserts a single path and value into a directory trie structure."""45 if not '/' in path:46 trie[path] = value47 return48 directory, _, rest = path.partition('/')49 if not directory in trie:50 trie[directory] = {}51 AddPathToTrie(rest, value, trie[directory])52def TestTimingsTrie(individual_test_timings):53 """Breaks a test name into dicts by directory54 foo/bar/baz.html: 1ms55 foo/bar/baz1.html: 3ms56 becomes57 foo: {58 bar: {59 baz.html: 1,60 baz1.html: 361 }62 }63 """64 trie = {}65 for test_result in individual_test_timings:66 test = test_result.test_name67 AddPathToTrie(test, int(1000 * test_result.test_run_time), trie)68 return trie69class TestResult(object):70 """A simple class that represents a single test result."""71 # Test modifier constants.72 (NONE, FAILS, FLAKY, DISABLED) = range(4)73 def __init__(self, test, failed=False, elapsed_time=0):74 self.test_name = test75 self.failed = failed76 self.test_run_time = elapsed_time77 test_name = test78 try:79 test_name = test.split('.')[1]80 except IndexError:81 _log.warn('Invalid test name: %s.', test)82 if test_name.startswith('FAILS_'):83 self.modifier = self.FAILS84 elif test_name.startswith('FLAKY_'):85 self.modifier = self.FLAKY86 elif test_name.startswith('DISABLED_'):87 self.modifier = self.DISABLED88 else:89 self.modifier = self.NONE90 def Fixable(self):91 return self.failed or self.modifier == self.DISABLED92class JSONResultsGeneratorBase(object):93 """A JSON results generator for generic tests."""94 MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG = 75095 # Min time (seconds) that will be added to the JSON.96 MIN_TIME = 197 # Note that in non-chromium tests those chars are used to indicate98 # test modifiers (FAILS, FLAKY, etc) but not actual test results.99 PASS_RESULT = 'P'100 SKIP_RESULT = 'X'101 FAIL_RESULT = 'F'102 FLAKY_RESULT = 'L'103 NO_DATA_RESULT = 'N'104 MODIFIER_TO_CHAR = {TestResult.NONE: PASS_RESULT,105 TestResult.DISABLED: SKIP_RESULT,106 TestResult.FAILS: FAIL_RESULT,107 TestResult.FLAKY: FLAKY_RESULT}108 VERSION = 4109 VERSION_KEY = 'version'110 RESULTS = 'results'111 TIMES = 'times'112 BUILD_NUMBERS = 'buildNumbers'113 TIME = 'secondsSinceEpoch'114 TESTS = 'tests'115 FIXABLE_COUNT = 'fixableCount'116 FIXABLE = 'fixableCounts'117 ALL_FIXABLE_COUNT = 'allFixableCount'118 RESULTS_FILENAME = 'results.json'119 TIMES_MS_FILENAME = 'times_ms.json'120 INCREMENTAL_RESULTS_FILENAME = 'incremental_results.json'121 # line too long pylint: disable=line-too-long122 URL_FOR_TEST_LIST_JSON = (123 'http://%s/testfile?builder=%s&name=%s&testlistjson=1&testtype=%s&master=%s')124 # pylint: enable=line-too-long125 def __init__(self, builder_name, build_name, build_number,126 results_file_base_path, builder_base_url,127 test_results_map, svn_repositories=None,128 test_results_server=None,129 test_type='',130 master_name=''):131 """Modifies the results.json file. Grabs it off the archive directory132 if it is not found locally.133 Args134 builder_name: the builder name (e.g. Webkit).135 build_name: the build name (e.g. webkit-rel).136 build_number: the build number.137 results_file_base_path: Absolute path to the directory containing the138 results json file.139 builder_base_url: the URL where we have the archived test results.140 If this is None no archived results will be retrieved.141 test_results_map: A dictionary that maps test_name to TestResult.142 svn_repositories: A (json_field_name, svn_path) pair for SVN143 repositories that tests rely on. The SVN revision will be144 included in the JSON with the given json_field_name.145 test_results_server: server that hosts test results json.146 test_type: test type string (e.g. 'layout-tests').147 master_name: the name of the buildbot master.148 """149 self._builder_name = builder_name150 self._build_name = build_name151 self._build_number = build_number152 self._builder_base_url = builder_base_url153 self._results_directory = results_file_base_path154 self._test_results_map = test_results_map155 self._test_results = test_results_map.values()156 self._svn_repositories = svn_repositories157 if not self._svn_repositories:158 self._svn_repositories = {}159 self._test_results_server = test_results_server160 self._test_type = test_type161 self._master_name = master_name162 self._archived_results = None163 def GenerateJSONOutput(self):164 json_object = self.GetJSON()165 if json_object:166 file_path = (167 os.path.join(168 self._results_directory,169 self.INCREMENTAL_RESULTS_FILENAME))170 WriteJSON(json_object, file_path)171 def GenerateTimesMSFile(self):172 times = TestTimingsTrie(self._test_results_map.values())173 file_path = os.path.join(self._results_directory, self.TIMES_MS_FILENAME)174 WriteJSON(times, file_path)175 def GetJSON(self):176 """Gets the results for the results.json file."""177 results_json = {}178 if not results_json:179 results_json, error = self._GetArchivedJSONResults()180 if error:181 # If there was an error don't write a results.json182 # file at all as it would lose all the information on the183 # bot.184 _log.error('Archive directory is inaccessible. Not '185 'modifying or clobbering the results.json '186 'file: ' + str(error))187 return None188 builder_name = self._builder_name189 if results_json and builder_name not in results_json:190 _log.debug('Builder name (%s) is not in the results.json file.',191 builder_name)192 self._ConvertJSONToCurrentVersion(results_json)193 if builder_name not in results_json:194 results_json[builder_name] = (195 self._CreateResultsForBuilderJSON())196 results_for_builder = results_json[builder_name]197 if builder_name:198 self._InsertGenericMetaData(results_for_builder)199 self._InsertFailureSummaries(results_for_builder)200 # Update the all failing tests with result type and time.201 tests = results_for_builder[self.TESTS]202 all_failing_tests = self._GetFailedTestNames()203 all_failing_tests.update(ConvertTrieToFlatPaths(tests))204 for test in all_failing_tests:205 self._InsertTestTimeAndResult(test, tests)206 return results_json207 def SetArchivedResults(self, archived_results):208 self._archived_results = archived_results209 def UploadJSONFiles(self, json_files):210 """Uploads the given json_files to the test_results_server (if the211 test_results_server is given)."""212 if not self._test_results_server:213 return214 if not self._master_name:215 _log.error(216 '--test-results-server was set, but --master-name was not. Not '217 'uploading JSON files.')218 return219 _log.info('Uploading JSON files for builder: %s', self._builder_name)220 attrs = [('builder', self._builder_name),221 ('testtype', self._test_type),222 ('master', self._master_name)]223 files = [(json_file, os.path.join(self._results_directory, json_file))224 for json_file in json_files]225 url = 'http://%s/testfile/upload' % self._test_results_server226 # Set uploading timeout in case appengine server is having problems.227 # 120 seconds are more than enough to upload test results.228 uploader = _FileUploader(url, 120)229 try:230 response = uploader.UploadAsMultipartFormData(files, attrs)231 if response:232 if response.code == 200:233 _log.info('JSON uploaded.')234 else:235 _log.debug(236 "JSON upload failed, %d: '%s'", response.code, response.read())237 else:238 _log.error('JSON upload failed; no response returned')239 except Exception, err: # pylint: disable=broad-except240 _log.error('Upload failed: %s', err)241 return242 def _GetTestTiming(self, test_name):243 """Returns test timing data (elapsed time) in second244 for the given test_name."""245 if test_name in self._test_results_map:246 # Floor for now to get time in seconds.247 return int(self._test_results_map[test_name].test_run_time)248 return 0249 def _GetFailedTestNames(self):250 """Returns a set of failed test names."""251 return set([r.test_name for r in self._test_results if r.failed])252 def _GetModifierChar(self, test_name):253 """Returns a single char (e.g. SKIP_RESULT, FAIL_RESULT,254 PASS_RESULT, NO_DATA_RESULT, etc) that indicates the test modifier255 for the given test_name.256 """257 if test_name not in self._test_results_map:258 return self.__class__.NO_DATA_RESULT259 test_result = self._test_results_map[test_name]260 if test_result.modifier in self.MODIFIER_TO_CHAR.keys():261 return self.MODIFIER_TO_CHAR[test_result.modifier]262 return self.__class__.PASS_RESULT263 def _get_result_char(self, test_name):264 """Returns a single char (e.g. SKIP_RESULT, FAIL_RESULT,265 PASS_RESULT, NO_DATA_RESULT, etc) that indicates the test result266 for the given test_name.267 """268 if test_name not in self._test_results_map:269 return self.__class__.NO_DATA_RESULT270 test_result = self._test_results_map[test_name]271 if test_result.modifier == TestResult.DISABLED:272 return self.__class__.SKIP_RESULT273 if test_result.failed:274 return self.__class__.FAIL_RESULT275 return self.__class__.PASS_RESULT276 def _GetSVNRevision(self, in_directory):277 """Returns the svn revision for the given directory.278 Args:279 in_directory: The directory where svn is to be run.280 """281 # This is overridden in flakiness_dashboard_results_uploader.py.282 raise NotImplementedError()283 def _GetArchivedJSONResults(self):284 """Download JSON file that only contains test285 name list from test-results server. This is for generating incremental286 JSON so the file generated has info for tests that failed before but287 pass or are skipped from current run.288 Returns (archived_results, error) tuple where error is None if results289 were successfully read.290 """291 results_json = {}292 old_results = None293 error = None294 if not self._test_results_server:295 return {}, None296 results_file_url = (self.URL_FOR_TEST_LIST_JSON %297 (urllib2.quote(self._test_results_server),298 urllib2.quote(self._builder_name),299 self.RESULTS_FILENAME,300 urllib2.quote(self._test_type),301 urllib2.quote(self._master_name)))302 try:303 # FIXME: We should talk to the network via a Host object.304 results_file = urllib2.urlopen(results_file_url)305 old_results = results_file.read()306 except urllib2.HTTPError, http_error:307 # A non-4xx status code means the bot is hosed for some reason308 # and we can't grab the results.json file off of it.309 if http_error.code < 400 and http_error.code >= 500:310 error = http_error311 except urllib2.URLError, url_error:312 error = url_error313 if old_results:314 # Strip the prefix and suffix so we can get the actual JSON object.315 old_results = StripJSONWrapper(old_results)316 try:317 results_json = json.loads(old_results)318 except Exception: # pylint: disable=broad-except319 _log.debug('results.json was not valid JSON. Clobbering.')320 # The JSON file is not valid JSON. Just clobber the results.321 results_json = {}322 else:323 _log.debug('Old JSON results do not exist. Starting fresh.')324 results_json = {}325 return results_json, error326 def _InsertFailureSummaries(self, results_for_builder):327 """Inserts aggregate pass/failure statistics into the JSON.328 This method reads self._test_results and generates329 FIXABLE, FIXABLE_COUNT and ALL_FIXABLE_COUNT entries.330 Args:331 results_for_builder: Dictionary containing the test results for a332 single builder.333 """334 # Insert the number of tests that failed or skipped.335 fixable_count = len([r for r in self._test_results if r.Fixable()])336 self._InsertItemIntoRawList(results_for_builder,337 fixable_count, self.FIXABLE_COUNT)338 # Create a test modifiers (FAILS, FLAKY etc) summary dictionary.339 entry = {}340 for test_name in self._test_results_map.iterkeys():341 result_char = self._GetModifierChar(test_name)342 entry[result_char] = entry.get(result_char, 0) + 1343 # Insert the pass/skip/failure summary dictionary.344 self._InsertItemIntoRawList(results_for_builder, entry,345 self.FIXABLE)346 # Insert the number of all the tests that are supposed to pass.347 all_test_count = len(self._test_results)348 self._InsertItemIntoRawList(results_for_builder,349 all_test_count, self.ALL_FIXABLE_COUNT)350 def _InsertItemIntoRawList(self, results_for_builder, item, key):351 """Inserts the item into the list with the given key in the results for352 this builder. Creates the list if no such list exists.353 Args:354 results_for_builder: Dictionary containing the test results for a355 single builder.356 item: Number or string to insert into the list.357 key: Key in results_for_builder for the list to insert into.358 """359 if key in results_for_builder:360 raw_list = results_for_builder[key]361 else:362 raw_list = []363 raw_list.insert(0, item)364 raw_list = raw_list[:self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG]365 results_for_builder[key] = raw_list366 def _InsertItemRunLengthEncoded(self, item, encoded_results):367 """Inserts the item into the run-length encoded results.368 Args:369 item: String or number to insert.370 encoded_results: run-length encoded results. An array of arrays, e.g.371 [[3,'A'],[1,'Q']] encodes AAAQ.372 """373 if len(encoded_results) and item == encoded_results[0][1]:374 num_results = encoded_results[0][0]375 if num_results <= self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG:376 encoded_results[0][0] = num_results + 1377 else:378 # Use a list instead of a class for the run-length encoding since379 # we want the serialized form to be concise.380 encoded_results.insert(0, [1, item])381 def _InsertGenericMetaData(self, results_for_builder):382 """ Inserts generic metadata (such as version number, current time etc)383 into the JSON.384 Args:385 results_for_builder: Dictionary containing the test results for386 a single builder.387 """388 self._InsertItemIntoRawList(results_for_builder,389 self._build_number, self.BUILD_NUMBERS)390 # Include SVN revisions for the given repositories.391 for (name, path) in self._svn_repositories:392 # Note: for JSON file's backward-compatibility we use 'chrome' rather393 # than 'chromium' here.394 lowercase_name = name.lower()395 if lowercase_name == 'chromium':396 lowercase_name = 'chrome'397 self._InsertItemIntoRawList(results_for_builder,398 self._GetSVNRevision(path),399 lowercase_name + 'Revision')400 self._InsertItemIntoRawList(results_for_builder,401 int(time.time()),402 self.TIME)403 def _InsertTestTimeAndResult(self, test_name, tests):404 """ Insert a test item with its results to the given tests dictionary.405 Args:406 tests: Dictionary containing test result entries.407 """408 result = self._get_result_char(test_name)409 test_time = self._GetTestTiming(test_name)410 this_test = tests411 for segment in test_name.split('/'):412 if segment not in this_test:413 this_test[segment] = {}414 this_test = this_test[segment]415 if not len(this_test):416 self._PopulateResultsAndTimesJSON(this_test)417 if self.RESULTS in this_test:418 self._InsertItemRunLengthEncoded(result, this_test[self.RESULTS])419 else:420 this_test[self.RESULTS] = [[1, result]]421 if self.TIMES in this_test:422 self._InsertItemRunLengthEncoded(test_time, this_test[self.TIMES])423 else:424 this_test[self.TIMES] = [[1, test_time]]425 def _ConvertJSONToCurrentVersion(self, results_json):426 """If the JSON does not match the current version, converts it to the427 current version and adds in the new version number.428 """429 if self.VERSION_KEY in results_json:430 archive_version = results_json[self.VERSION_KEY]431 if archive_version == self.VERSION:432 return433 else:434 archive_version = 3435 # version 3->4436 if archive_version == 3:437 for results in results_json.values():438 self._ConvertTestsToTrie(results)439 results_json[self.VERSION_KEY] = self.VERSION440 def _ConvertTestsToTrie(self, results):441 if not self.TESTS in results:442 return443 test_results = results[self.TESTS]444 test_results_trie = {}445 for test in test_results.iterkeys():446 single_test_result = test_results[test]447 AddPathToTrie(test, single_test_result, test_results_trie)448 results[self.TESTS] = test_results_trie449 def _PopulateResultsAndTimesJSON(self, results_and_times):450 results_and_times[self.RESULTS] = []451 results_and_times[self.TIMES] = []452 return results_and_times453 def _CreateResultsForBuilderJSON(self):454 results_for_builder = {}455 results_for_builder[self.TESTS] = {}456 return results_for_builder457 def _RemoveItemsOverMaxNumberOfBuilds(self, encoded_list):458 """Removes items from the run-length encoded list after the final459 item that exceeds the max number of builds to track.460 Args:461 encoded_results: run-length encoded results. An array of arrays, e.g.462 [[3,'A'],[1,'Q']] encodes AAAQ.463 """464 num_builds = 0465 index = 0466 for result in encoded_list:467 num_builds = num_builds + result[0]468 index = index + 1469 if num_builds > self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG:470 return encoded_list[:index]471 return encoded_list472 def _NormalizeResultsJSON(self, test, test_name, tests):473 """ Prune tests where all runs pass or tests that no longer exist and474 truncate all results to maxNumberOfBuilds.475 Args:476 test: ResultsAndTimes object for this test.477 test_name: Name of the test.478 tests: The JSON object with all the test results for this builder.479 """480 test[self.RESULTS] = self._RemoveItemsOverMaxNumberOfBuilds(481 test[self.RESULTS])482 test[self.TIMES] = self._RemoveItemsOverMaxNumberOfBuilds(483 test[self.TIMES])484 is_all_pass = self._IsResultsAllOfType(test[self.RESULTS],485 self.PASS_RESULT)486 is_all_no_data = self._IsResultsAllOfType(test[self.RESULTS],487 self.NO_DATA_RESULT)488 max_time = max([test_time[1] for test_time in test[self.TIMES]])489 # Remove all passes/no-data from the results to reduce noise and490 # filesize. If a test passes every run, but takes > MIN_TIME to run,491 # don't throw away the data.492 if is_all_no_data or (is_all_pass and max_time <= self.MIN_TIME):493 del tests[test_name]494 # method could be a function pylint: disable=R0201495 def _IsResultsAllOfType(self, results, result_type):496 """Returns whether all the results are of the given type497 (e.g. all passes)."""498 return len(results) == 1 and results[0][1] == result_type499class _FileUploader(object):500 def __init__(self, url, timeout_seconds):501 self._url = url502 self._timeout_seconds = timeout_seconds503 def UploadAsMultipartFormData(self, files, attrs):504 file_objs = []505 for filename, path in files:506 with file(path, 'rb') as fp:507 file_objs.append(('file', filename, fp.read()))508 # FIXME: We should use the same variable names for the formal and actual509 # parameters.510 content_type, data = _EncodeMultipartFormData(attrs, file_objs)511 return self._UploadData(content_type, data)512 def _UploadData(self, content_type, data):513 start = time.time()514 end = start + self._timeout_seconds515 while time.time() < end:516 try:517 request = urllib2.Request(self._url, data,518 {'Content-Type': content_type})519 return urllib2.urlopen(request)520 except urllib2.HTTPError as e:521 _log.warn("Received HTTP status %s loading \"%s\". "522 'Retrying in 10 seconds...', e.code, e.filename)523 time.sleep(10)524def _GetMIMEType(filename):525 return mimetypes.guess_type(filename)[0] or 'application/octet-stream'526# FIXME: Rather than taking tuples, this function should take more527# structured data.528def _EncodeMultipartFormData(fields, files):529 """Encode form fields for multipart/form-data.530 Args:531 fields: A sequence of (name, value) elements for regular form fields.532 files: A sequence of (name, filename, value) elements for data to be533 uploaded as files.534 Returns:535 (content_type, body) ready for httplib.HTTP instance.536 Source:537 http://code.google.com/p/rietveld/source/browse/trunk/upload.py538 """539 BOUNDARY = '-M-A-G-I-C---B-O-U-N-D-A-R-Y-'540 CRLF = '\r\n'541 lines = []542 for key, value in fields:543 lines.append('--' + BOUNDARY)544 lines.append('Content-Disposition: form-data; name="%s"' % key)545 lines.append('')546 if isinstance(value, unicode):547 value = value.encode('utf-8')548 lines.append(value)549 for key, filename, value in files:550 lines.append('--' + BOUNDARY)551 lines.append('Content-Disposition: form-data; name="%s"; '552 'filename="%s"' % (key, filename))553 lines.append('Content-Type: %s' % _GetMIMEType(filename))554 lines.append('')555 if isinstance(value, unicode):556 value = value.encode('utf-8')557 lines.append(value)558 lines.append('--' + BOUNDARY + '--')559 lines.append('')560 body = CRLF.join(lines)561 content_type = 'multipart/form-data; boundary=%s' % BOUNDARY...

Full Screen

Full Screen

page_test_results_unittest.py

Source:page_test_results_unittest.py Github

copy

Full Screen

1# Copyright 2014 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4import os5import unittest6from telemetry import benchmark7from telemetry import story8from telemetry.internal.results import base_test_results_unittest9from telemetry.internal.results import chart_json_output_formatter10from telemetry.internal.results import json_output_formatter11from telemetry.internal.results import page_test_results12from telemetry import page as page_module13from telemetry.testing import stream14from telemetry.value import failure15from telemetry.value import histogram16from telemetry.value import improvement_direction17from telemetry.value import scalar18from telemetry.value import skip19from telemetry.value import trace20from tracing.trace_data import trace_data21class PageTestResultsTest(base_test_results_unittest.BaseTestResultsUnittest):22 def setUp(self):23 story_set = story.StorySet(base_dir=os.path.dirname(__file__))24 story_set.AddStory(page_module.Page("http://www.bar.com/", story_set, story_set.base_dir))25 story_set.AddStory(page_module.Page("http://www.baz.com/", story_set, story_set.base_dir))26 story_set.AddStory(page_module.Page("http://www.foo.com/", story_set, story_set.base_dir))27 self.story_set = story_set28 @property29 def pages(self):30 return self.story_set.stories31 def testFailures(self):32 results = page_test_results.PageTestResults()33 results.WillRunPage(self.pages[0])34 results.AddValue(35 failure.FailureValue(self.pages[0], self.CreateException()))36 results.DidRunPage(self.pages[0])37 results.WillRunPage(self.pages[1])38 results.DidRunPage(self.pages[1])39 self.assertEqual(set([self.pages[0]]), results.pages_that_failed)40 self.assertEqual(set([self.pages[1]]), results.pages_that_succeeded)41 self.assertEqual(2, len(results.all_page_runs))42 self.assertTrue(results.all_page_runs[0].failed)43 self.assertTrue(results.all_page_runs[1].ok)44 def testSkips(self):45 results = page_test_results.PageTestResults()46 results.WillRunPage(self.pages[0])47 results.AddValue(skip.SkipValue(self.pages[0], 'testing reason'))48 results.DidRunPage(self.pages[0])49 results.WillRunPage(self.pages[1])50 results.DidRunPage(self.pages[1])51 self.assertTrue(results.all_page_runs[0].skipped)52 self.assertEqual(self.pages[0], results.all_page_runs[0].story)53 self.assertEqual(set([self.pages[0], self.pages[1]]),54 results.pages_that_succeeded)55 self.assertEqual(2, len(results.all_page_runs))56 self.assertTrue(results.all_page_runs[0].skipped)57 self.assertTrue(results.all_page_runs[1].ok)58 def testBasic(self):59 results = page_test_results.PageTestResults()60 results.WillRunPage(self.pages[0])61 results.AddValue(scalar.ScalarValue(62 self.pages[0], 'a', 'seconds', 3,63 improvement_direction=improvement_direction.UP))64 results.DidRunPage(self.pages[0])65 results.WillRunPage(self.pages[1])66 results.AddValue(scalar.ScalarValue(67 self.pages[1], 'a', 'seconds', 3,68 improvement_direction=improvement_direction.UP))69 results.DidRunPage(self.pages[1])70 results.PrintSummary()71 values = results.FindPageSpecificValuesForPage(self.pages[0], 'a')72 self.assertEquals(1, len(values))73 v = values[0]74 self.assertEquals(v.name, 'a')75 self.assertEquals(v.page, self.pages[0])76 values = results.FindAllPageSpecificValuesNamed('a')77 assert len(values) == 278 def testAddValueWithStoryGroupingKeys(self):79 results = page_test_results.PageTestResults()80 self.pages[0].grouping_keys['foo'] = 'bar'81 self.pages[0].grouping_keys['answer'] = '42'82 results.WillRunPage(self.pages[0])83 results.AddValue(scalar.ScalarValue(84 self.pages[0], 'a', 'seconds', 3,85 improvement_direction=improvement_direction.UP))86 results.DidRunPage(self.pages[0])87 results.PrintSummary()88 values = results.FindPageSpecificValuesForPage(self.pages[0], 'a')89 v = values[0]90 self.assertEquals(v.grouping_keys['foo'], 'bar')91 self.assertEquals(v.grouping_keys['answer'], '42')92 self.assertEquals(v.tir_label, '42_bar')93 def testAddValueWithStoryGroupingKeysAndMatchingTirLabel(self):94 results = page_test_results.PageTestResults()95 self.pages[0].grouping_keys['foo'] = 'bar'96 self.pages[0].grouping_keys['answer'] = '42'97 results.WillRunPage(self.pages[0])98 results.AddValue(scalar.ScalarValue(99 self.pages[0], 'a', 'seconds', 3,100 improvement_direction=improvement_direction.UP,101 tir_label='42_bar'))102 results.DidRunPage(self.pages[0])103 results.PrintSummary()104 values = results.FindPageSpecificValuesForPage(self.pages[0], 'a')105 v = values[0]106 self.assertEquals(v.grouping_keys['foo'], 'bar')107 self.assertEquals(v.grouping_keys['answer'], '42')108 self.assertEquals(v.tir_label, '42_bar')109 def testAddValueWithStoryGroupingKeysAndMismatchingTirLabel(self):110 results = page_test_results.PageTestResults()111 self.pages[0].grouping_keys['foo'] = 'bar'112 self.pages[0].grouping_keys['answer'] = '42'113 results.WillRunPage(self.pages[0])114 with self.assertRaises(AssertionError):115 results.AddValue(scalar.ScalarValue(116 self.pages[0], 'a', 'seconds', 3,117 improvement_direction=improvement_direction.UP,118 tir_label='another_label'))119 def testAddValueWithDuplicateStoryGroupingKeyFails(self):120 results = page_test_results.PageTestResults()121 self.pages[0].grouping_keys['foo'] = 'bar'122 results.WillRunPage(self.pages[0])123 with self.assertRaises(AssertionError):124 results.AddValue(scalar.ScalarValue(125 self.pages[0], 'a', 'seconds', 3,126 improvement_direction=improvement_direction.UP,127 grouping_keys={'foo': 'bar'}))128 def testUrlIsInvalidValue(self):129 results = page_test_results.PageTestResults()130 results.WillRunPage(self.pages[0])131 self.assertRaises(132 AssertionError,133 lambda: results.AddValue(scalar.ScalarValue(134 self.pages[0], 'url', 'string', 'foo',135 improvement_direction=improvement_direction.UP)))136 def testAddSummaryValueWithPageSpecified(self):137 results = page_test_results.PageTestResults()138 results.WillRunPage(self.pages[0])139 self.assertRaises(140 AssertionError,141 lambda: results.AddSummaryValue(scalar.ScalarValue(142 self.pages[0], 'a', 'units', 3,143 improvement_direction=improvement_direction.UP)))144 def testUnitChange(self):145 results = page_test_results.PageTestResults()146 results.WillRunPage(self.pages[0])147 results.AddValue(scalar.ScalarValue(148 self.pages[0], 'a', 'seconds', 3,149 improvement_direction=improvement_direction.UP))150 results.DidRunPage(self.pages[0])151 results.WillRunPage(self.pages[1])152 self.assertRaises(153 AssertionError,154 lambda: results.AddValue(scalar.ScalarValue(155 self.pages[1], 'a', 'foobgrobbers', 3,156 improvement_direction=improvement_direction.UP)))157 def testTypeChange(self):158 results = page_test_results.PageTestResults()159 results.WillRunPage(self.pages[0])160 results.AddValue(scalar.ScalarValue(161 self.pages[0], 'a', 'seconds', 3,162 improvement_direction=improvement_direction.UP))163 results.DidRunPage(self.pages[0])164 results.WillRunPage(self.pages[1])165 self.assertRaises(166 AssertionError,167 lambda: results.AddValue(histogram.HistogramValue(168 self.pages[1], 'a', 'seconds',169 raw_value_json='{"buckets": [{"low": 1, "high": 2, "count": 1}]}',170 improvement_direction=improvement_direction.UP)))171 def testGetPagesThatSucceededAllPagesFail(self):172 results = page_test_results.PageTestResults()173 results.WillRunPage(self.pages[0])174 results.AddValue(scalar.ScalarValue(175 self.pages[0], 'a', 'seconds', 3,176 improvement_direction=improvement_direction.UP))177 results.AddValue(failure.FailureValue.FromMessage(self.pages[0], 'message'))178 results.DidRunPage(self.pages[0])179 results.WillRunPage(self.pages[1])180 results.AddValue(scalar.ScalarValue(181 self.pages[1], 'a', 'seconds', 7,182 improvement_direction=improvement_direction.UP))183 results.AddValue(failure.FailureValue.FromMessage(self.pages[1], 'message'))184 results.DidRunPage(self.pages[1])185 results.PrintSummary()186 self.assertEquals(0, len(results.pages_that_succeeded))187 def testGetSuccessfulPageValuesMergedNoFailures(self):188 results = page_test_results.PageTestResults()189 results.WillRunPage(self.pages[0])190 results.AddValue(scalar.ScalarValue(191 self.pages[0], 'a', 'seconds', 3,192 improvement_direction=improvement_direction.UP))193 self.assertEquals(1, len(results.all_page_specific_values))194 results.DidRunPage(self.pages[0])195 def testGetAllValuesForSuccessfulPages(self):196 results = page_test_results.PageTestResults()197 results.WillRunPage(self.pages[0])198 value1 = scalar.ScalarValue(199 self.pages[0], 'a', 'seconds', 3,200 improvement_direction=improvement_direction.UP)201 results.AddValue(value1)202 results.DidRunPage(self.pages[0])203 results.WillRunPage(self.pages[1])204 value2 = scalar.ScalarValue(205 self.pages[1], 'a', 'seconds', 3,206 improvement_direction=improvement_direction.UP)207 results.AddValue(value2)208 results.DidRunPage(self.pages[1])209 results.WillRunPage(self.pages[2])210 value3 = scalar.ScalarValue(211 self.pages[2], 'a', 'seconds', 3,212 improvement_direction=improvement_direction.UP)213 results.AddValue(value3)214 results.DidRunPage(self.pages[2])215 self.assertEquals(216 [value1, value2, value3], results.all_page_specific_values)217 def testGetAllValuesForSuccessfulPagesOnePageFails(self):218 results = page_test_results.PageTestResults()219 results.WillRunPage(self.pages[0])220 value1 = scalar.ScalarValue(221 self.pages[0], 'a', 'seconds', 3,222 improvement_direction=improvement_direction.UP)223 results.AddValue(value1)224 results.DidRunPage(self.pages[0])225 results.WillRunPage(self.pages[1])226 value2 = failure.FailureValue.FromMessage(self.pages[1], 'Failure')227 results.AddValue(value2)228 results.DidRunPage(self.pages[1])229 results.WillRunPage(self.pages[2])230 value3 = scalar.ScalarValue(231 self.pages[2], 'a', 'seconds', 3,232 improvement_direction=improvement_direction.UP)233 results.AddValue(value3)234 results.DidRunPage(self.pages[2])235 self.assertEquals(236 [value1, value2, value3], results.all_page_specific_values)237 def testFindValues(self):238 results = page_test_results.PageTestResults()239 results.WillRunPage(self.pages[0])240 v0 = scalar.ScalarValue(241 self.pages[0], 'a', 'seconds', 3,242 improvement_direction=improvement_direction.UP)243 results.AddValue(v0)244 v1 = scalar.ScalarValue(245 self.pages[0], 'a', 'seconds', 4,246 improvement_direction=improvement_direction.UP)247 results.AddValue(v1)248 results.DidRunPage(self.pages[1])249 values = results.FindValues(lambda v: v.value == 3)250 self.assertEquals([v0], values)251 def testValueWithTIRLabel(self):252 results = page_test_results.PageTestResults()253 results.WillRunPage(self.pages[0])254 v0 = scalar.ScalarValue(255 self.pages[0], 'a', 'seconds', 3, tir_label='foo',256 improvement_direction=improvement_direction.UP)257 results.AddValue(v0)258 v1 = scalar.ScalarValue(259 self.pages[0], 'a', 'seconds', 3, tir_label='bar',260 improvement_direction=improvement_direction.UP)261 results.AddValue(v1)262 results.DidRunPage(self.pages[0])263 values = results.FindAllPageSpecificValuesFromIRNamed('foo', 'a')264 self.assertEquals([v0], values)265 def testTraceValue(self):266 results = page_test_results.PageTestResults()267 results.WillRunPage(self.pages[0])268 results.AddValue(trace.TraceValue(269 None, trace_data.CreateTraceDataFromRawData([[{'test': 1}]])))270 results.DidRunPage(self.pages[0])271 results.WillRunPage(self.pages[1])272 results.AddValue(trace.TraceValue(273 None, trace_data.CreateTraceDataFromRawData([[{'test': 2}]])))274 results.DidRunPage(self.pages[1])275 results.PrintSummary()276 values = results.FindAllTraceValues()277 self.assertEquals(2, len(values))278 def testCleanUpCleansUpTraceValues(self):279 results = page_test_results.PageTestResults()280 v0 = trace.TraceValue(281 None, trace_data.CreateTraceDataFromRawData([{'test': 1}]))282 v1 = trace.TraceValue(283 None, trace_data.CreateTraceDataFromRawData([{'test': 2}]))284 results.WillRunPage(self.pages[0])285 results.AddValue(v0)286 results.DidRunPage(self.pages[0])287 results.WillRunPage(self.pages[1])288 results.AddValue(v1)289 results.DidRunPage(self.pages[1])290 results.CleanUp()291 self.assertTrue(v0.cleaned_up)292 self.assertTrue(v1.cleaned_up)293 def testNoTracesLeftAfterCleanUp(self):294 results = page_test_results.PageTestResults()295 v0 = trace.TraceValue(None,296 trace_data.CreateTraceDataFromRawData([{'test': 1}]))297 v1 = trace.TraceValue(None,298 trace_data.CreateTraceDataFromRawData([{'test': 2}]))299 results.WillRunPage(self.pages[0])300 results.AddValue(v0)301 results.DidRunPage(self.pages[0])302 results.WillRunPage(self.pages[1])303 results.AddValue(v1)304 results.DidRunPage(self.pages[1])305 results.CleanUp()306 self.assertFalse(results.FindAllTraceValues())307 def testPrintSummaryDisabledResults(self):308 output_stream = stream.TestOutputStream()309 output_formatters = []310 benchmark_metadata = benchmark.BenchmarkMetadata(311 'benchmark_name', 'benchmark_description')312 output_formatters.append(313 chart_json_output_formatter.ChartJsonOutputFormatter(314 output_stream, benchmark_metadata))315 output_formatters.append(json_output_formatter.JsonOutputFormatter(316 output_stream, benchmark_metadata))317 results = page_test_results.PageTestResults(318 output_formatters=output_formatters, benchmark_enabled=False)319 results.PrintSummary()320 self.assertEquals(output_stream.output_data,321 "{\n \"enabled\": false,\n \"benchmark_name\": \"benchmark_name\"\n}\n")322class PageTestResultsFilterTest(unittest.TestCase):323 def setUp(self):324 story_set = story.StorySet(base_dir=os.path.dirname(__file__))325 story_set.AddStory(326 page_module.Page('http://www.foo.com/', story_set, story_set.base_dir))327 story_set.AddStory(328 page_module.Page('http://www.bar.com/', story_set, story_set.base_dir))329 self.story_set = story_set330 @property331 def pages(self):332 return self.story_set.stories333 def testFilterValue(self):334 def AcceptValueNamed_a(value, _):335 return value.name == 'a'336 results = page_test_results.PageTestResults(337 value_can_be_added_predicate=AcceptValueNamed_a)338 results.WillRunPage(self.pages[0])339 results.AddValue(scalar.ScalarValue(340 self.pages[0], 'a', 'seconds', 3,341 improvement_direction=improvement_direction.UP))342 results.AddValue(scalar.ScalarValue(343 self.pages[0], 'b', 'seconds', 3,344 improvement_direction=improvement_direction.UP))345 results.DidRunPage(self.pages[0])346 results.WillRunPage(self.pages[1])347 results.AddValue(scalar.ScalarValue(348 self.pages[1], 'a', 'seconds', 3,349 improvement_direction=improvement_direction.UP))350 results.AddValue(scalar.ScalarValue(351 self.pages[1], 'd', 'seconds', 3,352 improvement_direction=improvement_direction.UP))353 results.DidRunPage(self.pages[1])354 results.PrintSummary()355 self.assertEquals(356 [('a', 'http://www.foo.com/'), ('a', 'http://www.bar.com/')],357 [(v.name, v.page.url) for v in results.all_page_specific_values])358 def testFilterIsFirstResult(self):359 def AcceptSecondValues(_, is_first_result):360 return not is_first_result361 results = page_test_results.PageTestResults(362 value_can_be_added_predicate=AcceptSecondValues)363 # First results (filtered out)364 results.WillRunPage(self.pages[0])365 results.AddValue(scalar.ScalarValue(366 self.pages[0], 'a', 'seconds', 7,367 improvement_direction=improvement_direction.UP))368 results.AddValue(scalar.ScalarValue(369 self.pages[0], 'b', 'seconds', 8,370 improvement_direction=improvement_direction.UP))371 results.DidRunPage(self.pages[0])372 results.WillRunPage(self.pages[1])373 results.AddValue(scalar.ScalarValue(374 self.pages[1], 'a', 'seconds', 5,375 improvement_direction=improvement_direction.UP))376 results.AddValue(scalar.ScalarValue(377 self.pages[1], 'd', 'seconds', 6,378 improvement_direction=improvement_direction.UP))379 results.DidRunPage(self.pages[1])380 # Second results381 results.WillRunPage(self.pages[0])382 results.AddValue(scalar.ScalarValue(383 self.pages[0], 'a', 'seconds', 3,384 improvement_direction=improvement_direction.UP))385 results.AddValue(scalar.ScalarValue(386 self.pages[0], 'b', 'seconds', 4,387 improvement_direction=improvement_direction.UP))388 results.DidRunPage(self.pages[0])389 results.WillRunPage(self.pages[1])390 results.AddValue(scalar.ScalarValue(391 self.pages[1], 'a', 'seconds', 1,392 improvement_direction=improvement_direction.UP))393 results.AddValue(scalar.ScalarValue(394 self.pages[1], 'd', 'seconds', 2,395 improvement_direction=improvement_direction.UP))396 results.DidRunPage(self.pages[1])397 results.PrintSummary()398 expected_values = [399 ('a', 'http://www.foo.com/', 3),400 ('b', 'http://www.foo.com/', 4),401 ('a', 'http://www.bar.com/', 1),402 ('d', 'http://www.bar.com/', 2)]403 actual_values = [(v.name, v.page.url, v.value)404 for v in results.all_page_specific_values]405 self.assertEquals(expected_values, actual_values)406 def testFailureValueCannotBeFiltered(self):407 def AcceptValueNamed_a(value, _):408 return value.name == 'a'409 results = page_test_results.PageTestResults(410 value_can_be_added_predicate=AcceptValueNamed_a)411 results.WillRunPage(self.pages[0])412 results.AddValue(scalar.ScalarValue(413 self.pages[0], 'b', 'seconds', 8,414 improvement_direction=improvement_direction.UP))415 failure_value = failure.FailureValue.FromMessage(self.pages[0], 'failure')416 results.AddValue(failure_value)417 results.DidRunPage(self.pages[0])418 results.PrintSummary()419 # Although predicate says only accept values named 'a', the failure value is420 # added anyway.421 self.assertEquals(len(results.all_page_specific_values), 1)422 self.assertIn(failure_value, results.all_page_specific_values)423 def testSkipValueCannotBeFiltered(self):424 def AcceptValueNamed_a(value, _):425 return value.name == 'a'426 results = page_test_results.PageTestResults(427 value_can_be_added_predicate=AcceptValueNamed_a)428 results.WillRunPage(self.pages[0])429 skip_value = skip.SkipValue(self.pages[0], 'skip for testing')430 results.AddValue(scalar.ScalarValue(431 self.pages[0], 'b', 'seconds', 8,432 improvement_direction=improvement_direction.UP))433 results.AddValue(skip_value)434 results.DidRunPage(self.pages[0])435 results.PrintSummary()436 # Although predicate says only accept value with named 'a', skip value is437 # added anyway.438 self.assertEquals(len(results.all_page_specific_values), 1)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful