How to use _services method in localstack

Best Python code snippet using localstack_python

users.py

Source: users.py Github

copy

Full Screen

1"""Receive params as schema, calls services fnc, then respond as schema."""2from typing import List3import datetime as _dt4import fastapi as _fastapi5import fastapi.security as _security6import sqlalchemy.orm as _orm7import backend.models.schemas as _schemas8import backend.models.model as _model9import backend.services.users as _services10from backend.config import settings11app = _fastapi.FastAPI()12@app.post("/​api/​v1/​users")13async def create_user(14 user: _schemas.UserCreate, db: _orm.Session = _fastapi.Depends(_services.get_db)15):16 db_user = await _services.get_user_by_email(user.email, db)17 if db_user:18 raise _fastapi.HTTPException(status_code=400, detail="Email already in user")19 await _services.create_user(user, db)20 return await _services.create_access_token(user)21@app.post("/​api/​v1/​token")22async def generate_token(23 form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),24 db: _orm.Session = _fastapi.Depends(_services.get_db)25):26 user = await _services.authenticate_user(form_data.username, form_data.password, db)27 if not user:28 raise _fastapi.HTTPException(status_code=401, detail="Invalid Credentials")29 return await _services.create_token(user)30@app.get("/​api/​v1/​users/​me", response_model=_schemas.User)31async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):32 return user33@app.post("/​api/​v1/​leads", response_model=_schemas.Lead)34async def create_lead(35 lead: _schemas.LeadCreate,36 user: _schemas.User = _fastapi.Depends(_services.get_current_user),37 db: _orm.Session = _fastapi.Depends(_services.get_db)38):39 return await _services.create_lead(user=user, db=db, lead=lead)40@app.get("/​api/​v1/​leads", response_model=List[_schemas.Lead])41async def get_leads(42 user: _schemas.User = _fastapi.Depends(_services.get_current_user),43 db: _orm.Session = _fastapi.Depends(_services.get_db)44):45 return await _services.get_leads(user=user, db=db)46@app.get("/​api/​v1/​leads/​{lead_id}", status_code=200)47async def get_lead(48 lead_id: int,49 user: _schemas.User = _fastapi.Depends(_services.get_current_user),50 db: _orm.Session = _fastapi.Depends(_services.get_db)51):52 return await _services.get_lead(lead_id=lead_id, user=user, db=db)53@app.delete("/​api/​v1/​leads/​{lead_id}", status_code=204)54async def delete_lead(55 lead_id: int,56 user: _schemas.User = _fastapi.Depends(_services.get_current_user),57 db: _orm.Session = _fastapi.Depends(_services.get_db)58):59 await _services.delete_lead(lead_id, user, db)60 return {"message", "Successfully Deleted"}61@app.put("/​api/​v1/​leads/​{lead_id}", status_code=200)62async def update_lead(63 lead_id: int,64 lead: _schemas.LeadCreate,65 user: _schemas.User = _fastapi.Depends(_services.get_current_user),66 db: _orm.Session = _fastapi.Depends(_services.get_db)67):68 await _services.update_lead(lead_id, lead, user, db)69 return {"message", "Successfully Updated"}70@app.get("/​api/​v1")71async def root():72 return {"message": "Awesome Leads Manager"}73@app.post("/​api/​users", response_model=_schemas.User)74def signup(user_data: _schemas.UserCreate, db: _orm.Session = _fastapi.Depends(_services.get_db)):75 """add new user"""76 user = _services.get_user_by_email(db, user_data.email)77 if user:78 raise _fastapi.HTTPException(status_code=409,79 detail="Email already registered.")80 signedup_user = _services.create_user(db, user_data)81 return signedup_user82@app.post("/​api/​v1/​login", response_model=_schemas.TokenData)83async def login_for_access_token(db: _orm.Session = _fastapi.Depends(_services.get_db),form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends()):84 user = _services.authenticate_user(form_data.username, form_data.password)85 if not user:86 raise _fastapi.HTTPException(87 status_code=_fastapi.status.HTTP_401_UNAUTHORIZED,88 detail="Incorrect username or password",89 headers={"WWW-Authenticate": "Bearer"},90 )91 access_token_expires = _dt.timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)92 access_token = _services.create_access_token(93 data={"sub": user.username}, expires_delta=access_token_expires94 )95 response = {96 "access_token": access_token,97 "token_type": "bearer",98 "username": user.username}99 return response100@app.get("/​api/​me", response_model=_schemas.User)101async def read_logged_in_user(current_user: _model.User = _fastapi.Depends(_services.authenticate_user)):...

