Best Python code snippet using django-test-plus_python
server.py
Source:server.py
1"""An example of a simple HTTP server."""2import json3import mimetypes4import pickle5import socket6from os.path import isdir7from urllib.parse import unquote_plus8# Pickle file for storing data9PICKLE_DB = "db.pkl"10# Directory containing www data11WWW_DATA = "www-data"12# Header template for a successful HTTP request13HEADER_RESPONSE_200 = """HTTP/1.1 200 OK\r14content-type: %s\r15content-length: %d\r16connection: Close\r17\r18"""19# Represents a table row that holds user data20TABLE_ROW = """21<tr>22 <td>%d</td>23 <td>%s</td>24 <td>%s</td>25</tr>26"""27# Template for a 404 (Not found) error28RESPONSE_404 = """HTTP/1.1 404 Not found\r29content-type: text/html\r30connection: Close\r31\r32<!doctype html>33<h1>404 Page not found</h1>34<p>Page cannot be found.</p>35"""36RESPONSE_301 = """HTTP/1.1 301 Moved Permanently\r37location: %s\r38connection: Close\r39\r40"""41RESPONSE_400 = """HTTP/1.1 400 Bad Request\r42content-type: text/html\r43connection: Close\r44\r45<!doctype html>46<h1>400 Bad Request</h1>47<p>Page cannot be found.</p>48"""49RESPONSE_405 = """HTTP/1.1 405 Method Not Allowed\r50content-type: text/html\r51connection: Close\r52\r53<!doctype html>54<h1>405 Page Not Allowed</h1>55<p>Method Not Allowed.</p>56"""57def save_to_db(first, last):58 """Create a new user with given first and last name and store it into59 file-based database.60 For instance, save_to_db("Mick", "Jagger"), will create a new user61 "Mick Jagger" and also assign him a unique number.62 Do not modify this method."""63 existing = read_from_db()64 existing.append({65 "number": 1 if len(existing) == 0 else existing[-1]["number"] + 1,66 "first": first,67 "last": last68 })69 with open(PICKLE_DB, "wb") as handle:70 pickle.dump(existing, handle)71def read_from_db(criteria=None):72 """Read entries from the file-based DB subject to provided criteria73 Use this method to get users from the DB. The criteria parameters should74 either be omitted (returns all users) or be a dict that represents a query75 filter. For instance:76 - read_from_db({"number": 1}) will return a list of users with number 177 - read_from_db({"first": "bob"}) will return a list of users whose first78 name is "bob".79 Do not modify this method."""80 if criteria is None:81 criteria = {}82 else:83 # remove empty criteria values84 for key in ("number", "first", "last"):85 if key in criteria and criteria[key] == "":86 del criteria[key]87 # cast number to int88 if "number" in criteria:89 criteria["number"] = int(criteria["number"])90 try:91 with open(PICKLE_DB, "rb") as handle:92 data = pickle.load(handle)93 filtered = []94 for entry in data:95 predicate = True96 for key, val in criteria.items():97 if val != entry[key]:98 predicate = False99 if predicate:100 filtered.append(entry)101 return filtered102 except (IOError, EOFError):103 return []104def parse_headers(client):105 headers = dict()106 while True:107 line = client.readline().decode("utf-8").strip()108 if not line:109 return headers110 key, value = line.split(":", 1)111 headers[key.strip().lower()] = value.strip()112def parse_get_parametars(p):113 if p == "":114 return None115 d = dict()116 parametars = p.split("&")117 for i in parametars:118 key, value = i.split("=", 1)119 d[key.strip()] = value.strip()120 return d121def process_request(connection, address):122 client = connection.makefile("wrb")123 line = client.readline().decode("utf-8").strip()124 try:125 method, uri, version = line.split(" ")126 headers = parse_headers(client)127 get_parameters = ""128 if method != "GET" and method != "POST":129 print("405 Error : Wrong Method")130 client.write(RESPONSE_405.encode("utf-8"))131 client.close()132 if uri[0] != "/" and not (len(uri) > 0):133 print("400 Error : Bad Request")134 client.write(RESPONSE_400.encode("utf-8"))135 client.close()136 if version != "HTTP/1.1":137 print("400 Error : Bad Request")138 client.write(RESPONSE_400.encode("utf-8"))139 client.close()140 if "host" not in headers.keys():141 print("400 Error : Bad Request")142 client.write(RESPONSE_400.encode("utf-8"))143 client.close()144 if method == "POST" and "content-length" not in headers.keys():145 print("400 Error : Bad Request, no Content-Length")146 client.write(RESPONSE_400.encode("utf-8"))147 client.close()148 if "?" in uri:149 uri, get_parameters = uri.split("?", 1)150 except Exception as e:151 print("[%s:] Exception parsing '%s': %s" % (address, line, e))152 client.write(RESPONSE_400.encode("utf-8"))153 client.close()154 uri_dynamic = ["/www-data/app-add", "/app-add", "/www-data/app-index", "/app-index", "/www-data/app-json", "/app-json"]155 dynamic = 0156 if uri in uri_dynamic:157 if uri == "/app-index" or uri == "/www-data/app-index":158 uri = "/app_list.html"159 dynamic = 1160 if method != "GET":161 print("405 Error : Wrong Method")162 client.write(RESPONSE_405.encode("utf-8"))163 client.close()164 elif uri == "/app-add" or uri == "/www-data/app-add":165 uri = "/app_add.html"166 dynamic = 2167 if method != "POST":168 print("405 Error : Wrong Method")169 client.write(RESPONSE_405.encode("utf-8"))170 client.close()171 elif uri == "/app-json" or "/www-data/app-json":172 dynamic = 3173 if method != "GET":174 print("405 Error : Wrong Method")175 client.write(RESPONSE_405.encode("utf-8"))176 client.close()177 try:178 get_parameters = unquote_plus(get_parameters, "utf-8")179 body = json.dumps(read_from_db(parse_get_parametars(get_parameters))).encode()180 head = HEADER_RESPONSE_200 % (181 "application/json",182 len(body)183 )184 client.write(head.encode("utf-8"))185 client.write(body)186 except Exception as e:187 print("[%s:] Exception parsing '%s': %s" % (address, line, e))188 client.write(RESPONSE_400.encode("utf-8"))189 client.close()190 if (uri[-1] == "/"):191 r_301 = RESPONSE_301 % ("http://" + headers["host"].split(":")[0] + ":" + headers["host"].split(":")[1] + uri + "index.html")192 uri += "index.html"193 client.write(r_301.encode("utf-8"))194 elif uri[-1] != "/" and "." not in uri: # if doesnt have trailing slash and if its a dicitonary195 if (isdir(WWW_DATA + uri)):196 r_301 = RESPONSE_301 % ("http://" + headers["host"].split(":")[0] + ":" + headers["host"].split(":")[1] + uri + "/index.html")197 uri += "/index.html"198 client.write(r_301.encode("utf-8"))199 if dynamic in [0, 1, 2]:200 try:201 with open(WWW_DATA + "/" + uri, "rb") as handle: # 200 ok202 body = handle.read()203 mime_type, _ = mimetypes.guess_type(WWW_DATA + "/" + uri)204 if mime_type == None:205 mime_type = "application/octet-stream"206 if dynamic == 0:207 head = HEADER_RESPONSE_200 % (208 mime_type,209 len(body)210 )211 client.write(head.encode("utf-8"))212 client.write(body)213 elif dynamic == 1:214 table = ""215 get_parameters = unquote_plus(get_parameters, "utf-8")216 students = read_from_db(parse_get_parametars(get_parameters))217 for student in students:218 table += TABLE_ROW % (219 student["number"],220 student["first"],221 student["last"],222 )223 body = body.replace(b"{{students}}", table.encode("utf-8"))224 head = HEADER_RESPONSE_200 % (225 mime_type,226 len(body)227 )228 client.write(head.encode("utf-8"))229 client.write(body)230 elif dynamic == 2:231 post_parameters = (client.read(int(headers["content-length"])))232 post_parameters = unquote_plus(post_parameters.decode("utf-8"), "utf-8")233 d = parse_get_parametars(post_parameters)234 if "first" not in d.keys() and "last" not in d.keys():235 print("400 Error : Bad Request, missing parameters")236 client.write(RESPONSE_400.encode("utf-8"))237 connection.close()238 save_to_db(str(d["first"]), str(d["last"]))239 head = HEADER_RESPONSE_200 % (240 mime_type,241 len(body)242 )243 client.write(head.encode("utf-8"))244 client.write(body)245 except IOError:246 client.write(RESPONSE_404.encode("utf-8"))247 client.close()248 finally:249 client.close()250 client.close()251def main(port):252 """Starts the server and waits for connections."""253 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)254 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)255 server.bind(("", port))256 server.listen(1)257 print("Listening on %d" % port)258 while True:259 connection, address = server.accept()260 print("[%s:%d] CONNECTED" % address)261 process_request(connection, address)262 connection.close()263 print("[%s:%d] DISCONNECTED" % address)264if __name__ == "__main__":...
apis.py
Source:apis.py
...31 mobile = request.data.get(keys.MOBILE) or ''32 """ Validate request data """33 # Username validation34 if not HelperAPIValidation.is_valid_username(username.strip()):35 return HelperResponse.response_400(request, messages.VALID_USERNAME)36 # Password Validation37 if not HelperAPIValidation.is_valid_password(password.strip()):38 return HelperResponse.response_400(request, messages.VALID_PASSWORD)39 # Email Validation if filled something40 if email is not None and email.strip() != '' and not HelperAPIValidation.is_valid_email(email.strip()):41 return HelperResponse.response_400(request, messages.INVALID_EMAIL)42 # Mobile Validation if filled something43 if mobile is not None and mobile.strip() != '' and not HelperAPIValidation.is_valid_mobile(mobile.strip()):44 return HelperResponse.response_400(request, messages.VALID_MOBILE)45 # Validate existence of username46 if UserData.objects.filter(user__username=username).exists():47 return HelperResponse.response_400(request, messages.USERNAME_ALREADY_EXISTS)48 else:49 django_user = User.objects.create_user(username=username, email=email)50 django_user.set_password(password)51 django_user.save()52 UserData.objects.create(53 user=django_user,54 email=email,55 mobile=int(mobile) if str(mobile).isdigit() else None56 )57 response = {58 keys.MESSAGE: messages.WELCOME_USERNAME.format(username=username)59 }60 token = HelperAuthentication.obtain_token(request, username=username, password=password)61 return Response(response, status=status.HTTP_200_OK, headers={keys.TOKEN: token})62@swagger_auto_schema(63 operation_id='User Login API', operation_description=doc_descriptions.API_USER_LOGIN,64 method='post', responses={65 200: 'Token will be sent in header and a welcome message in body',66 400: 'Invalid credentials',67 },68 request_body=doc_serializers.UserLoginRequestBody69)70@api_view(['POST'])71def login(request):72 """73 User Login API74 :param request:75 :return:76 """77 HelperCommon.print_method_title("API-Login")78 username = request.data.get(keys.USERNAME) or None79 password = request.data.get(keys.PASSWORD) or None80 django_user = authenticate(request, username=username, password=password)81 if django_user is not None:82 # User exists83 response = {84 keys.MESSAGE: messages.WELCOME_USERNAME.format(username=username)85 }86 token = HelperAuthentication.obtain_token(request, username=username, password=password)87 return Response(response, status=status.HTTP_200_OK, headers={keys.TOKEN: token})88 else:89 # Invalid credentials90 return HelperResponse.response_400(request, messages.INVALID_CREDENTIALS)91@swagger_auto_schema(92 operation_id='User Profile API', operation_description=doc_descriptions.API_USER_LOGIN,93 manual_parameters=[openapi.Parameter(keys.TOKEN, openapi.IN_HEADER, type=openapi.TYPE_STRING, required=True)],94 method='get', response={95 200: doc_serializers.UserProfileResponseBody96 }97)98@api_view(['GET'])99@HelperAuthentication.validate_access_token100def profile(request):101 """102 User Profile GET API103 :param request:104 :return:105 """106 HelperCommon.print_method_title("API- User Profile")107 try:108 decoded_json = HelperAuthentication.decode_access_token(HelperCommon.get_meta_token(request.META))109 username = decoded_json[keys.USERNAME]110 # Get the instance of the user111 user_instance = UserData.objects.get(user__username=username)112 # Get profile data from serializer113 serializer = UserDataSerializer(user_instance, many=False)114 return Response(serializer.data, status=status.HTTP_200_OK,115 headers=HelperCommon.common_response_header(request))116 except Exception as e:117 return HelperResponse.response_400(request, str(e))118@api_view(['POST'])119def logout(request):120 """121 "JWT is stateless"122 THIS API is not needed because existence of token can be managed from the123 frontend application.124 Deleting the token will act as logout.125 :param request:126 :return:127 """...
views.py
Source:views.py
1from rest_framework import generics, status2from .serializers import *3from core.models import *4from core.api.filter import *5from rest_framework.response import Response6from core.api.authentication import *7from rest_framework.permissions import IsAuthenticated8from django_filters.rest_framework import DjangoFilterBackend9response_204 = Response({'status': 204, 'description': 'OK'}, status=status.HTTP_204_NO_CONTENT)10response_400 = Response({'status': 400, 'description': 'Bad request.'}, status=status.HTTP_400_BAD_REQUEST)11response_500 = Response({'status': 500, 'description': 'There is an internal issue.'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)12response_200 = Response({'status': 200, 'description': 'OK.'}, status=status.HTTP_200_OK)13class InsertMovie(generics.CreateAPIView):14 permission_classes = (AdminPermission,)15 serializer_class = MovieSerializer16 def post(self, request, *args, **kwargs):17 if set(request.data.keys()) == set(['name', 'description']):18 try:19 Movie.objects.create(name=request.data['name'], description=request.data['description'])20 return response_20421 except: return response_50022 else: return response_40023class ManageMovie(generics.RetrieveUpdateDestroyAPIView):24 serializer_class = MovieSerializer25 permission_classes = (AdminPermission,)26 def put(self, request, *args, **kwargs):27 if set(request.data.keys()) == set(['name', 'description']):28 try:29 Movie.objects.get(id=kwargs['pk'])30 Movie.objects.filter(id=kwargs['pk']).update(name=request.data['name'], description=request.data['description'])31 return response_20432 except: return response_40033 else: return response_40034 def delete(self, request, *args, **kwargs):35 try:36 movie = Movie.objects.get(id=kwargs['pk'])37 movie.delete()38 return response_20439 except: return response_40040class ManageComment(generics.RetrieveUpdateDestroyAPIView):41 serializer_class = CommentSerializerAdmin 42 def put(self, request, *args, **kwargs):43 if set(request.data.keys()) == set(['approved']):44 try:45 Comment.objects.get(id=kwargs['pk'])46 Comment.objects.filter(id=kwargs['pk']).update(approved=request.data['approved'])47 return response_20448 except: return response_40049 else: return response_40050 def delete(self, request, *args, **kwargs):51 try:52 comment = Comment.objects.get(id=kwargs['pk'])53 comment.delete()54 return response_20455 except: return response_40056class UserVote(generics.CreateAPIView):57 serializer_class = VoteSerializer58 permission_classes = (UserPermission,)59 def post(self, request, *args, **kwargs):60 if set(request.data.keys()) == set(['movie_id', 'vote']):61 try:62 Vote.objects.create(rating=request.data['vote'], movie_id=request.data['movie_id'], user=request.user)63 return response_20464 except: return response_50065 else: return response_40066class UserComment(generics.CreateAPIView):67 serializer_class = CommentSerializer68 permission_classes = (UserPermission,)69 def post(self, request, *args, **kwargs):70 if set(request.data.keys()) == set(['movie_id', 'comment_body']):71 try:72 Movie.objects.get(id=request.data['movie_id'])73 Comment.objects.create(comment=request.data['comment_body'], movie_id=request.data['movie_id'], user=request.user)74 return response_20075 except: return response_40076 else: return response_40077class CommentAPI(generics.ListAPIView):78 permission_classes = (AnonymousPermission,)79 filter_backends = [DjangoFilterBackend]80 filter_class = CommentFilter81 serializer_class = GetCommentSerializer82 def get(self, request, *args, **kwargs):83 try:84 mv = Movie.objects.filter(id=request.query_params['movie_id'])85 mv[0]86 except:87 return response_40088 return super().get(request, *args, **kwargs)89 def get_queryset(self):90 try:91 return Comment.objects.filter(movie_id=self.request.query_params['movie_id'])92 except:93 return Comment.objects.none()94class MovieAPI(generics.ListAPIView):95 permission_classes = (AnonymousPermission,)96 serializer_class = GetMovieSerializer97 def get_queryset(self):98 return Movie.objects.all().order_by('id')99class MovieWithIDAPI(generics.ListAPIView):100 permission_classes = (AnonymousPermission,)101 serializer_class = MovieIDSerializer102 def get(self, request, *args, **kwargs):103 try:104 Movie.objects.get(id=kwargs['pk'])105 except:106 return response_400107 return super().get(request, *args, **kwargs)108 def get_queryset(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!