Best Python code snippet using tempest_python
apis.py
Source:apis.py
import json
import base64
from flask import jsonify
import flask_login
from app.Lib import uuid_generator, send_request
import app.sql
# Server, User, LoginHistory and db are used below and are presumably
# provided by this star import -- TODO confirm against app.models.
from app.models import *

# Password sent with every request forwarded to a managed server.
# NOTE(review): hard-coded credential; consider loading it from configuration.
AGENT_PSW = 'tttest'

# root_number values compared against in this module.
ROOT_ADMIN = "100"
ROOT_DENIED = "-1"


def _status(code, **extra):
    """Return a JSON response ``{'status': code, **extra}``."""
    payload = {'status': code}
    payload.update(extra)
    return jsonify(payload)


def _is_admin():
    """Return True when the logged-in user's root_number is "100"."""
    return flask_login.current_user.root_number == ROOT_ADMIN


def _lookup_server(request):
    """Return the Server row whose id equals the request's ``server_id``."""
    return Server.query.filter_by(id=request.json.get('server_id')).first()


def _send_to_server(request, payload):
    """Send *payload* to the server selected by the request's ``server_id``."""
    server = _lookup_server(request)
    return send_request.send_request(
        server.server_type, server.server_ip, payload)


def _docker_call(request, url, method, body=None):
    """Forward a ``docker_socks`` call and return the decoded reply.

    *body*, when given, is sent under the payload's ``data`` key.
    """
    payload = {
        'api': 'docker_socks',
        'url': url,
        'method': method,
        'psw': AGENT_PSW,
    }
    if body is not None:
        payload['data'] = body
    return _send_to_server(request, payload)


def _docker_response(request, url, method, body=None):
    """Forward a ``docker_socks`` call and wrap the reply as status 0."""
    return _status(0, data=_docker_call(request, url, method, body))


def login(request):
    """Log in with the posted username/password; status 0 on success, -1 otherwise."""
    username = request.json.get("username")
    password = request.json.get("password")
    if app.sql.login_as_user(username, password):
        return _status(0)
    return _status(-1)


def check_login(request):
    """Report whether the current user is authenticated, with name and root level."""
    user = flask_login.current_user
    if user.is_authenticated:
        return jsonify({
            "isLogin": True,
            "ifadmin": user.root_number,
            "username": user.username,
        })
    return jsonify({"isLogin": False})


def logout(request):
    """Log the current user out."""
    flask_login.logout_user()
    return _status(0)


def ifUsed(request):
    """Return status 1 plus the login state if ``app.sql.ifCreated()`` is true, else status 0."""
    if not app.sql.ifCreated():
        return _status(0)
    return _status(1, login=bool(flask_login.current_user.is_authenticated))


def get_server_info(request):
    """Return everything ``app.sql.get_server_all()`` yields."""
    return jsonify({"data": app.sql.get_server_all()})


def server_check(request):
    """Send ``check_status`` to the selected server and relay its status."""
    reply = _send_to_server(
        request, {'api': 'check_status', 'psw': AGENT_PSW})
    return _status(reply.get("status"))


def server_add(request):
    """Bind a new server and, if the bind reports status 0, store it."""
    body = request.json
    server_ip = body.get('server_ip')
    server_type = body.get('server_type')
    reply = send_request.send_request_server_bind(server_type, server_ip, {})
    if reply.get("status") != 0:
        return _status(-1, message=reply.get("message"))
    app.sql.create_server(
        server_ip, body.get('server_name'), server_type,
        body.get('server_user'), body.get('server_psw'),
        body.get('server_ssh_ip'))
    return _status(0)


def server_delete(request):
    """Unbind a server and, if the unbind reports status 0, remove it."""
    server_id = request.json.get('server_id')
    server = app.sql.get_server_by_id(server_id)
    reply = send_request.send_request_server_delete(
        server.server_type, server.server_ip, {})
    if reply.get("status") != 0:
        return _status(-1, message=reply.get("message"))
    app.sql.remove_server(server_id)
    return _status(0)


def server_change_name(request):
    """Rename the selected server."""
    app.sql.change_server_name(
        request.json.get('server_id'), request.json.get('server_name'))
    return _status(0)


def server_change_ssh(request):
    """Store a new ``server_ssh`` value on the selected server."""
    server = _lookup_server(request)
    server.server_ssh = request.json.get('server_ssh')
    db.session.commit()
    return _status(0)


def get_containers_info(request):
    """List all containers (running or not) on the selected server."""
    return _docker_response(request, '/containers/json?all=true', 'GET')


def container_log(request):
    """Return a container's stdout log lines, newest first."""
    container_id = request.json.get("container_id")
    # The scraped source showed "×tamps": "&times" had been turned into an
    # HTML entity. The intended query string is restored here.
    reply = _docker_call(
        request,
        '/containers/%s/logs?stdout=true&timestamps=true' % container_id,
        'GET')
    lines = []
    if reply.get("status") == 0:
        lines = reply.get("data").split("\r\n")
        lines.reverse()
    return _status(0, data=lines)


def container_delete(request):
    """Force-delete a container and echo its id back."""
    container_id = request.json.get("container_id")
    reply = _docker_call(
        request, '/containers/%s?force=true' % container_id, 'DELETE')
    return _status(0, id=container_id, data=reply)


def container_add(request):
    """Create a container from the posted options."""
    body = request.json
    url = "/containers/create"
    config = {}

    # Image to create the container from.
    if body.get("image"):
        config["Image"] = body.get("image")
    # Container name, passed as a query parameter.
    if body.get("name"):
        url = "%s?name=%s" % (url, body.get("name"))
    # Port mapping.
    connect_port = body.get("connect_port")
    export_port = body.get("export_port")
    if connect_port:
        config["HostConfig"] = {"PortBindings": connect_port}
    # NOTE(review): the collapsed source does not show whether ExposedPorts was
    # set only together with PortBindings or unconditionally; it is set here
    # whenever either value was supplied -- confirm against the original file.
    if connect_port or export_port:
        config["ExposedPorts"] = export_port

    # ---- advanced settings ----
    # Request key -> container-config key, copied only when truthy.
    optional = (
        ("cmd", "Cmd"),                  # start command
        ("entrypoint", "Entrypoint"),    # entrypoint (replaces the Dockerfile's)
        ("user", "User"),                # user to run as
        ("Tty", "Tty"),                  # allocate a pseudo-terminal
        ("interactive", "OpenStdin"),    # interactive mode
        ("workdir", "WorkingDir"),       # working directory
    )
    for request_key, config_key in optional:
        value = body.get(request_key)
        if value:
            config[config_key] = value
    # Network configuration.
    network_model = body.get("network_model")
    if network_model:
        config["NetworkingConfig"] = {"EndpointsConfig": network_model}

    return _docker_response(request, url, 'POST', config)


def container_inpect(request):
    """Return the inspect data of one container."""
    container_id = request.json.get("container_id")
    return _docker_response(
        request, '/containers/%s/json' % container_id, 'GET')


def server_network_info(request):
    """Query ``/networks?dangling=true`` on the selected server."""
    return _docker_response(request, '/networks?dangling=true', 'GET')


def container_process(request):
    """Return the process list (``top``) of one container."""
    container_id = request.json.get("container_id")
    return _docker_response(
        request, '/containers/%s/top' % container_id, 'GET')


def container_start(request):
    """Start a container."""
    container_id = request.json.get("container_id")
    return _docker_response(
        request, '/containers/%s/start' % container_id, 'POST')


def container_restart(request):
    """Restart a container."""
    container_id = request.json.get("container_id")
    return _docker_response(
        request, '/containers/%s/restart' % container_id, 'POST')


def container_stop(request):
    """Stop a container."""
    container_id = request.json.get("container_id")
    return _docker_response(
        request, '/containers/%s/stop' % container_id, 'POST')


def container_rename(request):
    """Rename a container to ``container_name``."""
    container_id = request.json.get("container_id")
    container_name = request.json.get("container_name")
    return _docker_response(
        request,
        '/containers/%s/rename?name=%s' % (container_id, container_name),
        'POST')


def container_delete_stoped(request):
    """Prune containers via ``/containers/prune``."""
    return _docker_response(request, '/containers/prune', 'POST')


def image_info(request):
    """List the images on the selected server."""
    return _docker_response(request, '/images/json', 'GET')


def image_delete_cache(request):
    """Prune the build cache via ``/build/prune``."""
    return _docker_response(request, "/build/prune", 'POST')


def image_pull(request):
    """Pull ``image_name``; ``:latest`` is appended when no tag is given."""
    image_name = request.json.get("image_name")
    if ":" not in image_name:
        image_name = "%s:latest" % image_name
    return _docker_response(
        request, "/images/create?fromImage=%s" % image_name, 'POST')


def image_inspect(request):
    """Return the inspect data of one image."""
    image_id = request.json.get("image_id")
    return _docker_response(request, "/images/%s/json" % image_id, 'GET')


def image_delele(request):
    """Force-delete an image."""
    image_id = request.json.get("image_id")
    return _docker_response(
        request, "/images/%s?force=true" % image_id, 'DELETE')


def image_create_from_container(request):
    """Commit a container as image ``image_name`` with tag ``version``."""
    body = request.json
    # The "\&" separators are reproduced exactly as in the original.
    # NOTE(review): presumably the receiving side needs the ampersand escaped;
    # confirm before changing.
    send_url = "/commit?container=%s" % body.get("container_id")
    send_url = "%s\\&repo=%s" % (send_url, body.get("image_name"))
    send_url = "%s\\&tag=%s" % (send_url, body.get("version"))
    print(send_url)
    return _docker_response(request, send_url, 'POST')


def network_inspect(request):
    """Return the inspect data of one network."""
    network_id = request.json.get("network_id")
    return _docker_response(request, "/networks/%s" % network_id, 'GET')


def network_delete(request):
    """Delete a network."""
    network_id = request.json.get("network_id")
    return _docker_response(request, "/networks/%s" % network_id, 'DELETE')


def network_create(request):
    """Create a network from the posted name, driver and IPv4 settings."""
    body = request.json
    network_drive = body.get("network_drive")
    config = {
        "EnableIPv6": False,
        "Name": body.get("network_name"),   # network name
        "Driver": network_drive,            # driver type
    }
    # Network (IPAM) configuration, skipped for the "none" driver.
    if network_drive != "none":
        ipv4_config = {}
        for request_key, config_key in (("subnet", "Subnet"),
                                        ("ip_range", "IPRange"),
                                        ("gateway", "Gateway")):
            if body.get(request_key):
                ipv4_config[config_key] = body.get(request_key)
        config["IPAM"] = {"Config": [ipv4_config]}
    # Driver options.
    # NOTE(review): the collapsed source does not show whether this check was
    # nested under the driver check above; it is treated as independent here.
    if body.get("option"):
        config["Options"] = body.get("option")
    return _docker_response(request, "/networks/create", 'POST', config)


def network_connect_container(request):
    """Connect a container to a network with the posted IPv4 address."""
    body = request.json
    config = {
        "Container": body.get("container_id"),
        "EndpointConfig": {
            "IPAMConfig": {"IPv4Address": body.get("network_ip")},
        },
    }
    return _docker_response(
        request, "/networks/%s/connect" % body.get("network_id"),
        'POST', config)


def network_disconnect_container(request):
    """Force-disconnect a container from a network."""
    body = request.json
    config = {"Container": body.get("container_id"), "Force": True}
    return _docker_response(
        request, "/networks/%s/disconnect" % body.get("network_id"),
        'POST', config)


def volume_info(request):
    """List the volumes on the selected server."""
    return _docker_response(request, '/volumes', 'GET')


def volume_inspcet(request):
    """Return the inspect data of one volume."""
    volume_id = request.json.get('volume_id')
    return _docker_response(request, '/volumes/%s' % volume_id, 'GET')


def volume_delete(request):
    """Delete a volume."""
    volume_id = request.json.get('volume_id')
    return _docker_response(request, '/volumes/%s' % volume_id, 'DELETE')


def volume_delete_unused(request):
    """Prune volumes via ``/volumes/prune``."""
    return _docker_response(request, '/volumes/prune', 'POST')


def system_infomation(request):
    """Return the ``/info`` reply of the selected server."""
    return _docker_response(request, '/info', 'GET')


def system_version(request):
    """Return the ``/version`` reply of the selected server."""
    return _docker_response(request, '/version', 'GET')


def welcome_create_user(request):
    """Create the certification and the first user; status 0 on success, -1 otherwise."""
    username = request.json.get("username")
    password = request.json.get("password")
    app.sql.create_certification()
    if app.sql.create_user(username, password, True):
        return _status(0)
    return _status(-1)


def check_server_status(request):
    """Send ``check_server_status`` to the selected server and return its reply."""
    reply = _send_to_server(
        request, {'api': 'check_server_status', 'psw': AGENT_PSW})
    return _status(0, data=reply)


def system_use(request):
    """Return the ``/system/df`` reply of the selected server."""
    return _docker_response(request, "/system/df", 'GET')


def psw_check(request):
    """Return the certification, or a placeholder for users with root_number "-1"."""
    if flask_login.current_user.root_number == ROOT_DENIED:
        # Restored from mojibake in the scraped source; the text means
        # "no permission to view yet!".
        return jsonify({'id': "暂无权限查看!", 'status': 0})
    return jsonify({'id': app.sql.get_certification(), 'status': 0})


def user_info(request):
    """Return everything ``app.sql.get_all_user()`` yields."""
    return _status(0, data=app.sql.get_all_user())


def user_delete(request):
    """Delete a user: -2 if the caller is not admin, 1 if the target is the caller."""
    current_id = flask_login.current_user.id
    target_id = request.json.get("user_id")
    if not _is_admin():
        return _status(-2)
    if current_id == target_id:
        return _status(1)
    return _status(app.sql.remove_user(target_id))


def user_create(request):
    """Create a user (admin only): 0 on success, -1 on failure, -2 if not admin."""
    if not _is_admin():
        return _status(-2)
    username = request.json.get("username")
    password = request.json.get("password")
    make_admin = bool(request.json.get("ifadmin"))
    if app.sql.create_user_nologin(username, password, make_admin):
        return _status(0)
    return _status(-1)


def server_ssh_info(request):
    """Return the SSH address, user and password of the selected server.

    With a truthy ``base64`` field the password is base64-encoded and only
    users with root_number "-1" are refused (status -999). Without it, only
    admins see the credentials; everyone else gets placeholder text.
    """
    server = _lookup_server(request)
    root_number = flask_login.current_user.root_number

    if request.json.get('base64'):
        if root_number == ROOT_DENIED:
            # Restored from mojibake; the text means
            # "no permission to perform this operation!".
            return _status(-999, message="无权限执行该操作!")
        encoded_psw = base64.b64encode(server.server_psw.encode()).decode()
        return _status(0, data={
            'ip': server.server_ssh,
            'user': server.server_user,
            'psw': encoded_psw,
        })

    if root_number == ROOT_ADMIN:
        return _status(0, data={
            'ip': server.server_ssh,
            'user': server.server_user,
            'psw': server.server_psw,
        })
    # Restored from mojibake; the text means "no permission to view yet".
    return _status(0, data={
        'ip': server.server_ssh,
        'user': "暂无权限查看",
        'psw': "暂无权限查看",
    })


def server_one_info(request):
    """Return the name and IP of the selected server."""
    server = _lookup_server(request)
    return _status(0, name=server.server_name, ip=server.server_ip)


def server_change_user(request):
    """Change the stored SSH user of a server (admin only; -1 otherwise)."""
    # Check permission before touching the row, so a refused request leaves
    # no pending change in the session.
    if not _is_admin():
        return _status(-1)
    server = _lookup_server(request)
    server.server_user = request.json.get("server_user")
    db.session.commit()
    return _status(0)


def server_change_psw(request):
    """Change the stored SSH password of a server (admin only; -1 otherwise)."""
    # Permission is checked first for the same reason as in server_change_user.
    if not _is_admin():
        return _status(-1)
    server = _lookup_server(request)
    server.server_psw = request.json.get("server_psw")
    db.session.commit()
    return _status(0)


def search_user_by_name(request):
    """Return id/name of users whose username contains ``input``.

    When ``input`` is missing no filter is applied, instead of handing an
    empty string to ``Query.filter``.
    """
    pattern = request.json.get("input")
    query = User.query
    if pattern is not None:
        query = query.filter(User.username.like("%" + pattern + "%"))
    return _status(
        0, data=[{"id": user.id, "name": user.username} for user in query])


def change_user(request):
    """Edit a user's name, password and admin flag (admin only).

    Status codes: -2 caller is not admin, -1 username already taken,
    -3 caller tried to remove their own admin flag, 0 saved.
    """
    current = flask_login.current_user
    if current.root_number != ROOT_ADMIN:
        return _status(-2)
    body = request.json
    target = User.query.filter_by(id=body.get("id")).first()
    new_username = body.get("username")
    new_password = body.get("password")

    if new_username:
        if User.query.filter_by(username=new_username).first():
            return _status(-1)
        target.username = new_username
    if new_password:
        target.password = new_password
    if body.get("ifadmin"):
        target.root_number = ROOT_ADMIN
    else:
        if current.id == target.id:
            return _status(-3)
        target.root_number = "0"
    db.session.commit()
    return _status(0)


def user_history_info(request):
    """Return the login history, most recent entry first."""
    result = []
    for entry in LoginHistory.query.all():
        user = User.query.filter_by(id=entry.u_id).first()
        result.append({
            'user': user.username,
            "time": entry.login_time,
            'id': user.id,
            "key": user.id,
        })
    result.reverse()
    # NOTE: the scraped excerpt is truncated inside this return statement;
    # only the call's closing brackets are supplied here.
    return jsonify({
        'status': 0,
        "data": result,
    })
lgCommunication.py
Source:lgCommunication.py
# NOTE: this excerpt begins mid-file. The line below is the tail of a
# definition whose header is not visible, so it is kept as a comment only.
# ...
#     return ip_galaxy


def _server_url_prefix():
    """Return ``http://<server ip>:8000/`` built from get_server_ip()."""
    ip_server = get_server_ip()
    # The original sliced off the last character to drop a trailing newline.
    # Decoding and stripping does the same without mangling bytes or an
    # address that has no newline.
    if isinstance(ip_server, bytes):
        ip_server = ip_server.decode()
    return "http://" + str(ip_server).strip() + ":8000/"


def _list_files(kmlFolder):
    """Return the names of the regular files directly inside *kmlFolder*."""
    return [f for f in os.listdir(kmlFolder) if isfile(join(kmlFolder, f))]


def _publish_urls(urls):
    """Write *urls*, one per line, to kmls.txt and send the file to the galaxy."""
    # Mode 'w' already creates or truncates the file, so the former
    # touch/rm/touch shell calls are not needed.
    with open("kmls.txt", 'w') as kml_list:
        for url in urls:
            kml_list.write(url + "\n")
    send_kml_to_galaxy()


def write_kml(kmlFolder, folder):
    """Publish every file in *kmlFolder* under ``static/ibri/<folder>/``."""
    print(kmlFolder)
    print(BASE_DIR)
    prefix = _server_url_prefix()
    _publish_urls(
        prefix + "static/ibri/" + folder + "/" + kmlFile
        for kmlFile in _list_files(kmlFolder))


def write_kml_airrace(race):
    """Publish every file of the air race *race*."""
    print(BASE_DIR)
    kmlFolder = BASE_DIR + "/static/airraces/" + race
    print(kmlFolder)
    print("-----")
    prefix = _server_url_prefix()
    urls = []
    for kmlFile in _list_files(kmlFolder):
        print(kmlFile)
        urls.append(prefix + "static/airraces/" + race + "/" + kmlFile)
    _publish_urls(urls)


def write_kml_participant(race, participant):
    """Publish the KML of one *participant* of *race*."""
    print(participant)
    print(race)
    kmlFolder = "static/airraces/" + race + "/" + participant.kmlpath
    print(kmlFolder)
    _publish_urls([_server_url_prefix() + kmlFolder])


def write_kml_race():
    """Publish every file in ``BASE_DIR/static/kml/``."""
    kmlFolder = BASE_DIR + "/static/kml/"
    prefix = _server_url_prefix()
    _publish_urls(
        prefix + "static/kml/" + kmlFile
        for kmlFile in _list_files(kmlFolder))


def write_idivt_kml():
    """Publish the fixed ``static/idivt/SOLSONA-CONGOST.kml`` file."""
    _publish_urls([_server_url_prefix() + "static/idivt/SOLSONA-CONGOST.kml"])


def send_single_kml(participant):
    """Publish the file at ``participant.kmlpath``."""
    _publish_urls([_server_url_prefix() + participant.kmlpath])


def send_kml_to_galaxy():
    """Copy kmls.txt to ``/var/www/html`` on the galaxy host with scp."""
    file_path = "kmls.txt"
    server_path = "/var/www/html"
    destination = "lg@" + get_ip() + ":" + server_path
    print("sshpass -p 'lqgalaxy' scp " + file_path + " " + destination)
    # An argument list avoids passing the host string through a shell.
    # NOTE(review): the password is hard-coded and echoed by the print above.
    subprocess.call(
        ["sshpass", "-p", "lqgalaxy", "scp", file_path, destination])


def get_server_ip():
    """Read the eth0 address by piping ``ifconfig`` output through a shell."""
    p = subprocess.Popen(
        "ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{print $1}'",
        shell=True,
        stdout=subprocess.PIPE)
    ip_server = p.communicate()[0]
    # ... the scraped excerpt is truncated here; the rest of this function
    # (presumably returning ip_server -- TODO confirm) is not visible.
API.py
Source:API.py
import json
import time
import requests


def get_server_ip():
    """Return the base URL (scheme, host and port) used for every request."""
    # Alternative endpoint kept from the original listing:
    # 'http://192.168.100.25:8008'
    return 'http://88.198.110.35:8008'


def get_token_out_sqlite():
    """Return the stored token with any double quotes removed."""
    # Imported lazily, as in the original, so the module loads without SQLite.
    from SQLite.SQLite_CRUD_Querry.SQLite_Read import sqlite_read_col_in_table
    result = sqlite_read_col_in_table('token', 'token').replace('"', '')
    return result


def get_token(address: str, login: str, password: str):
    """POST the credentials as query parameters and return the response text.

    Args:
        address: path appended to the base URL.
        login: value sent as ``username``.
        password: value sent as ``password``.
    """
    session = requests.post(f'{get_server_ip()}{address}',
                            params={'username': login, 'password': password})
    print(session.text)
    print(session)
    return session.text


def get_data(address: str):
    """GET ``address`` with the bearer-token headers and return the body text."""
    # Same header set as get_headers_4_token(); reuse it instead of rebuilding.
    response = requests.get(f"{get_server_ip()}{address}",
                            headers=get_headers_4_token())
    print(response.status_code)
    print(response.text)
    return response.text


def get_headers():
    """Return bearer-token JSON headers that also accept gzip encoding."""
    t = get_token_out_sqlite()
    headers = {
        "Authorization": f"Bearer {t}",
        "Accept": "*/*",
        "Accept-Encoding": "gzip",
        "Content-Type": "application/json"}
    return headers


def get_headers_4_token():
    """Return bearer-token JSON headers (no ``Accept-Encoding`` entry)."""
    t = get_token_out_sqlite()
    aut = ("Bearer " + t)
    headers = {"accept": "*/*",
               "Authorization": aut,
               "Content-Type": "application/json"}
    return headers


def send_gps_driver(address: str, driver: int, lat: float, lon: float):
    """POST a driver position to ``address`` and return the response text.

    Args:
        address: path appended to the base URL.
        driver: value sent as ``driverId``.
        lat: value sent as ``latitude``.
        lon: value sent as ``longitude``.
    """
    print(f"lat: {lat} lon: {lon}")
    data = {
        "driverGeolocationId": 0,
        "latitude": lat,
        "longitude": lon,
        "driverId": driver}
    print(f"response: {data}")
    return send_request(f"{address}", data)


def send_request(address: str, data: dict):
    """POST ``data`` as a JSON body to ``address`` and return the response text.

    Args:
        address: path appended to the base URL.
        data: JSON-serialisable payload (callers in this file pass a dict).
    """
    # Build headers and body once so the logged request matches what is sent
    # and the token is only read a single time.
    headers = get_headers_4_token()
    body = json.dumps(data)
    # NOTE(review): this log line includes the Authorization header.
    print(f"request: {get_server_ip()}{address}{body}{headers}")
    response = requests.post(f'{get_server_ip()}{address}', data=body, headers=headers)
    print(response.status_code)
    print(f"response: {response.text}")
    print(time.time())
    return response.text


def login_request(address: str, login: str, password: str):
    """POST ``staffId``/``password`` as query parameters; return the body text.

    The credentials are passed through ``params`` so that characters such as
    ``&``, ``#`` or spaces are URL-encoded instead of corrupting the query.
    """
    response = requests.post(f'{get_server_ip()}{address}',
                             params={'staffId': login, 'password': password},
                             headers=get_headers_4_token())
    print(response.status_code)
    print(f"response: {response.text}")
    print(time.time())
    return response.text


def get_gps_driver(address: str, driver: str):
    session = requests.get(f'{get_server_ip()}{address}/{driver}', headers=get_headers())
    print(session)
    # The remainder of this function is truncated in the listing.
    ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!