Full Screen

Full Screen

app.py

Source: app.py Github

copy

Full Screen

1import fastapi as _fastapi2import fastapi.security as _security3import sqlalchemy.orm as _orm4from typing import List5import schemas as _schemas6import services as _services7from fastapi.middleware.cors import CORSMiddleware8app = _fastapi.FastAPI()9app.add_middleware(10 CORSMiddleware,11 allow_origins=["*"],12 allow_credentials=True,13 allow_methods=["*"],14 allow_headers=["*"]15)16@app.post("/​api/​v1/​users")17async def register_user(18 user: _schemas.UserRequest, db: _orm.Session = _fastapi.Depends(_services.get_db)19):20 # call to check if user with email exist21 db_user = await _services.getUserByEmail(email=user.email, db=db)22 # if user found throw exception23 if db_user:24 raise _fastapi.HTTPException(status_code=400, detail="Email already exist, try with another email!")25 # create the user and return a token26 db_user = await _services.create_user(user=user, db=db)27 return await _services.create_token(user=db_user)28@app.post("/​api/​v1/​login")29async def login_user(30 form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),31 db: _orm.Session = _fastapi.Depends(_services.get_db)32):33 db_user = await _services.login(email=form_data.username,34 password=form_data.password, db=db)35 #Invalid login then throw exception36 if not db_user:37 raise _fastapi.HTTPException(status_code=401, detail="Wrong Login Credentials!")38 #create and return the token39 return await _services.create_token(db_user)40@app.get("/​api/​v1/​users/​current-user", response_model=_schemas.UserResponse)41async def current_user(user: _schemas.UserResponse = _fastapi.Depends(_services.current_user)):42 return user43@app.post("/​api/​v1/​posts", response_model=_schemas.PostResponse)44async def create_post(45 post_request: _schemas.PostRequest,46 user: _schemas.UserRequest = _fastapi.Depends(_services.current_user),47 db: _orm.Session = _fastapi.Depends(_services.get_db)48):49 return await _services.create_post(user=user, db=db, post=post_request)50@app.get("/​api/​v1/​posts/​user", response_model=List[_schemas.PostResponse])51async def get_posts_by_user(52 user: _schemas.UserRequest = _fastapi.Depends(_services.current_user),53 db: _orm.Session = _fastapi.Depends(_services.get_db)54):55 return await _services.get_posts_by_user(user=user, db=db)56@app.get("/​api/​v1/​posts/​all", response_model=List[_schemas.PostResponse])57async def get_posts_by_all(58 db: _orm.Session = _fastapi.Depends(_services.get_db)59):60 return await _services.get_posts_by_all(db=db)61@app.get("/​api/​v1/​posts/​{post_id}/​", response_model=_schemas.PostResponse)62async def get_post_detail(63 post_id:int, db: _orm.Session = _fastapi.Depends(_services.get_db)64):65 post = await _services.get_post_detail(post_id=post_id, db=db)66 return post67@app.get("/​api/​v1/​users/​{user_id}/​", response_model=_schemas.UserResponse)68async def get_user_detail(69 user_id:int, db: _orm.Session = _fastapi.Depends(_services.get_db)70):71 return await _services.get_user_detail(user_id=user_id, db=db)72@app.delete("/​api/​v1/​posts/​{post_id}/​")73async def delete_post(74 post_id: int,75 db: _orm.Session = _fastapi.Depends(_services.get_db),76 user: _schemas.UserRequest = _fastapi.Depends(_services.current_user)77):78 post = await _services.get_post_detail(post_id=post_id, db=db)79 await _services.delete_post(post=post, db=db)80 return "Post deleted successfully"81@app.put("/​api/​v1/​posts/​{post_id}/​", response_model=_schemas.PostResponse)82async def update_post(83 post_id: int,84 post_request: _schemas.PostRequest,85 db: _orm.Session = _fastapi.Depends(_services.get_db)86):87 db_post = await _services.get_post_detail(post_id=post_id, db=db)...

Full Screen

Full Screen

main.py

Source: main.py Github

copy

Full Screen

1from typing import List2import fastapi as _fastapi3import fastapi.security as _security4import sqlalchemy.orm as _orm5import services as _services, schemas as _schemas6app = _fastapi.FastAPI()7@app.post("/​api/​users")8async def create_user(9 user: _schemas.UserCreate, db: _orm.Session = _fastapi.Depends(_services.get_db)10):11 db_user = await _services.get_user_by_email(user.email, db)12 if db_user:13 raise _fastapi.HTTPException(status_code=400, detail="Email already in use")14 user = await _services.create_user(user, db)15 return await _services.create_token(user)16@app.post("/​api/​token")17async def generate_token(from_data:_security.OAuth2PasswordRequestForm=_fastapi.Depends(),db:_orm.Session = _fastapi.Depends(_services.get_db)):18 user = await _services.authenticate_user(from_data.username, from_data.password, db)19 if not user:20 raise _fastapi.HTTPException(status_code=401, detail="Invalid Credentials")21 22 return await _services.create_token(user)23@app.get("api/​users/​me", response_model=_schemas.User)24async def get_user(user:_schemas.User=_fastapi.Depends(_services.get_current_user)):25 return user26@app.post("/​api/​leads", response_model=_schemas.Lead)27async def create_lead(lead: _schemas.LeadCreate,user:_schemas.User = _fastapi.Depends(_services.get_current_user),db:_orm.Session = _fastapi.Depends(_services.get_db)):28 return await _services.create_lead(user=user, db=db, lead=lead)29@app.get("/​api/​leads", response_model=List[_schemas.Lead])30async def get_leads(user:_schemas.User=_fastapi.Depends(_services.get_current_user),db:_orm.Session = _fastapi.Depends(_services.get_db)):31 return await _services.get_leads(user=user, db=db)32@app.get("/​api/​leads/​{lead_id}", status_code=200)33async def get_lead(lead_id:int,user:_schemas.User = _fastapi.Depends(_services.get_current_user),db:_orm.Session=_fastapi.Depends(_services.get_db)):34 return await _services.get_lead(lead_id, user, db)35@app.delete("/​api/​leads/​{lead_id}", status_code=204)36async def delete_lead(lead_id:int, user:_schemas.User=_fastapi.Depends(_services.get_current_user),db:_orm.Session=_fastapi.Depends(_services.get_db)):37 await _services.delete_lead(lead_id, user, db)38 return {"Message", "Successfullty Delete"}39@app.delete("/​api/​leads/​{lead_id}", status_code=200)40async def update_lead(lead_id:int, lead:_schemas.LeadCreate,user:_schemas.User=_fastapi.Depends(_services.get_db)):41 await _services.update_lead(lead_id, lead, user, db)42 return ("Message", "Successfully updatet leads")43@app.get("/​api")44async def root():...

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

13 Best Java Testing Frameworks For 2023

The fact is not alien to us anymore that cross browser testing is imperative to enhance your application’s user experience. Enhanced knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. It holds more significance if you are a full-stack developer or expert programmer.

QA Innovation – Using the senseshaping concept to discover customer needs

QA Innovation - Using the senseshaping concept to discover customer needsQA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.

Best 23 Web Design Trends To Follow In 2023

Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.

Acquiring Employee Support for Change Management Implementation

Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful