Best Python code snippet using localstack_python
views.py
Source:views.py
...53 signals.user_confirmed_email.send(current_app._get_current_object(), user=user)54 # Prepare one-time system message55 flash(_('Your email has been confirmed.'), 'success')56 # Auto-login after confirm or redirect to login page57 next = request.args.get('next', _endpoint_url(user_manager.after_confirm_endpoint))58 if user_manager.auto_login_after_confirm:59 return _do_login_user(user, next) # auto-login60 else:61 return redirect(url_for('user.login')+'?next='+next) # redirect to login page62@login_required63def change_password():64 """ Prompt for old password and new password and change the user's password."""65 user_manager = current_app.user_manager66 db_adapter = user_manager.db_adapter67 # Initialize form68 form = user_manager.change_password_form(request.form)69 form.next.data = request.args.get('next', _endpoint_url(user_manager.after_change_password_endpoint)) # Place ?next query param in next form field70 # Process valid POST71 if request.method=='POST' and form.validate():72 # Hash password73 hashed_password = user_manager.hash_password(form.new_password.data)74 # Change password75 user_manager.update_password(current_user, hashed_password)76 # Send 'password_changed' email77 if user_manager.enable_email and user_manager.send_password_changed_email:78 emails.send_password_changed_email(current_user)79 # Send password_changed signal80 signals.user_changed_password.send(current_app._get_current_object(), user=current_user)81 # Prepare one-time system message82 flash(_('Your password has been changed successfully.'), 'success')83 # Redirect to 'next' URL84 return redirect(form.next.data)85 # Process GET or invalid POST86 return render_template(user_manager.change_password_template, form=form)87@login_required88def change_username():89 """ Prompt for new username and old password and change the user's username."""90 user_manager = current_app.user_manager91 db_adapter = user_manager.db_adapter92 # Initialize form93 form = 
user_manager.change_username_form(request.form)94 form.next.data = request.args.get('next', _endpoint_url(user_manager.after_change_username_endpoint)) # Place ?next query param in next form field95 # Process valid POST96 if request.method=='POST' and form.validate():97 new_username = form.new_username.data98 # Change username99 user_auth = current_user.user_auth if db_adapter.UserAuthClass and hasattr(current_user, 'user_auth') else current_user100 db_adapter.update_object(user_auth, username=new_username)101 db_adapter.commit()102 # Send 'username_changed' email103 if user_manager.enable_email and user_manager.send_username_changed_email:104 emails.send_username_changed_email(current_user)105 # Send username_changed signal106 signals.user_changed_username.send(current_app._get_current_object(), user=current_user)107 # Prepare one-time system message108 flash(_("Your username has been changed to '%(username)s'.", username=new_username), 'success')109 # Redirect to 'next' URL110 return redirect(form.next.data)111 # Process GET or invalid POST112 return render_template(user_manager.change_username_template, form=form)113@login_required114def email_action(id, action):115 """ Perform action 'action' on UserEmail object 'id'116 """117 user_manager = current_app.user_manager118 db_adapter = user_manager.db_adapter119 # Retrieve UserEmail by id120 user_email = db_adapter.find_first_object(db_adapter.UserEmailClass, id=id)121 # Users may only change their own UserEmails122 if not user_email or user_email.user_id != int(current_user.get_id()):123 return unauthorized()124 if action=='delete':125 # Primary UserEmail can not be deleted126 if user_email.is_primary:127 return unauthorized()128 # Delete UserEmail129 db_adapter.delete_object(user_email)130 db_adapter.commit()131 elif action=='make-primary':132 # Disable previously primary emails133 user_emails = db_adapter.find_all_objects(db_adapter.UserEmailClass, user_id=int(current_user.get_id()))134 for ue in user_emails:135 
if ue.is_primary:136 ue.is_primary = False137 # Enable current primary email138 user_email.is_primary = True139 # Commit140 db_adapter.commit()141 elif action=='confirm':142 _send_confirm_email(user_email.user, user_email)143 else:144 return unauthorized()145 return redirect(url_for('user.manage_emails'))146def forgot_password():147 """Prompt for email and send reset password email."""148 user_manager = current_app.user_manager149 db_adapter = user_manager.db_adapter150 # Initialize form151 form = user_manager.forgot_password_form(request.form)152 # Process valid POST153 if request.method=='POST' and form.validate():154 email = form.email.data155 # Find user by email156 user, user_email = user_manager.find_user_by_email(email)157 if user:158 # Generate reset password link159 token = user_manager.generate_token(int(user.get_id()))160 reset_password_link = url_for('user.reset_password', token=token, _external=True)161 # Send forgot password email162 emails.send_forgot_password_email(user, user_email, reset_password_link)163 # Store token164 if hasattr(user, 'reset_password_token'):165 db_adapter.update_object(user, reset_password_token=token)166 db_adapter.commit()167 # Send forgot_password signal168 signals.user_forgot_password.send(current_app._get_current_object(), user=user)169 # Prepare one-time system message170 flash(_("A reset password email has been sent to '%(email)s'. 
Open that email and follow the instructions to reset your password.", email=email), 'success')171 # Redirect to the login page172 return redirect(_endpoint_url(user_manager.after_forgot_password_endpoint))173 # Process GET or invalid POST174 return render_template(user_manager.forgot_password_template, form=form)175@app.route('/user/profile', methods=['GET', 'POST'])176@login_required177def user_profile_page():178 # Initialize form179 form = UserProfileForm(request.form, current_user)180 # Process valid POST181 if request.method=='POST' and form.validate():182 # Copy form fields to user_profile fields183 form.populate_obj(current_user)184 # Save user_profile185 db.session.commit()186 # Redirect to home page187 return redirect(url_for('home_page'))188 # Process GET or invalid POST189 return render_template('users/user_profile_page.html',form=form)190@app.route('/user/sign-in', methods=['GET', 'POST'])191def login():192 """ Prompt for username/email and password and sign the user in."""193 user_manager = current_app.user_manager194 db_adapter = user_manager.db_adapter195 next = request.args.get('next', _endpoint_url(user_manager.after_login_endpoint))196 reg_next = request.args.get('reg_next', _endpoint_url(user_manager.after_register_endpoint))197 # Immediately redirect already logged in users198 if current_user.is_authenticated and user_manager.auto_login_at_login:199 return redirect(next)200 # Initialize form201 login_form = user_manager.login_form(request.form) # for login.html202 register_form = user_manager.register_form() # for login_or_register.html203 if request.method!='POST':204 login_form.next.data = register_form.next.data = next205 login_form.reg_next.data = register_form.reg_next.data = reg_next206 # Process valid POST207 if request.method=='POST' and login_form.validate():208 # Retrieve User209 user = None210 user_email = None211 if user_manager.enable_username:212 # Find user record by username213 user = 
user_manager.find_user_by_username(login_form.username.data)214 user_email = None215 # Find primary user_email record216 if user and db_adapter.UserEmailClass:217 user_email = db_adapter.find_first_object(db_adapter.UserEmailClass,218 user_id=int(user.get_id()),219 is_primary=True,220 )221 # Find user record by email (with form.username)222 if not user and user_manager.enable_email:223 user, user_email = user_manager.find_user_by_email(login_form.username.data)224 else:225 # Find user by email (with form.email)226 user, user_email = user_manager.find_user_by_email(login_form.email.data)227 if user:228 # Log user in229 return _do_login_user(user, login_form.next.data, login_form.remember_me.data)230 # Process GET or invalid POST231 return render_template(user_manager.login_template,232 form=login_form,233 login_form=login_form,234 register_form=register_form)235@app.route('/user/sign-out', methods=['GET', 'POST'])236def logout():237 """ Sign the user out."""238 user_manager = current_app.user_manager239 # Send user_logged_out signal240 signals.user_logged_out.send(current_app._get_current_object(), user=current_user)241 # Use Flask-Login to sign out user242 logout_user()243 # Prepare one-time system message244 flash(_('You have signed out successfully.'), 'success')245 # Redirect to logout_next endpoint or '/'246 next = request.args.get('next', _endpoint_url(user_manager.after_logout_endpoint)) # Get 'next' query param247 return redirect(next)248def _do_login_user(user, next, remember_me=False):249 # User must have been authenticated250 if not user: return unauthenticated()251 # Check if user account has been disabled252 if not user.is_active():253 flash(_('Your account has not been activated. 
Please check your email.'), 'error')254 return redirect(url_for('user.login'))255 # Check if user has a confirmed email address256 user_manager = current_app.user_manager257 if user_manager.enable_email and user_manager.enable_confirm_email \258 and not current_app.user_manager.enable_login_without_confirm_email \259 and not user.has_confirmed_email():260 url = url_for('user.resend_confirm_email')261 flash(_('Your email address has not yet been confirmed. <a href="%(url)s">Re-send confirmation email</a>.', url=url), 'error')262 return redirect(url_for('user.login'))263 # Use Flask-Login to sign in user264 #print('login_user: remember_me=', remember_me)265 login_user(user, remember=remember_me)266 # Send user_logged_in signal267 signals.user_logged_in.send(current_app._get_current_object(), user=user)268 # Prepare one-time system message269 #flash(_('You have signed in successfully.'), 'success')270 if current_user.user_auth.credentials == 1:271 stripe.api_key = decode(current_user.user_auth.api_key)272 session['api_key'] = stripe.api_key273 # Redirect to 'next' URL274 return redirect(next)275 else:276 flash(_('Please enter your Stripe API key'), 'error')277 return redirect(url_for('getstarted'))278def register():279 """ Display registration form and create new User."""280 user_manager = current_app.user_manager281 db_adapter = user_manager.db_adapter282 next = request.args.get('next', _endpoint_url(user_manager.after_login_endpoint))283 reg_next = request.args.get('reg_next', _endpoint_url(user_manager.after_register_endpoint))284 # Initialize form285 login_form = user_manager.login_form() # for login_or_register.html286 register_form = user_manager.register_form(request.form) # for register.html287 if request.method!='POST':288 login_form.next.data = register_form.next.data = next289 login_form.reg_next.data = register_form.reg_next.data = reg_next290 # Process valid POST291 if request.method=='POST' and register_form.validate():292 # Create a User object using 
Form fields that have a corresponding User field293 User = db_adapter.UserClass294 user_class_fields = User.__dict__295 user_fields = {}296 user_auth_fi297 # Create a UserEmail object using Form fields that have a corresponding UserEmail field298 if db_adapter.UserEmailClass:299 UserEmail = db_adapter.UserEmailClass300 user_email_class_fields = UserEmail.__dict__301 user_email_fields = {}302 # Create a UserAuth object using Form fields that have a corresponding UserAuth field303 if db_adapter.UserAuthClass:304 UserAuth = db_adapter.UserAuthClass305 user_auth_class_fields = UserAuth.__dict__306 user_auth_fields = {}307 # Enable user account308 if db_adapter.UserProfileClass:309 if hasattr(db_adapter.UserProfileClass, 'active'):310 user_auth_fields['active'] = True311 elif hasattr(db_adapter.UserProfileClass, 'is_enabled'):312 user_auth_fields['is_enabled'] = True313 else:314 user_auth_fields['is_active'] = True315 else:316 if hasattr(db_adapter.UserClass, 'active'):317 user_fields['active'] = True318 elif hasattr(db_adapter.UserClass, 'is_enabled'):319 user_fields['is_enabled'] = True320 else:321 user_fields['is_active'] = True322 # For all form fields323 for field_name, field_value in register_form.data.items():324 # Hash password field325 if field_name=='password':326 hashed_password = user_manager.hash_password(field_value)327 if db_adapter.UserAuthClass:328 user_auth_fields['password'] = hashed_password329 else:330 user_fields['password'] = hashed_password331 # Store corresponding Form fields into the User object and/or UserProfile object332 else:333 if field_name in user_class_fields:334 user_fields[field_name] = field_value335 if db_adapter.UserEmailClass:336 if field_name in user_email_class_fields:337 user_email_fields[field_name] = field_value338 if db_adapter.UserAuthClass:339 if field_name in user_auth_class_fields:340 user_auth_fields[field_name] = field_value341 # Add User record using named arguments 'user_fields'342 user = db_adapter.add_object(User, 
**user_fields)343 if db_adapter.UserProfileClass:344 user_profile = user345 # Add UserEmail record using named arguments 'user_email_fields'346 if db_adapter.UserEmailClass:347 user_email = db_adapter.add_object(UserEmail,348 user=user,349 is_primary=True,350 **user_email_fields)351 else:352 user_email = None353 # Add UserAuth record using named arguments 'user_auth_fields'354 if db_adapter.UserAuthClass:355 user_auth = db_adapter.add_object(UserAuth, **user_auth_fields)356 if db_adapter.UserProfileClass:357 user = user_auth358 else:359 user.user_auth = user_auth360 db_adapter.commit()361 # Send 'registered' email and delete new User object if send fails362 if user_manager.send_registered_email:363 try:364 # Send 'registered' email365 _send_registered_email(user, user_email)366 except Exception as e:367 # delete new User object if send fails368 db_adapter.delete_object(user)369 db_adapter.commit()370 raise e371 # Send user_registered signal372 signals.user_registered.send(current_app._get_current_object(), user=user)373 # Redirect if USER_ENABLE_CONFIRM_EMAIL is set374 if user_manager.enable_confirm_email:375 next = request.args.get('next', _endpoint_url(user_manager.after_register_endpoint))376 return redirect(next)377 # Auto-login after register or redirect to login page378 next = request.args.get('next', _endpoint_url(user_manager.after_confirm_endpoint))379 if user_manager.auto_login_after_register:380 return _do_login_user(user, reg_next) # auto-login381 else:382 return redirect(url_for('user.login')+'?next='+reg_next) # redirect to login page383 # Process GET or invalid POST384 return render_template(user_manager.register_template,385 form=register_form,386 login_form=login_form,387 register_form=register_form)388def resend_confirm_email():389 """Prompt for email and re-send email conformation email."""390 user_manager = current_app.user_manager391 db_adapter = user_manager.db_adapter392 # Initialize form393 form = 
user_manager.resend_confirm_email_form(request.form)394 # Process valid POST395 if request.method=='POST' and form.validate():396 email = form.email.data397 # Find user by email398 user, user_email = user_manager.find_user_by_email(email)399 if user:400 _send_confirm_email(user, user_email)401 # Redirect to the login page402 return redirect(_endpoint_url(user_manager.after_resend_confirm_email_endpoint))403 # Process GET or invalid POST404 return render_template(user_manager.resend_confirm_email_template, form=form)405def reset_password(token):406 """ Verify the password reset token, Prompt for new password, and set the user's password."""407 # Verify token408 user_manager = current_app.user_manager409 db_adapter = user_manager.db_adapter410 is_valid, has_expired, user_id = user_manager.verify_token(411 token,412 user_manager.reset_password_expiration)413 if has_expired:414 flash(_('Your reset password token has expired.'), 'error')415 return redirect(url_for('user.login'))416 if not is_valid:417 flash(_('Your reset password token is invalid.'), 'error')418 return redirect(url_for('user.login'))419 user = user_manager.get_user_by_id(user_id)420 if user:421 # Avoid re-using old tokens422 if hasattr(user, 'reset_password_token'):423 verified = user.reset_password_token == token424 else:425 verified = True426 if not user or not verified:427 flash(_('Your reset password token is invalid.'), 'error')428 return redirect(_endpoint_url(user_manager.login_endpoint))429 # Initialize form430 form = user_manager.reset_password_form(request.form)431 # Process valid POST432 if request.method=='POST' and form.validate():433 # Invalidate the token by clearing the stored token434 if hasattr(user, 'reset_password_token'):435 db_adapter.update_object(user, reset_password_token='')436 # Change password437 hashed_password = user_manager.hash_password(form.new_password.data)438 user_auth = user.user_auth if db_adapter.UserAuthClass and hasattr(user, 'user_auth') else user439 
db_adapter.update_object(user_auth, password=hashed_password)440 db_adapter.commit()441 # Send 'password_changed' email442 if user_manager.enable_email and user_manager.send_password_changed_email:443 emails.send_password_changed_email(user)444 # Prepare one-time system message445 flash(_("Your password has been reset successfully. Please sign in with your new password"), 'success')446 # Auto-login after reset password or redirect to login page447 next = request.args.get('next', _endpoint_url(user_manager.after_reset_password_endpoint))448 if user_manager.auto_login_after_reset_password:449 return _do_login_user(user, next) # auto-login450 else:451 return redirect(url_for('user.login')+'?next='+next) # redirect to login page452 # Process GET or invalid POST453 return render_template(user_manager.reset_password_template, form=form)454def unconfirmed():455 """ Prepare a Flash message and redirect to USER_UNCONFIRMED_URL"""456 # Prepare Flash message457 url = request.script_root + request.path458 flash(_("You must confirm your email to access '%(url)s'.", url=url), 'error')459 # Redirect to USER_UNCONFIRMED_EMAIL_ENDPOINT460 user_manager = current_app.user_manager461 return redirect(_endpoint_url(user_manager.unconfirmed_email_endpoint))462def unauthenticated():463 """ Prepare a Flash message and redirect to USER_UNAUTHENTICATED_ENDPOINT"""464 # Prepare Flash message465 url = request.url466 flash(_("You must be signed in to access '%(url)s'.", url=url), 'error')467 # quote the fully qualified url468 quoted_url = quote(url)469 # Redirect to USER_UNAUTHENTICATED_ENDPOINT470 user_manager = current_app.user_manager471 return redirect(_endpoint_url(user_manager.unauthenticated_endpoint)+'?next='+ quoted_url)472def unauthorized():473 """ Prepare a Flash message and redirect to USER_UNAUTHORIZED_ENDPOINT"""474 # Prepare Flash message475 url = request.script_root + request.path476 flash(_("You do not have permission to access '%(url)s'.", url=url), 'error')477 # Redirect to 
USER_UNAUTHORIZED_ENDPOINT478 user_manager = current_app.user_manager479 return redirect(_endpoint_url(user_manager.unauthorized_endpoint))480def _endpoint_url(endpoint):481 url = '/'482 if endpoint:483 url = url_for(endpoint)...
bases.py
Source:bases.py
...18 self._response = requests.get(self._endpoint_url)19 return self._response20 else:21 self._response = requests.get(self._endpoint_url, headers=self._auth_header)22 self._refreshing_endpoint_url()23 return self._response24 def _send_post_requests(self, apiClient=False):25 if apiClient == False:26 self._response = requests.post(27 url=self._endpoint_url,28 json=self._request_body,29 headers=self._auth_header30 )31 try:32 self._orderId = self._response.json()["orderId"]33 open("endpoints/orderId.txt", "w").write(copy.deepcopy(self._orderId))34 except KeyError:35 self._error_msg = self._response.json()["error"]36 return self._response37 else:38 self._response = requests.post(39 url=self._endpoint_url,40 json=self._request_body41 ) 42 return self._response43 def _send_patch_requests(self):44 self._response = requests.patch(45 url=self._endpoint_url,46 json=self._customer_name,47 headers=self._auth_header48 )49 self._refreshing_endpoint_url()50 return self._response51 def _send_delete_requests(self):52 self._response = requests.delete(53 url=self._endpoint_url,54 headers=self._auth_header55 )56 self._refreshing_endpoint_url()57 return self._response58 def _refreshing_endpoint_url(self):59 self._endpoint_url = self._endpoint_url_copy60 61 def checking_status_code(self):...
flightweather.py
Source:flightweather.py
import requests
import json
import logging
import pandas as pd


class RestAPI:
    """Minimal base class for GET-based JSON APIs.

    Subclasses are expected to set ``self._endpoint_url`` and ``self.params``
    before ``call`` is used.
    """

    def __init__(self):
        # Was declared without `self`, which made RestAPI() raise TypeError.
        pass

    def call(self, *args):
        """GET the endpoint with the stored params and keep the decoded JSON in ``self.data``."""
        response = requests.get(url=self._endpoint_url, params=self.params)
        self.data = response.json()


class WeatherAPI(RestAPI):
    """
    Wrapper for OpenWeather API
    """

    def __init__(self, api_key, lat, lon):
        self._endpoint_url = "https://api.openweathermap.org/data/2.5/weather?"
        self.params = {
            "appid": api_key,
            "lat": lat,
            "lon": lon,
            "units": "metric"
        }

    def get(self, metric="main"):
        """Fetch the data and return the `metric` entry of the response as a one-row DataFrame."""
        self.call()
        data = self.data[metric]
        logging.info(data)
        df = pd.DataFrame([data])
        return df


class FlightAPI(RestAPI):
    """
    Wrapper for AirLabs API
    """

    def __init__(self, api_key, endpoint="flights"):
        self._endpoint_url = f"https://airlabs.co/api/v9/{endpoint}?"
        self.params = {
            "api_key": api_key
        }

    def get(self):
        """Fetch the data and load the ``response`` entry into a DataFrame."""
        self.call()
        logging.debug(self.data)
        data = self.data["response"]
        logging.debug(data)
        # Wrap a single (non-list) result so it becomes one DataFrame row
        if not isinstance(data, list):
            data = [data]
        df = pd.DataFrame(data)
        # ... (the rest of this method is elided in this excerpt)
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!