Best Python code snippet using localstack_python
api.py
Source:api.py
"""
django-helpdesk - A Django powered ticket tracker for small enterprise.

(c) Copyright 2008 Jutda. All Rights Reserved. See LICENSE for details.

api.py - Wrapper around API calls, and core functions to provide complete
         API to third party applications.

The API documentation can be accessed by visiting http://helpdesk/api/help/
(obviously, substitute helpdesk for your django-helpdesk URI), or by reading
through templates/helpdesk/help_api.html.
"""

from django import forms
from django.contrib.auth import authenticate
try:
    from django.contrib.auth import get_user_model
    User = get_user_model()
except ImportError:
    from django.contrib.auth.models import User
from django.http import HttpResponse
from django.shortcuts import render_to_response
from django.template import loader, Context
import simplejson
from django.views.decorators.csrf import csrf_exempt
try:
    from django.utils import timezone
except ImportError:
    from datetime import datetime as timezone

from tendenci.apps.helpdesk.forms import TicketForm
from tendenci.apps.helpdesk.lib import send_templated_mail, safe_template_context
from tendenci.apps.helpdesk.models import Ticket, Queue, FollowUp

STATUS_OK = 200
STATUS_ERROR = 400
STATUS_ERROR_NOT_FOUND = 404
STATUS_ERROR_PERMISSIONS = 403
STATUS_ERROR_BADMETHOD = 405


@csrf_exempt
def api(request, method):
    """
    Dispatch an API request to the matching ``API.api_public_<method>``.

    Regardless of any other parameters, we provide a help screen
    to the user if they requested one.

    If the user isn't looking for help, then we enforce a few conditions:
        * The request must be sent via HTTP POST
        * The request must contain a 'user' and 'password' which
          must be valid users
        * The method must match one of the public methods of the API class.

    Returns an HttpResponse; failures use the STATUS_ERROR* codes defined
    at module level.
    """
    if method == 'help':
        return render_to_response('helpdesk/help_api.html')

    if request.method != 'POST':
        return api_return(STATUS_ERROR_BADMETHOD)

    # TODO: Move away from having the username & password in every request.
    request.user = authenticate(
        username=request.POST.get('user', False),
        password=request.POST.get('password'),
    )

    if request.user is None:
        return api_return(STATUS_ERROR_PERMISSIONS)

    # Only attributes carrying the api_public_ prefix are reachable, so
    # private helpers on API cannot be invoked through the URL.
    handler = getattr(API(request), 'api_public_%s' % method, None)
    if handler is not None:
        return handler()

    return api_return(STATUS_ERROR)


def api_return(status, text='', json=False):
    """
    Build the plain-text (or JSON) HttpResponse for an API call.

    status -- one of the STATUS_* codes; used as the HTTP status.
    text   -- response body. When empty, a default message for the status
              is used instead.
    json   -- when true and status is STATUS_OK, the body is labelled as
              'text/json' rather than 'text/plain'.
    """
    content_type = 'text/plain'
    if status == STATUS_OK and json:
        content_type = 'text/json'

    # The original tested ``text is None`` while the default is '', so the
    # default messages below could never be selected.
    if not text:
        default_messages = {
            STATUS_ERROR: 'Error',
            STATUS_ERROR_NOT_FOUND: 'Resource Not Found',
            STATUS_ERROR_PERMISSIONS: 'Invalid username or password',
            STATUS_ERROR_BADMETHOD: 'Invalid request method',
            STATUS_OK: 'OK',
        }
        text = default_messages.get(status, text)

    r = HttpResponse(status=status, content=text, content_type=content_type)

    if status == STATUS_ERROR_BADMETHOD:
        # Headers are set by item assignment; ``r.Allow = ...`` only created
        # a Python attribute and never reached the client.
        r['Allow'] = 'POST'

    return r


