How to use the sql method in dbt-osmosis

Best Python code snippet using dbt-osmosis_python

app.py

Source: app.py (GitHub)

copy

Full Screen

# ... (earlier lines of app.py are elided in this listing; the preceding view ends with:)
#     return 'Welcome to MyCode!'


# ---------------------------------------------------------------------------
# Private helpers shared by the views below
# ---------------------------------------------------------------------------

def _sql_str(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are backslash-escaped (the same escaping
    style this module previously applied by hand to problem input/output and
    submitted code), so request data cannot terminate the literal early.
    ``None`` is rendered as ``'None'``, exactly as plain f-string
    interpolation did before.

    NOTE(review): assumes the server honours backslash escapes (MySQL default
    sql_mode). If do_sql/update_sql can take bound parameters, prefer those
    over building SQL text.
    """
    return "'" + str(value).replace('\\', '\\\\').replace("'", "\\'") + "'"


def _sql_num(value):
    """Render *value* as a bare SQL numeric literal.

    Raises:
        ValueError: if *value* is not numeric. Values placed unquoted into a
            statement cannot be protected by escaping, so they are validated.
    """
    if isinstance(value, bool):
        return str(int(value))
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value).strip()
    if text.lower() in ('true', 'false'):
        return '1' if text.lower() == 'true' else '0'
    if not text or text.strip('0123456789+-.eE'):
        raise ValueError(f'not a numeric SQL value: {value!r}')
    float(text)  # rejects malformed combinations such as '1-2' or 'e'
    return text


class _Database:
    """Context manager around open_sql/do_sql/update_sql/close_sql.

    Guarantees that close_sql() runs exactly once on every exit path,
    including exceptions and requests whose method no view branch handles.
    """

    def __init__(self, name):
        self.name = name
        self.handle = None

    def __enter__(self):
        self.handle = open_sql(self.name)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        close_sql(self.handle)
        return False

    def query(self, sql):
        """Run a read statement and return do_sql()'s raw (JSON text) result."""
        return do_sql(self.handle, sql)

    def rows(self, sql):
        """Run a read statement and return its result decoded from JSON."""
        return simplejson.loads(self.query(sql))

    def execute(self, sql):
        """Run a write statement; update_sql() returns the handle to keep using."""
        self.handle = update_sql(self.handle, sql)


def _fetch(database, sql):
    """Open *database*, run one read statement, close, and return the raw result."""
    with _Database(database) as db:
        return db.query(sql)


def _new_id():
    """Return a random 10-digit identifier as a string."""
    return str(random.randint(1111111111, 9999999999))


def _problem_filter_args():
    """Build the shared leading argument list of select_problems[_admin] from the query string."""
    args = request.args
    userid = args.get('userid')
    if userid is None or userid == '':
        userid = 0
    problemname = args.get('problemname')
    if problemname is None:
        problemname = 'all'
    problem_type = args.get('type')
    if problem_type is None:
        problem_type = '0'
    rank = args.get('rank')
    if rank is None:
        rank = 0
    labels = args.get('labels')
    if labels is None:
        labels = ''
    status = args.get('status')
    if status is None:
        status = 0
    return ','.join([
        _sql_num(userid), '1', _sql_str(problemname), _sql_str(problem_type),
        _sql_num(rank), _sql_str(labels), _sql_num(status),
    ])


def _judge_cases(run_case, inputs, outputs):
    """Run every test input until the first mismatch.

    Returns:
        (passed_count, wrong_input, expected_output, wrong_output); the last
        three are empty strings when every case passed.
    """
    passed = 0
    for index, case_input in enumerate(inputs):
        actual = run_case(case_input)
        if actual == outputs[index]:
            passed += 1
        else:
            return passed, case_input, outputs[index], actual
    return passed, '', '', ''


def _judge_report(passed, wrong_input, expected, total, actual, status):
    """Serialise a judging outcome in the JSON shape the front end receives."""
    return simplejson.dumps([{
        "except": passed,
        "input": wrong_input,
        "output": expected,
        "sum": total,
        "result": actual,
        "status": status,
    }])


def _label_text(db, raw_labels):
    """Translate a ';'-separated label id string via get_label(); '' on any failure."""
    try:
        return str(get_label(db.handle, raw_labels.split(';')))
    except Exception:
        return ''


# ---------------------------------------------------------------------------
# Views
# ---------------------------------------------------------------------------

@app.route('/login', methods=['GET', 'POST'])
def do_login():
    """Log a user in via doLoginProcedure (GET); every other method is rejected."""
    res = make_response()
    if request.method == 'GET':
        try:
            username = request.args.get('username')
            password = request.args.get('password')
            results = _fetch(
                'mycode',
                f"call doLoginProcedure({_sql_str(username)},{_sql_str(password)})")
            code = simplejson.loads(results)[0].get('code')
            # Application-specific status codes, kept exactly as clients receive them today.
            if code == '0404':
                res.status_code = '1404'
            elif code == '0403':
                res.status_code = '1403'
            res.data = results
        except Exception:
            res.status_code = '1500'
        return res
    res.status_code = '1500'
    return res


@app.route('/product/')
def product_info():
    """Return all products, or the one selected by ``productid``."""
    if request.method == 'GET':
        productid = request.args.get('productid')
        if productid is None:
            sql = 'select * from products'
        else:
            sql = f'select * from products where productid = {_sql_num(productid)}'
        return _fetch('mysales', sql)
    return '暂不支持数据库查询以外的操作'


@app.route('/user/message')
def user_message_info():
    """Return message summaries between ``userid`` and each id in ``useridList``."""
    if request.method == 'GET':
        other_ids = request.args.get('useridList').split(',')
        myid = request.args.get('userid')
        with _Database('mycode') as db:
            messages = [
                db.rows(f"call getUserMessageInfo({_sql_str(userid)},{_sql_str(myid)});")[0]
                for userid in other_ids
            ]
        return simplejson.dumps(messages)


@app.route('/user/message/detail')
def user_message_detail():
    """Mark a conversation as read and return its detail plus earlier messages."""
    if request.method == 'GET':
        userid = _sql_str(request.args.get('userid'))
        other_id = _sql_str(request.args.get('id'))
        with _Database('mycode') as db:
            db.execute(f"call update_messages({userid},{other_id});")
            messages = db.rows(f"call get_before_messages({userid},{other_id});")
            result = db.rows(f"call get_message_detail({userid},{other_id});")[0]
        result["messages"] = messages[::-1]
        return simplejson.dumps([result])


@app.route('/problem/type/admin', methods=['GET', 'POST'])
def problem_type_admin():
    """GET lists the problem-type tree; POST replaces it with the submitted nodes."""
    if request.method == 'GET':
        return _fetch('mycode', 'call select_problemtypes_admin();')
    if request.method == 'POST':
        params = request.json.get('params')
        with _Database('mycode') as db:
            db.execute('call empty_problemtypes();')
            for node in params.get('data'):
                parentid = node.get('parentid')
                parent = 'null' if parentid is None else _sql_str(parentid)
                db.execute(
                    "insert into problemtypes(typeid,text,level,parentid)value("
                    f"{_sql_str(node.get('typeid'))},{_sql_str(node.get('text'))},"
                    f"{_sql_num(node.get('level'))},{parent});")
        return '暂不支持数据库查询以外的操作'


@app.route('/problem/run', methods=['GET', 'POST'])
def problem_run():
    """Run submitted code once against the supplied input and record the attempt."""
    res = make_response()
    if request.method == 'POST':
        params = request.json.get('params')
        userid = params.get('userid')
        problemid = params.get('problemid')
        problem_type = params.get('typeid')
        right_output = params.get('output')
        language = params.get('language')
        submitid = _new_id()
        func = params.get('func')
        code = params.get('code')
        case_input = params.get('input')
        if language == 2:
            myoutput, isError = do_python(func, code, case_input)
        elif language == 3:
            myoutput, isError = do_js(func, code, case_input, problem_type)
        else:
            # Previously this fell through to an unbound-variable error.
            raise ValueError(f'unsupported language: {language!r}')
        res.headers['Content-Type'] = 'application/json; charset=utf-8'
        res.data = simplejson.dumps([{'data': str(myoutput), 'isError': isError}])
        if isError:
            status = 3
        elif str(myoutput) == str(right_output):
            status = 1
        else:
            status = 2
        # NOTE(review): the argument order here (problemid, submitid, userid, ...)
        # differs from the add_code_submit calls in problem_submit
        # (problemid, userid, code, ...); confirm against the procedure.
        with _Database('mycode') as db:
            db.execute(
                f"call add_code_submit({_sql_str(problemid)},{_sql_str(submitid)},"
                f"{_sql_str(userid)},{_sql_num(language)},{status});")
    return res


@app.route('/problem/runJS', methods=['GET', 'POST'])
def problem_run_js():
    """Run submitted JavaScript against the first stored test input of a problem."""
    res = make_response()
    if request.method == 'POST':
        params = request.json.get('params')
        problem_info = _fetch(
            'mycode', f"call get_problem_info({_sql_str(params.get('problemid'))});")
        problem_info = simplejson.loads(problem_info)[0]
        first_input = problem_info.get('input').split('\n')[0]
        myoutput, isError = do_js(
            problem_info.get('func'), params.get('code'), first_input,
            problem_info.get('type'))
        res.data = simplejson.dumps([{'output': myoutput, 'isError': isError}])
    return res


@app.route('/problem/runPy', methods=['GET', 'POST'])
def problem_run_python():
    """Run submitted Python against the supplied input."""
    res = make_response()
    if request.method == 'POST':
        params = request.json.get('params')
        output, isError = do_python(
            params.get('func'), params.get('code'), params.get('input'))
        res.data = simplejson.dumps([{'data': output, 'isError': isError}])
    return res


@app.route('/problem/runSQL', methods=['GET', 'POST'])
def problem_run_mysql():
    """Execute the submitted SQL against the 'mysales' database and return the result."""
    res = make_response()
    if request.method == 'POST':
        params = request.json.get('params')
        # SECURITY NOTE: this runs caller-supplied SQL verbatim by design; the
        # database account behind 'mysales' must be restricted accordingly.
        res.data = _fetch('mysales', params.get('code'))
    return res


@app.route('/problem/submit/note', methods=['GET', 'POST'])
def problem_submit_note():
    """Attach a note to an existing submission."""
    res = make_response()
    if request.method == 'POST':
        params = request.json.get('params')
        with _Database('mycode') as db:
            db.execute(
                f"call problem_submit_note({_sql_num(params.get('submitid'))},"
                f"{_sql_str(params.get('problemid'))},{_sql_str(params.get('userid'))},"
                f"{_sql_str(params.get('note'))});")
    return res


@app.route('/problems/admin', methods=['GET', 'POST'])
def get_problems_admin():
    """List problems for the admin view, filtered by the query string."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch(
            'mycode', f"call select_problems_admin({_problem_filter_args()});")
    return res


@app.route('/problems', methods=['GET', 'POST'])
def get_problems():
    """List problems filtered by the query string, eight per page (``pageNo``)."""
    res = make_response()
    if request.method == 'GET':
        page = request.args.get('pageNo')
        page = 1 if page is None else int(page)
        rows = simplejson.loads(_fetch(
            'mycode', f"call select_problems({_problem_filter_args()},{page});"))
        start = (page - 1) * 8
        res.data = simplejson.dumps([{
            "total": len(rows),
            "results": rows[start:start + 8],
        }])
    return res


@app.route('/problem/', methods=['GET', 'POST'])
def get_problem():
    """GET returns a problem's detail; POST creates or edits a problem."""
    res = make_response()
    if request.method == 'GET':
        problemid = request.args.get('problemid')
        res.data = _fetch('mycode', f"call problem_detail({_sql_str(problemid)})")
    if request.method == 'POST':
        params = simplejson.loads(request.values.get('params'))
        # A missing problemid means "create": generate the id and call add_problem.
        procedure = 'edit_problem'
        if params.get('problemid') is None:
            params["problemid"] = _new_id()
            procedure = 'add_problem'
        arguments = ','.join([
            _sql_str(params.get('problemid')), _sql_str(params.get('title')),
            _sql_str(params.get('msg')), _sql_str(params.get('input')),
            _sql_str(params.get('output')), _sql_num(params.get('rankid')),
            _sql_str(params.get('labels')), _sql_str(params.get('typeid')),
            _sql_str(params.get('func')), _sql_str(params.get('arguements')),
            _sql_str(params.get('template')),
        ])
        with _Database('mycode') as db:
            db.execute(f"call {procedure}({arguments});")
    return res


def _judge_sql_submission(template, code):
    """Compare the rows produced by *code* with those of the reference *template*.

    Returns:
        (is_equals, expected_rows, actual) where *actual* is the decoded rows,
        or the error text (database name masked) if the submission failed.
    """
    with _Database('mysales') as sandbox:
        expected_rows = sandbox.rows(template)
        try:
            actual = sandbox.rows(code)
            is_equals = is_rs_eauals(expected_rows, actual)
        except Exception as e:
            actual = str(e).replace('mysales', 'fake_database')
            is_equals = False
    return is_equals, expected_rows, actual


@app.route('/problem/submit', methods=['GET', 'POST'])
def problem_submit():
    """GET returns a user's submissions; POST judges a submission and records it."""
    res = make_response()
    if request.method == 'GET':
        problemid = request.args.get('problemid')
        userid = request.args.get('userid')
        res.data = _fetch(
            'mycode', f"call get_code_submit({_sql_str(problemid)},{_sql_str(userid)})")
    if request.method == 'POST':
        params = request.json.get('params')
        problemid = params.get('problemid')
        language = params.get('language')
        userid = params.get('userid')
        code = params.get('code')
        with _Database('mycode') as db:
            problem_info = db.rows(f"call get_problem_info({_sql_str(problemid)});")[0]
            func = problem_info.get('func')
            problem_type = problem_info.get('typeid')
            if problem_type != 'C':
                inputs = problem_info.get('input').split('\n')
                outputs = problem_info.get('output').split('\n')
                if int(language) == 3:
                    def run_case(case):
                        return do_js(func, code, case, problem_type)[0]
                elif int(language) == 2:
                    # Was do_py(); every other Python runner call uses do_python().
                    def run_case(case):
                        return do_python(func, code, case)[0]
                else:
                    raise ValueError(f'unsupported language: {language!r}')
                passed, wrong_input, expected, wrong_output = _judge_cases(
                    run_case, inputs, outputs)
                status = 1 if passed == len(inputs) else 2
                res.data = _judge_report(
                    passed, wrong_input, expected, len(inputs), wrong_output, status)
            else:
                # Type 'C' problems are SQL exercises judged by comparing result sets.
                template = problem_info.get('template')
                is_equals, expected_rows, actual = _judge_sql_submission(template, code)
                try:
                    rendered = simplejson.dumps(actual)
                except Exception:
                    rendered = actual
                status = 1 if is_equals else 2
                res.data = _judge_report(
                    1 if is_equals else 0, template, simplejson.dumps(expected_rows),
                    1, rendered, status)
            db.execute(
                f"call add_code_submit({_sql_str(problemid)},{_sql_str(userid)},"
                f"{_sql_str(code)},{_sql_num(language)},{status})")
    return res


@app.route('/plan/problem/submit', methods=['GET', 'POST'])
def plan_problem_submit():
    """Like problem_submit for JavaScript, and records plan progress on success."""
    res = make_response()
    if request.method == 'GET':
        problemid = request.args.get('problemid')
        userid = request.args.get('userid')
        res.data = _fetch(
            'mycode', f"call get_code_submit({_sql_str(problemid)},{_sql_str(userid)})")
    if request.method == 'POST':
        params = request.json.get('params')
        problemid = params.get('problemid')
        planid = params.get('planid')
        language = params.get('language')
        userid = params.get('userid')
        code = params.get('code')
        with _Database('mycode') as db:
            problem_info = db.rows(f"call get_problem_info({_sql_str(problemid)});")[0]
            inputs = problem_info.get('input').split('\n')
            outputs = problem_info.get('output').split('\n')
            func = problem_info.get('func')
            problem_type = problem_info.get('type')

            def run_case(case):
                return do_js(func, code, case, problem_type)[0]

            passed, wrong_input, expected, wrong_output = _judge_cases(
                run_case, inputs, outputs)
            status = 1 if passed == len(inputs) else 2
            db.execute(
                f"call add_code_submit({_sql_str(problemid)},{_sql_str(userid)},"
                f"{_sql_str(code)},{_sql_num(language)},{status})")
            # Record plan progress once the submission is fully correct.
            if status == 1:
                db.execute(
                    f"call add_plan_user_problem({_sql_str(planid)},"
                    f"{_sql_str(userid)},{_sql_str(problemid)});")
        res.data = _judge_report(
            passed, wrong_input, expected, len(inputs), wrong_output, status)
    return res


@app.route('/problem/labels', methods=['GET', 'POST'])
def get_problem_labels():
    """Return all problem labels."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "call get_problem_labels();")
    return res


@app.route('/learn/plan', methods=['GET', 'POST'])
def get_learn_plan():
    """GET lists learning plans; POST creates a plan with its parts and problems."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "call get_learn_plan();")
    if request.method == 'POST':
        params = request.json.get('params')
        planid = _sql_str(_new_id())
        with _Database('mycode') as db:
            db.execute(
                f"call add_learn_plan({planid},{_sql_str(params.get('planname'))},"
                f"{_sql_str(params.get('msg'))},{_sql_str(params.get('labels'))});")
            for part in params.get('partList'):
                db.execute(
                    f"call add_plan_part({planid},{_sql_num(part.get('partid'))},"
                    f"{_sql_str(part.get('partname'))},{_sql_str(part.get('msg'))});")
            for problem in params.get('problemList'):
                db.execute(
                    f"call add_plan_problem({planid},{_sql_str(problem.get('problemid'))},"
                    f"{_sql_num(problem.get('part'))},{_sql_num(problem.get('points'))},"
                    f"{_sql_num(problem.get('needpoints'))});")
    return res


@app.route('/learn/plan/mine', methods=['GET', 'POST'])
def get_my_plan():
    """GET lists a user's plans; POST enrols the user in a plan."""
    res = make_response()
    if request.method == 'GET':
        userid = request.args.get('userid')
        res.data = _fetch('mycode', f"call get_user_plan({_sql_str(userid)});")
    if request.method == 'POST':
        params = request.json.get('params')
        res.data = _fetch(
            'mycode',
            f"call add_user_plan({_sql_str(params.get('userid'))},"
            f"{_sql_str(params.get('planid'))});")
    return res


@app.route('/learn/plan/problems', methods=['GET', 'POST'])
def get_plan_problems():
    """Return the problems available to plans (admin listing)."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "call get_plan_problems_admin();")
    return res


@app.route('/learn/plan/detail', methods=['GET', 'POST'])
def get_plan_detail():
    """Return a plan's detail (label ids resolved to text), problems and parts."""
    res = make_response()
    if request.method == 'GET':
        planid = _sql_str(request.args.get('planid'))
        userid = _sql_str(request.args.get('userid'))
        with _Database('mycode') as db:
            detail = db.rows(f"call get_plan_detail({planid},{userid});")[0]
            detail["labels"] = ','.join(
                db.rows(f"call get_problem_label({_sql_str(labelid)});")[0].get('text')
                for labelid in detail.get('labels').split(',')
            )
            results = {
                "detail": detail,
                "problemList": db.rows(f"call get_plan_problems({planid},{userid});"),
                "partList": db.rows(f"call get_plan_parts({planid});"),
            }
        res.data = simplejson.dumps([results])
    return res


@app.route('/problems/total', methods=['GET', 'POST'])
def get_problem_total():
    """Return the result of get_problem_labels() (same procedure as /problem/labels)."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "call get_problem_labels();")
    return res


@app.route('/circle/join', methods=['GET', 'POST'])
def circle_join():
    """Add a user to a circle."""
    res = make_response()
    if request.method == 'POST':
        params = request.json.get('params')
        with _Database('mycode') as db:
            db.execute(
                f"call user_circles({_sql_str(params.get('id'))},"
                f"{_sql_str(params.get('userid'))});")
    return res


@app.route('/problem/delete', methods=['GET', 'POST'])
def delete_problem():
    """Delete the problem identified by ``problemid``."""
    res = make_response()
    if request.method == 'GET':
        problemid = request.args.get('problemid')
        # Quoted like every other problemid call site; previously interpolated bare.
        with _Database('mycode') as db:
            db.execute(f"call delete_problem({_sql_str(problemid)});")
    return res


@app.route('/problem/types', methods=['GET', 'POST'])
def get_problem_types():
    """Return all problem types."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "call get_problem_types()")
    return res


@app.route('/circle', methods=['GET', 'POST'])
def get_circle():
    """GET lists circles; POST creates a circle (top-level when parentid is 'root')."""
    res = make_response()
    if request.method == 'GET':
        circle_id = request.args.get('id')
        userid = request.args.get('userid')
        if circle_id is None:
            circle_id = '0'
        if userid is None:
            userid = '0'
        res.data = _fetch(
            'mycode', f"call get_circles({_sql_str(circle_id)},{_sql_str(userid)});")
    if request.method == 'POST':
        params = request.json.get('params')
        parentid = params.get('parentid')
        # Root circles store a null parent and are flagged with 1 in the next column.
        parent = 'null,1' if parentid == 'root' else f"{_sql_str(parentid)},0"
        with _Database('mycode') as db:
            db.execute(
                f"call add_circles({_sql_str(_new_id())},"
                f"{_sql_str(params.get('circlename'))},{_sql_str(params.get('msg'))},"
                f"{parent},{_sql_str(params.get('userid'))},0,"
                f"{_sql_num(params.get('ispublic'))});")
    return res


@app.route('/forums', methods=['GET', 'POST'])
def get_forums():
    """Return a circle's forums with label ids replaced by label text."""
    res = make_response()
    if request.method == 'GET':
        circleid = _sql_str(request.args.get('circleid'))
        with _Database('mycode') as db:
            forums = db.rows(f"call get_forums({circleid});")
            for forum in forums:
                forum['labels'] = ','.join(
                    db.rows(f"call get_circle_label({circleid},{_sql_str(label)});")[0].get('text')
                    for label in forum.get('labels').split(',')
                    if label != ''
                )
        # NOTE(review): an empty list is serialised as "[]"; the collapsed
        # original does not show whether it left the body empty in that case.
        res.data = simplejson.dumps(forums)
    return res


@app.route('/forum', methods=['GET', 'POST'])
def get_forum():
    """Return one forum post."""
    res = make_response()
    if request.method == 'GET':
        forumid = request.args.get('forumid')
        res.data = _fetch('mycode', f"call get_forum({_sql_str(forumid)});")
    return res


@app.route('/forum/comment', methods=['GET', 'POST'])
def get_forum_comment():
    """Return the comments on a forum post."""
    res = make_response()
    if request.method == 'GET':
        forumid = request.args.get('forumid')
        res.data = _fetch('mycode', f"call get_forum_comment({_sql_str(forumid)});")
    return res


@app.route('/forum/comments', methods=['GET', 'POST'])
def get_forum_comments():
    """Return the replies to a forum comment."""
    res = make_response()
    if request.method == 'GET':
        commentid = request.args.get('commentid')
        res.data = _fetch(
            'mycode', f"call get_forum_comment_comment({_sql_str(commentid)});")
    return res


@app.route('/problems/types', methods=['GET', 'POST'])
def problems_types():
    """Return problem types under ``typeid`` (default '0')."""
    res = make_response()
    if request.method == 'GET':
        typeid = request.args.get('typeid')
        if typeid is None:
            typeid = '0'
        res.data = _fetch('mycode', f"call select_problemtypes({_sql_str(typeid)})")
    return res


@app.route('/problems/labels', methods=['GET', 'POST'])
def problems_labels():
    """Return every row of problemlabels."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "select * from problemlabels;")
    return res


@app.route('/reply/comment/', methods=['GET', 'POST'])
def reply_comments():
    """GET lists replies (by reply or by problem); POST adds a comment to a reply."""
    if request.method == 'GET':
        replyid = request.args.get('replyid')
        if replyid is None:
            problemid = request.args.get('problemid')
            sql = f"call problem_replys({_sql_str(problemid)});"
        else:
            sql = f"call reply_comments({_sql_str(replyid)});"
        return _fetch('mycode', sql)
    if request.method == 'POST':
        res = make_response()
        params = request.json.get('params')
        with _Database('mycode') as db:
            db.execute(
                f"call add_reply_comment({_sql_str(_new_id())},"
                f"{_sql_str(params.get('replyid'))},{_sql_str(params.get('userid'))},"
                f"{_sql_str(params.get('replyuserid'))},{_sql_str(params.get('content'))});")
        return res


@app.route('/circle/labels', methods=['GET', 'POST'])
def circle_labels():
    """Return the labels defined in a circle."""
    res = make_response()  # created up front: POST used to hit an unbound `res`
    if request.method == 'GET':
        circleid = request.args.get('circleid')
        res.data = _fetch('mycode', f"call get_circle_labels({_sql_str(circleid)});")
    return res


@app.route('/circle/label/forum', methods=['GET', 'POST'])
def label_forums():
    """Return the forums in a circle that carry a given label."""
    res = make_response()
    if request.method == 'GET':
        circleid = request.args.get('circleid')
        labelid = request.args.get('labelid')
        res.data = _fetch(
            'mycode',
            f"call get_labels_forums({_sql_str(circleid)},{_sql_str(labelid)});")
    return res


@app.route('/forumorcomment', methods=['GET', 'POST'])
def forum_comment():
    """Return info for a forum post or a comment, selected by ``type``."""
    res = make_response()
    if request.method == 'GET':
        item_id = _sql_str(request.args.get('id'))
        kind = request.args.get('type')
        if kind == 'forum':
            sql = f"call get_forum_info({item_id})"
        elif kind == 'comment':
            sql = f"call get_comment_info({item_id})"
        else:
            # Previously an unknown type crashed on an unbound `sql`.
            res.status_code = 400
            return res
        res.data = _fetch('mycode', sql)
    return res


@app.route('/problem/solution/', methods=['GET', 'POST'])
def problem_solution():
    """Return one solution with its label ids resolved through get_label()."""
    res = make_response()
    if request.method == 'GET':
        solutionid = request.args.get('solutionid')
        with _Database('mycode') as db:
            result = db.rows(f"call get_solution({_sql_str(solutionid)});")[0]
            # The label string is split directly, as problem_solutions does; it
            # was previously indexed with ['labels'], which always failed.
            result['labels'] = _label_text(db, result['labels'])
        res.data = simplejson.dumps([result])
    return res


@app.route('/circle/forum', methods=['GET', 'POST'])
def circle_forum_nopublic():
    """Submit a forum post for review (stored as not yet public)."""
    res = make_response()
    if request.method == 'POST':
        params = request.values
        with _Database('mycode') as db:
            db.execute(
                f"call add_forum_nopublic({_sql_str(params.get('circleid'))},"
                f"{_sql_str(_new_id())},{_sql_str(params.get('userid'))},"
                f"{_sql_str(params.get('content'))},{_sql_str(params.get('title'))});")
    return res


@app.route('/circle/forum/admin', methods=['GET', 'POST'])
def circle_forum_admin():
    """Return the forum posts awaiting review."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "call get_forum_nopublic();")
    return res


@app.route('/forum/admin/unpass', methods=['GET', 'POST'])
def circle_forum_admin_un():
    """Reject a pending forum post by deleting it."""
    res = make_response()
    if request.method == 'POST':
        forumid = request.json.get('params').get('forumid')
        with _Database('mycode') as db:
            db.execute(
                f"delete from nopublicforums where forumid = {_sql_str(forumid)};")
    return res


@app.route('/circle/forum/public', methods=['GET', 'POST'])
def circle_forum():
    """Approve a pending forum post: derive its labels and publish it."""
    res = make_response()
    if request.method == 'POST':
        forumid = request.json.get('params').get('id')
        with _Database('mycode') as db:
            pending = db.rows(
                f"select * from nopublicforums where forumid = {_sql_str(forumid)};")[0]
            circleid = pending.get('circleid')
            content = pending.get('content')
            title = pending.get('title')
            known_labels = db.rows(f"call get_circle_labels({_sql_str(circleid)});")
            label_ids = []
            for label in concat_label(known_labels, content_to_labels(title, content)):
                if isinstance(label, str):
                    # A plain string is a label new to this circle: create it first.
                    labelid = _new_id()
                    db.execute(
                        f"call add_circle_label({_sql_str(circleid)},"
                        f"{_sql_str(labelid)},{_sql_str(label)});")
                    label_ids.append(labelid)
                elif isinstance(label, dict):
                    label_ids.append(label.get('id'))
            db.execute(
                f"call add_forum({_sql_str(circleid)},{_sql_str(forumid)},"
                f"{_sql_str(pending.get('userid'))},{_sql_str(content)},"
                f"{_sql_str(title)},{_sql_str(','.join(label_ids))});")
    return res


@app.route('/problem/solutions/', methods=['GET', 'POST'])
def problem_solutions():
    """GET lists a problem's solutions with label text; POST adds a solution."""
    if request.method == 'GET':
        res = make_response()
        problemid = request.args.get('problemid')
        with _Database('mycode') as db:
            solutions = db.rows(f"call get_solutions({_sql_str(problemid)});")
            for solution in solutions:
                solution['labels'] = _label_text(db, solution['labels'])
        res.data = simplejson.dumps(solutions)
        return res
    if request.method == 'POST':
        res = make_response()
        params = request.values
        title = params.get('title')
        if title is None:
            title = '暂无标题'
        with _Database('mycode') as db:
            # userid is read from 'userid'; it was previously read from 'problemid'.
            db.execute(
                f"call add_problem_solution({_sql_str(_new_id())},"
                f"{_sql_str(params.get('problemid'))},{_sql_str(params.get('isofficial'))},"
                f"{_sql_str(params.get('userid'))},{_sql_str(title)},"
                f"{_sql_str(params.get('language'))},{_sql_str(params.get('content'))},"
                f"'',{_sql_str(params.get('labels'))})")
        return res


@app.route('/solution/label/', methods=['GET', 'POST'])
def solution_label():
    """Return all solution labels."""
    res = make_response()
    if request.method == 'GET':
        res.data = _fetch('mycode', "call get_solution_labels();")
    return res


@app.route('/label/solution/', methods=['GET', 'POST'])
def label_solution():
    """Return a problem's solutions, optionally restricted to those carrying every label.

    ``labels`` is a comma-separated id list whose first character is dropped
    (the client is presumed to send a leading comma - verify against caller).
    """
    res = make_response()
    if request.method == 'GET':
        language = request.args.get('language')
        problemid = _sql_str(request.args.get('problemid'))
        if language is None:
            language = 0
        language = _sql_str(language)
        labels = request.args.get('labels')
        with _Database('mycode') as db:
            raw = db.query(f"call get_label_solutions({problemid},{language},0);")
            res.data = raw
            # A missing ``labels`` is treated like an empty one (it used to crash).
            if labels:
                solutions = simplejson.loads(raw)
                matching = {item['solutionid'] for item in solutions}
                for label in labels[1:].split(','):
                    tagged = db.rows(
                        f"call get_label_solutions({problemid},{language},{_sql_num(label)});")
                    matching &= {item['solutionid'] for item in tagged}
                selected = []
                for solution in solutions:
                    if solution['solutionid'] in matching:
                        solution['labels'] = str(
                            get_label(db.handle, solution['labels'].split(';')))
                        selected.append(solution)
                res.data = simplejson.dumps(selected)
    return res


# if __name__ == '__main__':
#     ... (remainder of app.py is elided in this listing)

Full Screen

Full Screen

test_sql.py

Source: test_sql.py (GitHub)

copy

Full Screen

#!/usr/bin/env python
# test_sql.py - tests for the psycopg2.sql module
#
# Copyright (C) 2016 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

import datetime as dt
from io import StringIO

import psycopg2
from psycopg2 import sql

from .testutils import (
    ConnectingTestCase,
    skip_before_postgres,
    skip_before_python,
    skip_copy_if_green,
    unittest,
)


class SqlFormatTests(ConnectingTestCase):
    """Tests for ``sql.SQL.format()`` and for executing composed queries."""

    @skip_before_python(2, 7)
    def test_pos(self):
        # Auto-numbered "{}" fields are filled in positional order.
        s = sql.SQL("select {} from {}").format(
            sql.Identifier('field'), sql.Identifier('table'))
        s1 = s.as_string(self.conn)
        self.assertIsInstance(s1, str)
        self.assertEqual(s1, 'select "field" from "table"')

    def test_pos_spec(self):
        s = sql.SQL("select {0} from {1}").format(
            sql.Identifier('field'), sql.Identifier('table'))
        s1 = s.as_string(self.conn)
        self.assertIsInstance(s1, str)
        self.assertEqual(s1, 'select "field" from "table"')

        # Explicit indexes may reference the arguments out of order.
        s = sql.SQL("select {1} from {0}").format(
            sql.Identifier('table'), sql.Identifier('field'))
        s1 = s.as_string(self.conn)
        self.assertIsInstance(s1, str)
        self.assertEqual(s1, 'select "field" from "table"')

    def test_dict(self):
        s = sql.SQL("select {f} from {t}").format(
            f=sql.Identifier('field'), t=sql.Identifier('table'))
        s1 = s.as_string(self.conn)
        self.assertIsInstance(s1, str)
        self.assertEqual(s1, 'select "field" from "table"')

    def test_unicode(self):
        # NOTE(review): with plain ``str`` literals this repeats the first
        # half of test_pos_spec; presumably a leftover from a variant that
        # used unicode literals. Kept so the test name stays available.
        s = sql.SQL("select {0} from {1}").format(
            sql.Identifier('field'), sql.Identifier('table'))
        s1 = s.as_string(self.conn)
        self.assertIsInstance(s1, str)
        self.assertEqual(s1, 'select "field" from "table"')

    def test_compose_literal(self):
        s = sql.SQL("select {0};").format(sql.Literal(dt.date(2016, 12, 31)))
        s1 = s.as_string(self.conn)
        self.assertEqual(s1, "select '2016-12-31'::date;")

    def test_compose_empty(self):
        s = sql.SQL("select foo;").format()
        s1 = s.as_string(self.conn)
        self.assertEqual(s1, "select foo;")

    def test_percent_escape(self):
        # A bare "%" in the template must come through unchanged.
        s = sql.SQL("42 % {0}").format(sql.Literal(7))
        s1 = s.as_string(self.conn)
        self.assertEqual(s1, "42 % 7")

    def test_braces_escape(self):
        # Doubled braces are literal braces, as in str.format().
        s = sql.SQL("{{{0}}}").format(sql.Literal(7))
        self.assertEqual(s.as_string(self.conn), "{7}")
        s = sql.SQL("{{1,{0}}}").format(sql.Literal(7))
        self.assertEqual(s.as_string(self.conn), "{1,7}")

    def test_compose_badnargs(self):
        self.assertRaises(IndexError, sql.SQL("select {0};").format)

    @skip_before_python(2, 7)
    def test_compose_badnargs_auto(self):
        self.assertRaises(IndexError, sql.SQL("select {};").format)
        # Mixing automatic and explicit field numbering is rejected.
        self.assertRaises(
            ValueError, sql.SQL("select {} {1};").format, 10, 20)
        self.assertRaises(
            ValueError, sql.SQL("select {0} {};").format, 10, 20)

    def test_compose_bad_args_type(self):
        self.assertRaises(IndexError, sql.SQL("select {0};").format, a=10)
        self.assertRaises(KeyError, sql.SQL("select {x};").format, 10)

    def test_must_be_composable(self):
        # Plain Python values are not accepted as format() arguments.
        self.assertRaises(TypeError, sql.SQL("select {0};").format, 'foo')
        self.assertRaises(TypeError, sql.SQL("select {0};").format, 10)

    def test_no_modifiers(self):
        # Conversion flags ("!r") and format specs (":<") are not supported.
        self.assertRaises(
            ValueError, sql.SQL("select {a!r};").format, a=10)
        self.assertRaises(
            ValueError, sql.SQL("select {a:<};").format, a=10)

    def test_must_be_adaptable(self):
        class Foo:
            pass

        # format() accepts the Literal; the error surfaces in as_string().
        composed = sql.SQL("select {0};").format(sql.Literal(Foo()))
        self.assertRaises(
            psycopg2.ProgrammingError, composed.as_string, self.conn)

    def test_execute(self):
        cur = self.conn.cursor()
        cur.execute("""
            create table test_compose (
                id serial primary key,
                foo text, bar text, "ba'z" text)
            """)
        # The literal "%s" covers the id; the joined placeholders cover
        # the three named columns.
        cur.execute(
            sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
                sql.Identifier('test_compose'),
                sql.SQL(', ').join(
                    map(sql.Identifier, ['foo', 'bar', "ba'z"])),
                (sql.Placeholder() * 3).join(', ')),
            (10, 'a', 'b', 'c'))
        cur.execute("select * from test_compose")
        self.assertEqual(cur.fetchall(), [(10, 'a', 'b', 'c')])

    def test_executemany(self):
        cur = self.conn.cursor()
        cur.execute("""
            create table test_compose (
                id serial primary key,
                foo text, bar text, "ba'z" text)
            """)
        cur.executemany(
            sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
                sql.Identifier('test_compose'),
                sql.SQL(', ').join(
                    map(sql.Identifier, ['foo', 'bar', "ba'z"])),
                (sql.Placeholder() * 3).join(', ')),
            [(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
        cur.execute("select * from test_compose")
        self.assertEqual(
            cur.fetchall(),
            [(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])

    @skip_copy_if_green
    @skip_before_postgres(8, 2)
    def test_copy(self):
        cur = self.conn.cursor()
        cur.execute("""
            create table test_compose (
                id serial primary key,
                foo text, bar text, "ba'z" text)
            """)
        # Load two tab-separated rows, then read one column back.
        s = StringIO("10\ta\tb\tc\n20\td\te\tf\n")
        cur.copy_expert(
            sql.SQL("copy {t} (id, foo, bar, {f}) from stdin").format(
                t=sql.Identifier("test_compose"),
                f=sql.Identifier("ba'z")),
            s)
        s1 = StringIO()
        cur.copy_expert(
            sql.SQL(
                "copy (select {f} from {t} order by id) to stdout").format(
                t=sql.Identifier("test_compose"),
                f=sql.Identifier("ba'z")),
            s1)
        s1.seek(0)
        self.assertEqual(s1.read(), 'c\nf\n')


class IdentifierTests(ConnectingTestCase):
    """Tests for ``sql.Identifier``."""

    def test_class(self):
        self.assertTrue(issubclass(sql.Identifier, sql.Composable))

    def test_init(self):
        self.assertIsInstance(sql.Identifier('foo'), sql.Identifier)
        self.assertRaises(TypeError, sql.Identifier, 10)
        self.assertRaises(TypeError, sql.Identifier, dt.date(2016, 12, 31))

    def test_string(self):
        self.assertEqual(sql.Identifier('foo').string, 'foo')

    def test_repr(self):
        obj = sql.Identifier("fo'o")
        self.assertEqual(repr(obj), 'Identifier("fo\'o")')
        self.assertEqual(repr(obj), str(obj))

    def test_eq(self):
        # Spelled with == / != on purpose: the operators are under test.
        self.assertTrue(sql.Identifier('foo') == sql.Identifier('foo'))
        self.assertTrue(sql.Identifier('foo') != sql.Identifier('bar'))
        self.assertTrue(sql.Identifier('foo') != 'foo')
        self.assertTrue(sql.Identifier('foo') != sql.SQL('foo'))

    def test_as_str(self):
        self.assertEqual(
            sql.Identifier('foo').as_string(self.conn), '"foo"')
        self.assertEqual(
            sql.Identifier("fo'o").as_string(self.conn), '"fo\'o"')

    def test_join(self):
        self.assertFalse(hasattr(sql.Identifier('foo'), 'join'))


class LiteralTests(ConnectingTestCase):
    """Tests for ``sql.Literal``."""

    def test_class(self):
        self.assertTrue(issubclass(sql.Literal, sql.Composable))

    def test_init(self):
        self.assertIsInstance(sql.Literal('foo'), sql.Literal)
        self.assertIsInstance(sql.Literal(b'foo'), sql.Literal)
        self.assertIsInstance(sql.Literal(42), sql.Literal)
        self.assertIsInstance(
            sql.Literal(dt.date(2016, 12, 31)), sql.Literal)

    def test_wrapped(self):
        self.assertEqual(sql.Literal('foo').wrapped, 'foo')

    def test_repr(self):
        self.assertEqual(repr(sql.Literal("foo")), "Literal('foo')")
        self.assertEqual(str(sql.Literal("foo")), "Literal('foo')")
        self.assertQuotedEqual(
            sql.Literal("foo").as_string(self.conn), "'foo'")
        self.assertEqual(sql.Literal(42).as_string(self.conn), "42")
        self.assertEqual(
            sql.Literal(dt.date(2017, 1, 1)).as_string(self.conn),
            "'2017-01-01'::date")

    def test_eq(self):
        # Spelled with == / != on purpose: the operators are under test.
        self.assertTrue(sql.Literal('foo') == sql.Literal('foo'))
        self.assertTrue(sql.Literal('foo') != sql.Literal('bar'))
        self.assertTrue(sql.Literal('foo') != 'foo')
        self.assertTrue(sql.Literal('foo') != sql.SQL('foo'))

    def test_must_be_adaptable(self):
        class Foo:
            pass

        self.assertRaises(
            psycopg2.ProgrammingError,
            sql.Literal(Foo()).as_string, self.conn)


class SQLTests(ConnectingTestCase):
    """Tests for ``sql.SQL``."""

    def test_class(self):
        self.assertTrue(issubclass(sql.SQL, sql.Composable))

    def test_init(self):
        self.assertIsInstance(sql.SQL('foo'), sql.SQL)
        self.assertRaises(TypeError, sql.SQL, 10)
        self.assertRaises(TypeError, sql.SQL, dt.date(2016, 12, 31))

    def test_string(self):
        self.assertEqual(sql.SQL('foo').string, 'foo')

    def test_repr(self):
        self.assertEqual(repr(sql.SQL("foo")), "SQL('foo')")
        self.assertEqual(str(sql.SQL("foo")), "SQL('foo')")
        self.assertEqual(sql.SQL("foo").as_string(self.conn), "foo")

    def test_eq(self):
        # Spelled with == / != on purpose: the operators are under test.
        self.assertTrue(sql.SQL('foo') == sql.SQL('foo'))
        self.assertTrue(sql.SQL('foo') != sql.SQL('bar'))
        self.assertTrue(sql.SQL('foo') != 'foo')
        self.assertTrue(sql.SQL('foo') != sql.Literal('foo'))

    def test_sum(self):
        obj = sql.SQL("foo") + sql.SQL("bar")
        self.assertIsInstance(obj, sql.Composed)
        self.assertEqual(obj.as_string(self.conn), "foobar")

    def test_sum_inplace(self):
        obj = sql.SQL("foo")
        obj += sql.SQL("bar")
        self.assertIsInstance(obj, sql.Composed)
        self.assertEqual(obj.as_string(self.conn), "foobar")

    def test_multiply(self):
        obj = sql.SQL("foo") * 3
        self.assertIsInstance(obj, sql.Composed)
        self.assertEqual(obj.as_string(self.conn), "foofoofoo")

    def test_join(self):
        # join() accepts a plain list of composables...
        obj = sql.SQL(", ").join(
            [sql.Identifier('foo'), sql.SQL('bar'), sql.Literal(42)])
        self.assertIsInstance(obj, sql.Composed)
        self.assertEqual(obj.as_string(self.conn), '"foo", bar, 42')

        # ...as well as a Composed instance...
        obj = sql.SQL(", ").join(
            sql.Composed(
                [sql.Identifier('foo'), sql.SQL('bar'), sql.Literal(42)]))
        self.assertIsInstance(obj, sql.Composed)
        self.assertEqual(obj.as_string(self.conn), '"foo", bar, 42')

        # ...and an empty sequence.
        obj = sql.SQL(", ").join([])
        self.assertEqual(obj, sql.Composed([]))


class ComposedTest(ConnectingTestCase):
    """Tests for ``sql.Composed``."""

    def test_class(self):
        self.assertTrue(issubclass(sql.Composed, sql.Composable))

    def test_repr(self):
        obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")])
        self.assertEqual(
            repr(obj),
            """Composed([Literal('foo'), Identifier("b'ar")])""")
        self.assertEqual(str(obj), repr(obj))

    def test_seq(self):
        parts = [sql.SQL('foo'), sql.Literal('bar'), sql.Identifier('baz')]
        self.assertEqual(sql.Composed(parts).seq, parts)

    def test_eq(self):
        parts = [sql.Literal("foo"), sql.Identifier("b'ar")]
        other_parts = [sql.Literal("foo"), sql.Literal("b'ar")]
        # Spelled with == / != on purpose: the operators are under test.
        self.assertTrue(sql.Composed(parts) == sql.Composed(list(parts)))
        self.assertTrue(sql.Composed(parts) != parts)
        self.assertTrue(sql.Composed(parts) != sql.Composed(other_parts))

    def test_join(self):
        obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")])
        obj = obj.join(", ")
        self.assertIsInstance(obj, sql.Composed)
        self.assertQuotedEqual(
            obj.as_string(self.conn), "'foo', \"b'ar\"")

    def test_sum(self):
        obj = sql.Composed([sql.SQL("foo ")])
        obj = obj + sql.Literal("bar")
        self.assertIsInstance(obj, sql.Composed)
        self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")

    def test_sum_inplace(self):
        # += with a single composable on the right-hand side.
        obj = sql.Composed([sql.SQL("foo ")])
        obj += sql.Literal("bar")
        self.assertIsInstance(obj, sql.Composed)
        self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")

        # += with another Composed on the right-hand side.
        obj = sql.Composed([sql.SQL("foo ")])
        obj += sql.Composed([sql.Literal("bar")])
        self.assertIsInstance(obj, sql.Composed)
        self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")

    def test_iter(self):
        obj = sql.Composed([sql.SQL("foo"), sql.SQL('bar')])
        it = iter(obj)
        i = next(it)
        self.assertEqual(i, sql.SQL('foo'))
        i = next(it)
        self.assertEqual(i, sql.SQL('bar'))
        self.assertRaises(StopIteration, next, it)


class PlaceholderTest(ConnectingTestCase):
    """Tests for ``sql.Placeholder``."""

    def test_class(self):
        self.assertTrue(issubclass(sql.Placeholder, sql.Composable))

    def test_name(self):
        self.assertEqual(sql.Placeholder().name, None)
        self.assertEqual(sql.Placeholder('foo').name, 'foo')

    def test_repr(self):
        # assertEqual, not assertTrue: assertTrue(value, expected) treats
        # the second argument as the failure message and passes for any
        # non-empty string, so it would never compare the two values.
        self.assertEqual(str(sql.Placeholder()), 'Placeholder()')
        self.assertEqual(repr(sql.Placeholder()), 'Placeholder()')
        self.assertEqual(sql.Placeholder().as_string(self.conn), '%s')

    def test_repr_name(self):
        # See test_repr for why these use assertEqual.
        self.assertEqual(
            str(sql.Placeholder('foo')), "Placeholder('foo')")
        self.assertEqual(
            repr(sql.Placeholder('foo')), "Placeholder('foo')")
        self.assertEqual(
            sql.Placeholder('foo').as_string(self.conn), '%(foo)s')

    def test_bad_name(self):
        self.assertRaises(ValueError, sql.Placeholder, ')')

    def test_eq(self):
        # Spelled with == / != on purpose: the operators are under test.
        self.assertTrue(sql.Placeholder('foo') == sql.Placeholder('foo'))
        self.assertTrue(sql.Placeholder('foo') != sql.Placeholder('bar'))
        self.assertTrue(sql.Placeholder('foo') != 'foo')
        self.assertTrue(sql.Placeholder() == sql.Placeholder())
        self.assertTrue(sql.Placeholder('foo') != sql.Placeholder())
        self.assertTrue(sql.Placeholder('foo') != sql.Literal('foo'))


class ValuesTest(ConnectingTestCase):
    """Tests for the module-level ``sql.NULL`` and ``sql.DEFAULT`` objects."""

    def test_null(self):
        self.assertIsInstance(sql.NULL, sql.SQL)
        self.assertEqual(sql.NULL.as_string(self.conn), "NULL")

    def test_default(self):
        self.assertIsInstance(sql.DEFAULT, sql.SQL)
        self.assertEqual(sql.DEFAULT.as_string(self.conn), "DEFAULT")


def test_suite():
    """Return a suite with every test loaded from this module by name."""
    return unittest.TestLoader().loadTestsFromName(__name__)


if __name__ == "__main__":
    ...  # body is truncated in the source excerpt; left as found

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run dbt-osmosis automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful