Best Python code snippet using localstack_python
api.py
Source:api.py
import json
from pyusermanager import *
import bottle
from http import HTTPStatus
from bottle import (
    route,
    run,
    post,
    get,
    static_file,
    request,
    redirect,
    HTTPResponse,
    response,
)

# from gevent import monkey
# monkey.patch_all()
from return_stuff import *
import filestuff


def get_default_response():
    """Build the fallback response every handler starts from.

    The response carries status 418 and a generic "something went wrong"
    alert body, plus permissive CORS headers. Handlers overwrite ``status``
    and ``body`` before returning it, so a client only ever sees the 418 if
    a handler forgot to do so.
    """
    default_response = HTTPResponse(status=HTTPStatus.IM_A_TEAPOT)
    default_response.headers["Access-Control-Allow-Origin"] = "*"
    default_response.headers[
        "Access-Control-Allow-Methods"
    ] = "PUT, GET, POST, DELETE, OPTIONS"
    default_response.headers[
        "Access-Control-Allow-Headers"
    ] = "Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token"
    default_response.body = get_json_from_args(
        Alert("oh god something went terribly wrong", ALERT_TYPE.DANGER)
    )
    return default_response


def _client_ip():
    """Return the caller's address: ``X-Forwarded-For`` if set, else ``REMOTE_ADDR``."""
    # NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
    # reverse proxy overwrites it, so IP-bound tokens can be spoofed when this
    # service is reachable directly.
    return request.environ.get("HTTP_X_FORWARDED_FOR") or request.environ.get(
        "REMOTE_ADDR"
    )


def _query_token():
    """Return the ``token`` query parameter, or ``None`` when it is absent."""
    return dict(request.query.decode()).get("token")


def _login_required(return_response, message):
    """Turn ``return_response`` into a 401 that alerts ``message`` and redirects to /login."""
    return_response.status = HTTPStatus.UNAUTHORIZED
    return_response.body = get_json_from_args(
        Alert(message, ALERT_TYPE.WARNING), Redirect("/login")
    )
    return return_response


###################################
#
# Bottle routes start here!
#
###################################
app = bottle.app()


# NOTE(review): this handler is registered on the same method and path as
# api_get_user further down, so only one of the two can ever be dispatched for
# GET /user/<username>. Give it a route of its own or delete it.
@app.get("/user/<username>")
def api_verify_token(username):
    """Check the ``token`` query parameter and report whether it is valid.

    Returns 400 when no token is supplied or ``verify_token`` raises
    ``TypeError``, 200 when the token verifies and 401 otherwise.
    """
    return_response = get_default_response()
    token = _query_token()
    if token is None:
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            Alert("No Token supplied!", ALERT_TYPE.DANGER), Redirect("/")
        )
        return return_response

    try:
        success, perms, username = verify_token(token, _client_ip())
    except TypeError:
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            Alert("Token Type is not valid!", ALERT_TYPE.DANGER), Redirect("/")
        )
        return return_response

    # The original fell off the end here and returned None for every token
    # that did not raise; always hand a response back to bottle.
    if not success:
        return _login_required(return_response, "please log in")
    return_response.status = HTTPStatus.OK
    return_response.body = get_json_from_args(
        Alert("Token is valid", ALERT_TYPE.INFO)
    )
    return return_response


@app.post("/login")
def login():
    """Authenticate a user from a JSON body and issue a login token.

    Expects a JSON object with ``username`` and ``password`` and an optional
    ``remember_me`` flag (token valid for 365 days instead of 1).

    Returns 400 for unreadable JSON, missing fields or an unknown user, 401
    for a wrong password, 500 when the token cannot be created and 200 with
    the token on success.
    """
    return_response = get_default_response()

    # The body holds the clear-text password, so it is deliberately not
    # echoed to stdout.
    try:
        json_obj = json.loads(request.body.read().decode("utf-8"))
    except Exception as err:
        return_response.status = HTTPStatus.BAD_REQUEST
        print(err)
        print("could not read json input!")
        return return_response

    if (
        not isinstance(json_obj, dict)
        or "password" not in json_obj
        or "username" not in json_obj
    ):
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            Alert("required vars missing", ALERT_TYPE.WARNING)
        )
        return return_response

    password = json_obj["password"]
    username = json_obj["username"]
    valid_days = 365 if json_obj.get("remember_me") else 1

    try:
        success, user = login_user(username, password)
    except MissingUserException:
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            Alert("User does not exist!", ALERT_TYPE.WARNING)
        )
        return return_response

    if not success:
        return_response.status = HTTPStatus.UNAUTHORIZED
        return_response.body = get_json_from_args(
            Alert("password/User Combination is wrong!", ALERT_TYPE.WARNING)
        )
        return return_response

    # create token
    try:
        token = create_token(user, _client_ip(), valid_days)
        return_response.status = HTTPStatus.OK
        return_response.body = get_json_from_args(
            {"Login": {"valid_day": valid_days, "token": token}},
            Alert("Successfull Login", ALERT_TYPE.INFO),
            Redirect(f"/user/{user}"),
        )
    except Exception:
        return_response.status = HTTPStatus.INTERNAL_SERVER_ERROR
        return_response.body = get_json_from_args(
            Alert("could not generate token!", ALERT_TYPE.DANGER)
        )
    return return_response


@app.get("/header")
def api_get_header():
    """Return the pre-rendered page header matching the caller's login state.

    Without a valid ``token`` query parameter the logged-out template is
    used; with one, the logged-in template (or the admin variant when the
    caller is in the admin group). Always answers 200.
    """
    return_response = get_default_response()
    return_response.status = HTTPStatus.OK
    header = filestuff.get_template("header_logged_out.html")

    token = _query_token()
    if token is not None:
        # Verify once; the original verified a second time just to print it.
        success, perms, username = verify_token(token, _client_ip())
        if success:
            user_dict, _token_dict, _perm_dict = get_extended_info(username)
            template_vars = {
                "username": username,
                # NOTE(review): hard-coded development URL
                "api_url": "http://127.0.0.1:1337",
                "user_avatar": user_dict["avatar"],
            }
            if LoginConfig.admin_group_name in perms:
                template_name = "header_logged_in_admin.html"
            else:
                template_name = "header_logged_in.html"
            header = filestuff.get_template(template_name, **template_vars)

    return_response.body = get_json_from_args({"pre_rendered": {"content": header}})
    return return_response


@app.get("/users")
def api_get_users():
    """Return the user list to any caller holding a valid token.

    Answers 401 with a redirect to /login when the token is missing, shorter
    than two characters or fails verification; 200 with the result of
    ``get_users()`` otherwise.
    """
    return_response = get_default_response()
    token = _query_token()
    if token is None or len(token) < 2:
        return _login_required(return_response, "please log in")

    # verify token
    success, perms, username = verify_token(str(token), _client_ip())
    if not success:
        return _login_required(return_response, "Bitte einloggen!")

    return_response.status = HTTPStatus.OK
    return_response.body = get_json_from_args(get_users())
    return return_response


@app.get("/user/<username>")
def api_get_user(username):
    """Return profile data for ``username``, scoped by who is asking.

    Admins get user, token and group data plus an ``Admin`` marker; the user
    themselves gets user, token and group data; everyone else gets user and
    group data only. The mail flag is passed to ``get_extended_info`` only
    for admins and the user themselves.

    Answers 401 when the token is missing, too short or invalid, and 400
    when ``username`` does not exist.
    """
    return_response = get_default_response()
    token = _query_token()
    if token is None or len(token) < 2:
        return _login_required(return_response, "please log in")

    # verify token
    success, perms, user = verify_token(str(token), _client_ip())
    if not success:
        return _login_required(return_response, "Bitte einloggen!")

    # Only inspect perms after a successful verification; the original did
    # the membership test first, before knowing the token was valid.
    is_admin = LoginConfig.admin_group_name in perms
    is_self = user == username
    include_mail = True if (is_self or is_admin) else None

    try:
        user_dict, token_dict, perm_dict = get_extended_info(username, include_mail)
    except MissingUserException:
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            Alert("User does not Exist", ALERT_TYPE.DANGER), Redirect("/users")
        )
        return return_response

    return_response.status = HTTPStatus.OK
    if is_admin:
        return_response.body = get_json_from_args(
            {"user": user_dict},
            {"token": token_dict},
            {"groups": perm_dict},
            {"Admin": True},
        )
    elif is_self:
        return_response.body = get_json_from_args(
            {"user": user_dict}, {"token": token_dict}, {"groups": perm_dict}
        )
    else:
        return_response.body = get_json_from_args(
            {"user": user_dict}, {"groups": perm_dict}
        )
    return return_response


@app.post("/user/delete/<username>")
def api_delete_user(username):
    """Placeholder for user deletion; currently returns the default response unchanged."""
    return_response = get_default_response()
    return return_response


@app.get("/logout")
def api_logout_user():
    """Invalidate the token given in the ``token`` query parameter.

    Answers 400 when no token is sent, the token is unknown
    (``TokenMissingException``) or ``logout_user`` raises ``ValueError``
    (reported to the client as a logout attempt from a different IP);
    200 otherwise.
    """
    return_response = get_default_response()
    token = _query_token()
    if token is None:
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            Alert("no Token transmitted", ALERT_TYPE.DANGER)
        )
        return return_response

    # The original error branches passed the unbound local `logout` to
    # get_json_from_args, turning every failed logout into a 500. They now
    # report an explicit failure flag instead.
    try:
        logout = {"Logout": logout_user(token, _client_ip())}
    except TokenMissingException:
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            {"Logout": False}, Alert("Token does not exist!", ALERT_TYPE.DANGER)
        )
    except ValueError:
        return_response.status = HTTPStatus.BAD_REQUEST
        return_response.body = get_json_from_args(
            {"Logout": False},
            Alert("cant Log Out User from Different IP!", ALERT_TYPE.WARNING),
        )
    else:
        # The original never set a success status, so a successful logout
        # was still answered with an error code.
        return_response.status = HTTPStatus.OK
        return_response.body = get_json_from_args(
            logout, Alert("successfull Logout", ALERT_TYPE.INFO), Redirect("/")
        )
    return return_response


@app.get("/avatars/<filename>")
def static_files(filename):
    """Serve an avatar image from the local ``./avatars/`` directory."""
    return static_file(str(filename), root="./avatars/")


if __name__ == "__main__":
    # init base ad config
    ad_config = AD_Config()
    # init db config
    # NOTE(review): development credentials are hard-coded below
    db_config = DB_Config(
        provider="mysql",
        host="127.0.0.1",
        port=3306,
        user="test",
        pw="test123",
        db_name="users",
    )
    # init login config with db config and other parameters
    config = LoginConfig(
        db_config=db_config,
        debug=True,
        auto_activate_accounts=False,
        admin_group_name="administrator",
    )
    # generate our admin user if it does not exist yet
    try:
        create_user(uname="admin", pw="12345", email="test@local", auth=AUTH_TYPE.LOCAL)
        # if user is new we need to verify the token if auto activate is disabled
        if not LoginConfig.auto_activate_accounts:
            print("since new admin user is not activated we try to activate him")
            auth_token = get_token("admin", ActivationCode)
            feedback = list(verify_token(token=auth_token, token_type=ActivationCode))
            print(f"success: {feedback[0]}")
    except AlreadyExistsException:
        print("user already exists")
    except Exception:
        print("somethign bad happened o-o")
    # add default admin group and add admin to it
    print(f"creating admin group: {LoginConfig.admin_group_name}")
    create_perm(LoginConfig.admin_group_name)
    print(f"adding admin user to {LoginConfig.admin_group_name} group")
    assign_perm_to_user("admin", LoginConfig.admin_group_name)
    # ... (the rest of the script is truncated in the source snippet)
lambda_authorizer_jwt.py
Source:lambda_authorizer_jwt.py
...21 jwks_client = PyJWKClient(url)22except Exception as e:23 logging.error(e)24 raise ("Unable to download JWKS")25def return_response(isAuthorized, other_params={}):26 return {"isAuthorized": isAuthorized, "context": other_params}27def lambda_handler(event, context):28 try:29 # fetching access token from event30 token = event["headers"]["authorization"]31 # check token structure32 if len(token.split(".")) != 3:33 return return_response(isAuthorized=False, other_params={})34 except Exception as e:35 logging.error(e)36 return return_response(isAuthorized=False, other_params={})37 try:38 # get unverified headers39 headers = jwt.get_unverified_header(token)40 # get signing key41 signing_key = jwks_client.get_signing_key_from_jwt(token)42 # validating exp, iat, signature, iss43 data = jwt.decode(44 token,45 signing_key.key,46 algorithms=[headers.get("alg")],47 options={48 "verify_signature": True,49 "verify_exp": True,50 "verify_iat": True,51 "verify_iss": True,52 "verify_aud": False,53 },54 )55 except jwt.InvalidTokenError as e:56 logging.error(e)57 return return_response(isAuthorized=False, other_params={})58 except jwt.DecodeError as e:59 logging.error(e)60 return return_response(isAuthorized=False, other_params={})61 except jwt.InvalidSignatureError as e:62 logging.error(e)63 return return_response(isAuthorized=False, other_params={})64 except jwt.ExpiredSignatureError as e:65 logging.error(e)66 return return_response(isAuthorized=False, other_params={})67 except jwt.InvalidIssuerError as e:68 logging.error(e)69 return return_response(isAuthorized=False, other_params={})70 except jwt.InvalidIssuedAtError as e:71 logging.error(e)72 return return_response(isAuthorized=False, other_params={})73 except Exception as e:74 logging.error(e)75 return return_response(isAuthorized=False, other_params={})76 try:77 # verifying audience...use data['client_id'] if verifying an access token else data['aud']78 if app_client != data.get("client_id"):79 return 
return_response(isAuthorized=False, other_params={})80 except Exception as e:81 logging.error(e)82 return return_response(isAuthorized=False, other_params={})83 try:84 # token_use check85 if data.get("token_use") != "access":86 return return_response(isAuthorized=False, other_params={})87 except Exception as e:88 logging.error(e)89 return return_response(isAuthorized=False, other_params={})90 try:91 # scope check92 if "openid" not in data.get("scope").split(" "):93 return return_response(isAuthorized=False, other_params={})94 except Exception as e:95 logging.error(e)96 return return_response(isAuthorized=False, other_params={})...
azure.py
Source:azure.py
import requests
import os
import json

pic_directory = "DB/pictures/"


def faceRec(file_name):
    """Run face detection and identification on a stored picture.

    Reads ``pic_directory + file_name``, sends it to the Face API ``detect``
    endpoint and, if a face is found, to ``identify``.

    Returns a JSON string (indent 3) with ``status`` set to ``'Ok'`` plus the
    matched ``faceId``, or ``status`` set to ``'Fail'`` when no face is
    detected or no person is matched.

    Raises ``requests.HTTPError`` when either API call returns an error status.
    """
    return_response = {'status': None}
    image_path = pic_directory + file_name
    print(image_path)
    # NOTE(review): the subscription key is a secret committed to source;
    # move it to configuration and rotate it.
    SUBSCRIPTION_KEY = '8b8cfcd8d7d742b3ba6542827a90fb03'
    ENDPOINT = 'https://testfacial.cognitiveservices.azure.com/face/v1.0/'

    # Close the file once it has been uploaded; the original leaked the handle.
    with open(image_path, 'rb') as image_data:
        tempfaceid = detectImage(image_data, SUBSCRIPTION_KEY, ENDPOINT)

    face_id = None
    if tempfaceid:
        face_id = identifyImage(tempfaceid, SUBSCRIPTION_KEY, ENDPOINT)

    if face_id:
        return_response['status'] = 'Ok'
        return_response['faceId'] = face_id
    else:
        return_response['status'] = 'Fail'
    return json.dumps(return_response, indent=3)


def detectImage(image_data, subscription_key, endpoint):
    """POST raw image bytes to ``endpoint + 'detect'``.

    Returns the ``faceId`` of the first detected face, or ``None`` when the
    response lists no faces. Raises ``requests.HTTPError`` on an error status.
    """
    headers = {'Content-Type': 'application/octet-stream',
               'Ocp-Apim-Subscription-Key': subscription_key}
    params = {
        'returnFaceId': 'true',
    }
    response = requests.post(endpoint + 'detect', params=params, headers=headers, data=image_data)
    response.raise_for_status()
    faces = response.json()
    if faces:
        return faces[0]['faceId']
    return None


def identifyImage(tempfaceid, subscription_key, endpoint):
    """POST a detected face id to ``endpoint + 'identify'``.

    Returns the ``personId`` of the first candidate for the face, or ``None``
    when the response is empty or lists no candidates. Raises
    ``requests.HTTPError`` on an error status.
    """
    headers = {'Content-Type': 'application/json',
               'Ocp-Apim-Subscription-Key': subscription_key}
    params = {
        'personGroupId': 'godseye2020',
        'faceIds': [tempfaceid]
    }
    response = requests.post(endpoint + 'identify', json=params, headers=headers)
    # Check the status like detectImage does, so an error payload is not
    # indexed as if it were a result list.
    response.raise_for_status()
    results = response.json()
    if not results:
        # NOTE(review): the source snippet is cut off right after this
        # `else:`; returning None mirrors detectImage - confirm against the
        # full file.
        return None
    # An empty candidate list made the original raise IndexError.
    candidates = results[0].get('candidates') or []
    if not candidates:
        return None
    return candidates[0]['personId']
# ... (the rest of the file is truncated in the source snippet)
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!