class API:
    """Holds the request and exposes each API call as ``api_public_<name>``."""

    def __init__(self, request):
        self.request = request

    def _get_ticket(self):
        """Return the Ticket named by POST 'ticket', or None if it is unusable.

        A missing, unknown or non-numeric id all yield None so that callers
        can answer with the same "Invalid ticket ID" error.
        """
        try:
            return Ticket.objects.get(id=self.request.POST.get('ticket', False))
        except (Ticket.DoesNotExist, ValueError):
            return None

    def api_public_create_ticket(self):
        """Create a ticket from the POST data; respond with its id or the form errors."""
        form = TicketForm(self.request.POST)
        form.fields['queue'].choices = [[q.id, q.title] for q in Queue.objects.all()]
        form.fields['assigned_to'].choices = [
            [u.id, u.get_username()] for u in User.objects.filter(is_active=True)
        ]

        if form.is_valid():
            ticket = form.save(user=self.request.user)
            return api_return(STATUS_OK, "%s" % ticket.id)

        return api_return(STATUS_ERROR, text=form.errors.as_text())

    def api_public_list_queues(self):
        """Respond with a JSON list of ``{"id", "title"}`` for every queue."""
        queues = [{"id": "%s" % q.id, "title": "%s" % q.title} for q in Queue.objects.all()]
        return api_return(STATUS_OK, simplejson.dumps(queues), json=True)

    def api_public_find_user(self):
        """Respond with the id of the user whose username is POST 'username'."""
        username = self.request.POST.get('username', False)

        try:
            u = User.objects.get(username=username)
        except User.DoesNotExist:
            return api_return(STATUS_ERROR, "Invalid username provided")

        return api_return(STATUS_OK, "%s" % u.id)

    def api_public_delete_ticket(self):
        """Delete the ticket named by POST 'ticket'; requires a truthy POST 'confirm'."""
        if not self.request.POST.get('confirm', False):
            return api_return(STATUS_ERROR, "No confirmation provided")

        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        ticket.delete()

        return api_return(STATUS_OK)

    def api_public_hold_ticket(self):
        """Set ``on_hold`` on the ticket named by POST 'ticket'."""
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        ticket.on_hold = True
        ticket.save()

        return api_return(STATUS_OK)

    def api_public_unhold_ticket(self):
        """Clear ``on_hold`` on the ticket named by POST 'ticket'."""
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        ticket.on_hold = False
        ticket.save()

        return api_return(STATUS_OK)

    def api_public_add_followup(self):
        """Attach a comment to a ticket and send the related notifications.

        Reads POST 'ticket', 'message' and 'public' ('y' or 'n', default
        'n'). The submitter and the ticket's CC addresses are mailed only
        for public follow-ups; the queue CC address and the assigned user
        are mailed either way, subject to the checks below.
        """
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        message = self.request.POST.get('message', None)
        public = self.request.POST.get('public', 'n')

        if public not in ['y', 'n']:
            return api_return(STATUS_ERROR, "Invalid 'public' flag")

        if not message:
            return api_return(STATUS_ERROR, "Blank message")

        # 'y' and 'n' are both non-empty strings, so testing ``public``
        # directly treated every follow-up as public.
        is_public = public == 'y'

        f = FollowUp(
            ticket=ticket,
            date=timezone.now(),
            comment=message,
            user=self.request.user,
            title='Comment Added',
        )

        if is_public:
            f.public = True

        f.save()

        context = safe_template_context(ticket)
        context['comment'] = f.comment

        # Addresses already mailed, so nobody receives the update twice.
        messages_sent_to = []

        if is_public and ticket.submitter_email:
            send_templated_mail(
                'updated_submitter',
                context,
                recipients=ticket.submitter_email,
                sender=ticket.queue.from_address,
                fail_silently=True,
            )
            messages_sent_to.append(ticket.submitter_email)

        if is_public:
            for cc in ticket.ticketcc_set.all():
                if cc.email_address not in messages_sent_to:
                    send_templated_mail(
                        'updated_submitter',
                        context,
                        recipients=cc.email_address,
                        sender=ticket.queue.from_address,
                        fail_silently=True,
                    )
                    messages_sent_to.append(cc.email_address)

        if ticket.queue.updated_ticket_cc and ticket.queue.updated_ticket_cc not in messages_sent_to:
            send_templated_mail(
                'updated_cc',
                context,
                recipients=ticket.queue.updated_ticket_cc,
                sender=ticket.queue.from_address,
                fail_silently=True,
            )
            messages_sent_to.append(ticket.queue.updated_ticket_cc)

        if (
            ticket.assigned_to and
            self.request.user != ticket.assigned_to and
            ticket.assigned_to.usersettings.settings.get('email_on_ticket_apichange', False) and
            ticket.assigned_to.email and
            ticket.assigned_to.email not in messages_sent_to
        ):
            send_templated_mail(
                'updated_owner',
                context,
                recipients=ticket.assigned_to.email,
                sender=ticket.queue.from_address,
                fail_silently=True,
            )

        ticket.save()

        return api_return(STATUS_OK)

    def api_public_resolve(self):
        """Record a public 'Resolved' follow-up and mark the ticket resolved.

        Reads POST 'ticket' and 'resolution'. The end of this method is not
        visible in SOURCE (see the note at the bottom).
        """
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        resolution = self.request.POST.get('resolution', None)

        if not resolution:
            return api_return(STATUS_ERROR, "Blank resolution")

        f = FollowUp(
            ticket=ticket,
            date=timezone.now(),
            comment=resolution,
            user=self.request.user,
            title='Resolved',
            public=True,
        )
        f.save()

        context = safe_template_context(ticket)
        context['resolution'] = f.comment

        # Not used in the visible part of the method; kept because the tail
        # of the method is truncated and may rely on it.
        subject = '%s %s (Resolved)' % (ticket.ticket, ticket.title)

        # Addresses already mailed, so nobody receives the update twice.
        messages_sent_to = []

        if ticket.submitter_email:
            send_templated_mail(
                'resolved_submitter',
                context,
                recipients=ticket.submitter_email,
                sender=ticket.queue.from_address,
                fail_silently=True,
            )
            messages_sent_to.append(ticket.submitter_email)

        for cc in ticket.ticketcc_set.all():
            if cc.email_address not in messages_sent_to:
                send_templated_mail(
                    'resolved_submitter',
                    context,
                    recipients=cc.email_address,
                    sender=ticket.queue.from_address,
                    fail_silently=True,
                )
                messages_sent_to.append(cc.email_address)

        if ticket.queue.updated_ticket_cc and ticket.queue.updated_ticket_cc not in messages_sent_to:
            send_templated_mail(
                'resolved_cc',
                context,
                recipients=ticket.queue.updated_ticket_cc,
                sender=ticket.queue.from_address,
                fail_silently=True,
            )
            messages_sent_to.append(ticket.queue.updated_ticket_cc)

        # Uses .get() like api_public_add_followup; the original getattr()
        # call looked for an attribute on the settings object instead.
        # NOTE(review): assumes settings is dict-like, as the sibling method
        # already does — confirm against the UserSettings model.
        if (
            ticket.assigned_to and
            self.request.user != ticket.assigned_to and
            ticket.assigned_to.usersettings.settings.get('email_on_ticket_apichange', False) and
            ticket.assigned_to.email and
            ticket.assigned_to.email not in messages_sent_to
        ):
            send_templated_mail(
                # NOTE(review): template name kept as found; 'resolved_resolved'
                # looks unusual next to 'updated_owner' — confirm it exists.
                'resolved_resolved',
                context,
                recipients=ticket.assigned_to.email,
                sender=ticket.queue.from_address,
                fail_silently=True,
            )

        # Was ``ticket.resoltuion``: the misspelled attribute is not a model
        # field, so the resolution text was never stored.
        # NOTE(review): assumes the Ticket model field is named 'resolution'.
        ticket.resolution = f.comment
        ticket.status = Ticket.RESOLVED_STATUS

        ticket.save()

        # NOTE(review): SOURCE is truncated here ("..."). The sibling methods
        # end with ``return api_return(STATUS_OK)``; presumably this one does
        # too — confirm against the full file before relying on it.
        ...
ea_tasks.py
Source:ea_tasks.py
import copy
import easyaccess as ea
import threading
import os
import time
import json
import glob

STATUS_OK = 'ok'
STATUS_ERROR = 'error'

# Template only: every function takes a deep copy, never this dict itself.
DEFAULT_RESPONSE = {
    'status': STATUS_OK,
    'msg': '',
    'elapsed': 0.0,
    'data': {},
    'files': [],
    'sizes': []
}


def get_filesize(filename):
    """Return the size of *filename* as a string in KB, MB or GB (two decimals)."""
    size = os.path.getsize(filename)
    size = size * 1. / 1024.
    if size > 1024. * 1024:
        size = '%.2f GB' % (1. * size / 1024. / 1024)
    elif size > 1024.:
        size = '%.2f MB' % (1. * size / 1024.)
    else:
        size = '%.2f KB' % (size)
    return size


def check_query(query, db, username, password):
    """Ask the database to parse *query* without running it.

    Returns a response dict shaped like DEFAULT_RESPONSE; 'status' is
    STATUS_ERROR and 'msg' carries the error text when connecting or
    parsing fails.
    """
    # Deep copy: assigning DEFAULT_RESPONSE directly made every call mutate
    # the same module-level dict, so one failure marked all later responses.
    response = copy.deepcopy(DEFAULT_RESPONSE)
    try:
        connection = ea.connect(db, user=username, passwd=password)
        cursor = connection.cursor()
    except Exception as e:
        response['status'] = STATUS_ERROR
        response['msg'] = str(e).strip()
        return response
    try:
        cursor.parse(query.encode())
    except Exception as e:
        response['status'] = STATUS_ERROR
        response['msg'] = str(e).strip()
    cursor.close()
    connection.close()
    return response


def run_quick(query, db, username, password):
    """Run *query* under a short time limit and return a response dict.

    For statements starting with 'select', the first 1000 rows are stored
    as a JSON string under 'data'. Any other statement is executed and
    committed. Errors are reported through 'status' and 'msg'.
    """
    response = copy.deepcopy(DEFAULT_RESPONSE)
    timer = None
    try:
        connection = ea.connect(db, user=username, passwd=password)
        cursor = connection.cursor()
        # Start the clock
        # NOTE(review): the timer fires after 25 s while the messages below
        # say 30 seconds — confirm which value is intended.
        timer = threading.Timer(25, connection.con.cancel)
        timer.start()
        if query.lower().lstrip().startswith('select'):
            try:
                df = connection.query_to_pandas(query)
                # df.to_csv(os.path.join(user_folder, 'quickResults.csv'), index=False)
                df = df[0:1000]
                data = df.to_json(orient='records')
                response['data'] = data
            except Exception as e:
                err_out = str(e).strip()
                if 'ORA-01013' in err_out:
                    err_out = 'Time Exceeded (30 seconds). Please try submitting the job'
                response['status'] = STATUS_ERROR
                response['msg'] = err_out
        else:
            try:
                cursor.execute(query)
                connection.con.commit()
            except Exception as e:
                err_out = str(e).strip()
                if 'ORA-01013' in err_out:
                    err_out = 'Time Exceeded (30 seconds). Please try submitting this query as job'
                response['status'] = STATUS_ERROR
                response['msg'] = err_out
        # Close database connection
        cursor.close()
        connection.close()
    except Exception as e:
        response['status'] = STATUS_ERROR
        response['msg'] = str(e).strip()
    finally:
        # Stop the clock on every path, so a pending timer cannot fire
        # against the connection after this function has returned.
        if timer is not None:
            timer.cancel()
    return response


def run_query(query, db, username, password, job_folder, filename, compression=False, timeout=None):
    """Run *query* as a job and write its metadata to <job_folder>/meta.json.

    'select' statements are saved to <job_folder>/<filename> (with
    compression enabled on the connection when *compression* is true), and
    the folder's file names and sizes are recorded. Other statements are
    executed and committed. When *timeout* is given, the query is cancelled
    after that many seconds. Returns the response dict.
    """
    # Deep copy so 'files'/'sizes' and any error state stay local to this job.
    response = copy.deepcopy(DEFAULT_RESPONSE)
    if not os.path.exists(job_folder):
        os.mkdir(job_folder)
    jsonfile = os.path.join(job_folder, 'meta.json')
    try:
        t1 = time.time()
        # Open database connection
        try:
            connection = ea.connect(db, user=username, passwd=password)
            cursor = connection.cursor()
        except Exception as e:
            response['status'] = STATUS_ERROR
            response['msg'] = str(e).strip()
            with open(jsonfile, 'w') as fp:
                json.dump(response, fp)
            return response
        if timeout is not None:
            tt = threading.Timer(timeout, connection.con.cancel)
            tt.start()
        if query.lower().lstrip().startswith('select'):
            try:
                outfile = os.path.join(job_folder, filename)
                if compression:
                    connection.compression = True
                connection.query_and_save(query, outfile)
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
                files = glob.glob(job_folder + '/*')
                response['files'] = [os.path.basename(i) for i in files]
                response['sizes'] = [get_filesize(i) for i in files]
            except Exception as e:
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
                response['status'] = STATUS_ERROR
                response['msg'] = str(e).strip()
                # Re-raised into the outer handler below.
                raise
        else:
            try:
                cursor.execute(query)
                connection.con.commit()
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
            except Exception as e:
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
                response['status'] = STATUS_ERROR
                response['msg'] = str(e).strip()
        response['elapsed'] = t2 - t1
        with open(jsonfile, 'w') as fp:
            json.dump(response, fp, indent=2)
        cursor.close()
        connection.close()
        return response
    except Exception:
        # NOTE(review): SOURCE is truncated here ("..."); the body of this
        # handler is not visible and has been left as the literal marker.
        ...
utils.py
Source:utils.py
from rest_framework.serializers import ValidationError

STATUS_ERROR = '02'
STATUS_SUCCESS = '00'


def check_user_data(email, first_name, last_name, phone_number) -> dict:
    ''' Validate that every required account field has a value.

    Returns a dict with 'errors' (one message per empty field, keyed by
    field name) and 'status' (STATUS_ERROR if any field is empty, otherwise
    STATUS_SUCCESS).
    '''
    error_template = 'User account must have'
    # (field key, supplied value, wording used in the message)
    required_fields = (
        ('email', email, 'email'),
        ('first_name', first_name, 'first name'),
        ('last_name', last_name, 'last name'),
        ('phone_number', phone_number, 'phone number'),
    )
    errors = {
        key: f'{error_template} {label}.'
        for key, value, label in required_fields
        if not value
    }
    return {
        'errors': errors,
        'status': STATUS_ERROR if errors else STATUS_SUCCESS,
    }


def check_passwords(password1, password2):
    ''' Check that both passwords are given (not None) and equal.

    Fills 'password' with the agreed value on success; otherwise records an
    error message and sets 'status' to STATUS_ERROR.
    '''
    data = {
        'errors': {},
        'password': None,
        'status': STATUS_SUCCESS,
    }
    if password1 is None or password2 is None:
        data['errors']['password'] = 'password1 or password2 have not been declared.'
        data['status'] = STATUS_ERROR
    elif password1 != password2:
        data['errors']['password2'] = 'Passwords are not equal.'
        data['status'] = STATUS_ERROR
    else:
        data['password'] = password1
    # NOTE(review): SOURCE is truncated here ("..."); the rest of this
    # function is not visible and has been left as the literal marker.
    ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!