Best Python code snippet using autotest_python
tests.py
Source:tests.py
1import unittest2from unittest import mock3import hashlib4import os5from conf import FOLDERPATH, RequestVerb, PROTOCOL6from exceptions import NameIsTooLongError, CanNotParseRequestError7from regagent import ask_permission, process_permission8from storage import FilePhoneBook9from request import Request10from response import Response11events = []12class AsyncMock(mock.MagicMock):13 async def __call__(self, *args, **kwargs):14 return super().__call__(*args, **kwargs)15class TestFilePhoneBook(unittest.IsolatedAsyncioTestCase):16 def setUp(self):17 self.storage = FilePhoneBook(FOLDERPATH)18 filepath = os.path.join(19 FOLDERPATH, hashlib.sha256('ÐиÑилл ХмÑÑÑй'.encode()).hexdigest())20 with open(filepath, 'w') as self.f:21 self.f.write('79842342143')22 events.append("setUp")23 async def test_get(self):24 self.assertEqual(await self.storage.get('ÐиÑилл ХмÑÑÑй'),25 '79842342143')26 async def test_write(self):27 await self.storage.write('John', ['78124445598'])28 got = await self.storage.get('John')29 self.assertEqual(got, '78124445598')30 await self.storage.write('John', ['709931142255'])31 got = await self.storage.get('John')32 self.assertEqual(got, '709931142255')33 os.remove(os.path.join(34 FOLDERPATH, hashlib.sha256('John'.encode()).hexdigest()))35 async def test_delete(self):36 await self.storage.write('John', ['78124445598'])37 self.storage.delete('John')38 with self.assertRaises(FileNotFoundError):39 await self.storage.get('John')40 def tearDown(self):41 os.remove(os.path.join(42 FOLDERPATH, hashlib.sha256('ÐиÑилл ХмÑÑÑй'.encode()).hexdigest()))43 events.append("tearDown")44class TestRequest(unittest.TestCase):45 def test_parse_request_get(self):46 raw_request = 'ÐТÐÐÐÐÐ Ðван ХмÑÑÑй Ð ÐСÐÐ/1.0\r\n\r\n'47 req = Request(raw_request)48 self.assertEqual(req.name, 'Ðван ХмÑÑÑй')49 self.assertEqual(req.protocol, PROTOCOL)50 self.assertEqual(req.method, RequestVerb.GET)51 raw_request = 'ÐТÐÐÐÐÐ Ð ÐСÐÐ/1.0\r\n\r\n'52 self.assertRaises(CanNotParseRequestError, Request, raw_request)53 raw_request = 'ÐТÐÐÐÐÐ ÐЧÐÐЬ ÐÐÐÐÐÐÐ ÐÐЯ ÐÐТÐÐ ÐРЯÐÐÐ ÐÐÐЬШРТРÐÐЦ' + \54 'ÐТРСÐÐÐÐÐÐРТÐЧÐÐ ÐÐÐЬШР30 СÐÐÐÐÐÐÐ Ð ÐСÐÐ/1.0\r\n\r\n'55 self.assertRaises(NameIsTooLongError, Request, raw_request)56 def test_parse_request_delete(self):57 raw_request = 'УÐÐÐÐ Ðван ХмÑÑÑй Ð ÐСÐÐ/1.0\r\n\r\n'58 req = Request(raw_request)59 self.assertEqual(req.name, 'Ðван ХмÑÑÑй')60 self.assertEqual(req.protocol, PROTOCOL)61 self.assertEqual(req.method, RequestVerb.DELETE)62 raw_request = 'УÐÐÐÐ Ð ÐСÐÐ/1.0\r\n\r\n'63 self.assertRaises(CanNotParseRequestError, Request, raw_request)64 raw_request = 'ÐТÐÐÐÐÐ ÐЧÐÐЬ ÐÐÐÐÐÐÐ ÐÐЯ ÐÐТÐÐ ÐРЯÐÐÐ ÐÐÐЬШРТРÐÐЦ' + \65 'ÐТРСÐÐÐÐÐÐРТÐЧÐÐ ÐÐÐЬШР30 СÐÐÐÐÐÐÐ Ð ÐСÐÐ/1.0\r\n\r\n'66 self.assertRaises(NameIsTooLongError, Request, raw_request)67 def test_parse_request_write(self):68 raw_request = 'ÐÐÐÐШРÐван ХмÑÑÑй Ð ÐСÐÐ/1.0\r\n89012345678\r\n\r\n'69 req = Request(raw_request)70 self.assertEqual(req.name, 'Ðван ХмÑÑÑй')71 self.assertEqual(req.protocol, PROTOCOL)72 self.assertEqual(req.method, RequestVerb.WRITE)73 self.assertEqual(req.body, ['89012345678'])74 raw_request = 'ÐÐÐÐШРÐван ХмÑÑÑй Ð ÐСÐÐ/1.0\r\n' + \75 '89012345678 â мобилÑнÑй\r\n02 â ÑабоÑий\r\n\r\n'76 req = Request(raw_request)77 self.assertEqual(req.name, 'Ðван ХмÑÑÑй')78 self.assertEqual(req.protocol, PROTOCOL)79 self.assertEqual(req.method, RequestVerb.WRITE)80 self.assertEqual(req.body, ['89012345678 â мобилÑнÑй', '02 â ÑабоÑий'])81 raw_request = 'ÐÐÐÐШРРÐСÐÐ/1.0\r\n\r\n'82 self.assertRaises(CanNotParseRequestError, Request, raw_request)83 raw_request = 'ÐТÐÐÐÐÐ ÐЧÐÐЬ ÐÐÐÐÐÐÐ ÐÐЯ ÐÐТÐÐ ÐРЯÐÐÐ ÐÐÐЬШРТРÐÐЦ' + \84 'ÐТРСÐÐÐÐÐÐРТÐЧÐÐ ÐÐÐЬШР30 СÐÐÐÐÐÐÐ Ð ÐСÐÐ/1.0\r\n\r\n'85 self.assertRaises(NameIsTooLongError, Request, raw_request)86class TestResponse(unittest.IsolatedAsyncioTestCase):87 async def asyncSetUp(self) -> None:88 self.storage = FilePhoneBook(FOLDERPATH)89 await self.storage.write('Petr', ['79842342143'])90 events.append("asyncSetUp")91 async def test_make_get(self):92 raw_request = 'ÐТÐÐÐÐÐ Petr Ð ÐСÐÐ/1.0\r\n\r\n'93 resp = Response(raw_request)94 reg_agent_response = await ask_permission(raw_request)95 self.assertEqual(await process_permission(reg_agent_response), True)96 self.assertEqual(await resp._make_get(self.storage),97 'ÐÐÐ ÐÐÐÐЫÐС Ð ÐСÐÐ/1.0\r\n79842342143\r\n\r\n')98 raw_request = 'ÐТÐÐÐÐÐ Vitya Ivanov Ð ÐСÐÐ/1.0\r\n\r\n'99 resp = Response(raw_request)100 reg_agent_response = await ask_permission(raw_request)101 self.assertEqual(await process_permission(reg_agent_response), True)102 self.assertEqual(await resp._make_get(self.storage),103 'ÐÐÐÐШÐÐ Ð ÐСÐÐ/1.0\r\n\r\n')104 async def test_make_write(self):105 raw_request = 'ÐÐÐÐШРÐиÑÑ Ð ÐСÐÐ/1.0\r\n79846543210\r\n\r\n'106 resp = Response(raw_request)107 reg_agent_response = await ask_permission(raw_request)108 self.assertEqual(await process_permission(reg_agent_response), True)109 self.assertEqual(await resp._make_write(self.storage),110 'ÐÐÐ ÐÐÐÐЫÐС Ð ÐСÐÐ/1.0\r\n\r\n')111 self.assertEqual(await self.storage.get('ÐиÑÑ'), '79846543210')112 async def test_make_delete(self):113 raw_request = 'УÐÐÐÐ Petr Ð ÐСÐÐ/1.0\r\n\r\n'114 resp = Response(raw_request)115 reg_agent_response = await ask_permission(raw_request)116 self.assertEqual(await process_permission(reg_agent_response), True)117 self.assertEqual(await resp._make_delete(self.storage),118 'ÐÐÐ ÐÐÐÐЫÐС Ð ÐСÐÐ/1.0\r\n\r\n')119 with self.assertRaises(FileNotFoundError):120 await self.storage.get('Petr')121 raw_request = 'УÐÐÐÐ ÐикиÑа ÐиÑожков Ð ÐСÐÐ/1.0\r\n\r\n'122 resp = Response(raw_request)123 reg_agent_response = await ask_permission(raw_request)124 self.assertEqual(await process_permission(reg_agent_response), True)125 self.assertEqual(await resp._make_delete(self.storage),126 'ÐÐÐÐШÐÐ Ð ÐСÐÐ/1.0\r\n\r\n')127 def test_make_bad_request(self):128 raw_request = 'ÐÐ ÐЫÐÐ ÐÐ ÐÐÐÐÐÐÐ Ð ÐÐРЫÐÐ\r\n\r\n'129 resp = Response(raw_request)130 self.assertEqual(resp._make_bad_request(), 'ÐÐÐÐÐЯРРÐСÐÐ/1.0\r\n\r\n')131 async def test_make_response(self):132 get_request = 'ÐТÐÐÐÐÐ Petr Ð ÐСÐÐ/1.0\r\n\r\n'133 resp = Response(get_request)134 resp._make_get = AsyncMock(name='_make_get')135 resp._make_delete = AsyncMock(name='_make_delete')136 resp._make_write = AsyncMock(name='_make_write')137 resp._make_bad_request = mock.Mock(name='_make_bad_request')138 await resp.make_response(self.storage)139 write_request = 'ÐÐÐÐШРÐиÑÑ Ð ÐСÐÐ/1.0\r\n79846543210\r\n\r\n'140 resp.set_request(write_request)141 await resp.make_response(self.storage)142 del_request = 'УÐÐÐÐ Petr Ð ÐСÐÐ/1.0\r\n\r\n'143 resp.set_request(del_request)144 await resp.make_response(self.storage)145 bad_request = 'ÐÐ ÐЫÐÐ ÐÐ ÐÐÐÐÐÐÐ Ð ÐÐРЫÐÐ\r\n\r\n'146 resp.set_request(bad_request)147 await resp.make_response(self.storage)148 self.assertTrue(resp._make_get.assert_called_once)149 self.assertTrue(resp._make_write.assert_called_once)150 self.assertTrue(resp._make_delete.assert_called_once)151 self.assertTrue(resp._make_bad_request.assert_called_once)152 async def test_make_response_permission_not_granted(self):153 raw_request = 'ÐТÐÐÐÐÐ ÐÐ»Ð°Ð´Ð¸Ð¼Ð¸Ñ ÐÑÑин Ð ÐСÐÐ/1.0\r\n\r\n'154 resp = Response(raw_request)155 reg_agent_response = await ask_permission(raw_request)156 self.assertEqual(await process_permission(reg_agent_response), False)157 self.assertEqual(await resp.make_response(self.storage),158 reg_agent_response)159 raw_request = 'ÐÐÐÐШРÐÐ»Ð°Ð´Ð¸Ð¼Ð¸Ñ ÐÑÑин Ð ÐСÐÐ/1.0\r\n79846543210\r\n\r\n'160 resp = Response(raw_request)161 reg_agent_response = await ask_permission(raw_request)162 self.assertEqual(await process_permission(reg_agent_response), False)163 self.assertEqual(await resp.make_response(self.storage),164 reg_agent_response)165 with self.assertRaises(FileNotFoundError):166 await self.storage.get('ÐÐ»Ð°Ð´Ð¸Ð¼Ð¸Ñ ÐÑÑин')167 raw_request = 'УÐÐÐÐ ÐÐ»Ð°Ð´Ð¸Ð¼Ð¸Ñ ÐÑÑин Ð ÐСÐÐ/1.0\r\n\r\n'168 resp = Response(raw_request)169 reg_agent_response = await ask_permission(raw_request)170 self.assertEqual(await process_permission(reg_agent_response), False)171 self.assertEqual(await resp.make_response(self.storage),172 reg_agent_response)173 with self.assertRaises(FileNotFoundError):174 await self.storage.get('ÐÐ»Ð°Ð´Ð¸Ð¼Ð¸Ñ ÐÑÑин')175if __name__ == '__main__':...
site24x7.py
Source:site24x7.py
1import init2from logger import GCPLogger3import google.oauth2.id_token4import google.auth.transport.requests5import requests6logging_client = GCPLogger(gcp_project=init.gcp_project)7local = {8 "ticket_details" : None,9 "incident_state" : None,10 "sys_id" : None,11 "servicenow_ticket" : None12}13priority_map = {14 'TROUBLE': "3",15 'DOWN': "1",16 'UP': "4",17 'CRITICAL': "2"18}19site24x7_camel_case_map = {20 "availability": "Availability",21 "pagereads": "PageReads",22 "cpuutilization": "CPUUtilization",23 "cpuusedpercent": "CPUUsedPercent",24 "memusedpercent": "MemUsedPercent",25 "memoryutilization": "MemoryUtilization",26 "usedmemory": "UsedMemory",27 "dusedpercent": "DUsedPercent",28 "pagewrites": "PageWrites",29 "pageexpectency": "PageExpectency"30}31def getIncidentDetails () :32 raw_request = init.webhook_payload33 attributeName = "NA"34 incidentReason = raw_request['INCIDENT_REASON']35 if('STATUS_CHANGE_ATTRIBUTES' in raw_request):36 if('reason' in raw_request['STATUS_CHANGE_ATTRIBUTES'][0]):37 incidentReason = raw_request['STATUS_CHANGE_ATTRIBUTES'][0]['reason']38 if('attributeName' in raw_request['STATUS_CHANGE_ATTRIBUTES'][0]):39 attributeName = raw_request['STATUS_CHANGE_ATTRIBUTES'][0]['attributeName']40 41 if attributeName.lower() in site24x7_camel_case_map :42 attributeName = site24x7_camel_case_map[attributeName.lower()]43 failedAttributes = "NA"44 if(raw_request['STATUS'] == "DOWN"):45 if('FAILED_ATTRIBUTES' in raw_request):46 failedAttributes = str(raw_request['FAILED_ATTRIBUTES'][0])47 ticket_content = f'''Customer Name - {raw_request.get('MSP_CUSTOMER_NAME','NA')}48CI Name - {raw_request.get('MONITORNAME','NA')}49CI Type - {raw_request.get('MONITORTYPE','NA')}50Status - {raw_request.get('STATUS','NA')}51Failed Attribute - {failedAttributes}52Incident Reason - {incidentReason}53Incident Time - {raw_request.get('INCIDENT_TIME','NA')}54Tool - Site24x755'''56 # <Customer> | <Project> | <ComponentType> | <Component> | <Metric> 57 # <Customer> | <Project> | <ComponentType> | <Component> | <Metric> 58 # <Customer> | <ComponentType> | <Component> | <Metric>59 ticket_subject = raw_request.get('MSP_CUSTOMER_NAME','NA') + " | " + raw_request.get('MONITORTYPE','NA') + " | " + raw_request.get('MONITORNAME','NA') + " | " + attributeName60 61 local["ticket_details"] = {62 "ticket_subject": ticket_subject,63 "ticket_content": ticket_content,64 "servicenow_attribs": {65 "short_description": ticket_subject,66 "priority": priority_map[raw_request.get('STATUS','TROUBLE')],67 "caller_id": "Site24x7",68 "cmdb_ci": raw_request.get('MONITORNAME','NA'),69 "description": ticket_content,70 "assignment_group": 'Cloud Support',71 "contact_type": 'alert',72 "u_monitoring_tool": 'Site24x7'73 # "u_account_name": self.raw_request['MSP_CUSTOMER_NAME'],74 },75 "ticket_type": "INCIDENT"76 }77 temp_dict = {78 "tool": "site24x7",79 "source": "cloud function | incident-processing-function",80 "message": f"incident received | {ticket_subject}",81 "ticket_details": raw_request82 }83 logging_client.write_entry(logger_name=init.log_name, log_struct=temp_dict, severity="INFO")84 return 085def getIncidentState () :86 raw_request = init.webhook_payload87 auth_req = google.auth.transport.requests.Request()88 url = init.servicenow_client_url89 id_token = google.oauth2.id_token.fetch_id_token(auth_req, url)90 headers = {91 "Authorization": f"Bearer {id_token}", 92 "Content-Type": "application/json"93 }94 request_body = {95 "api_path": "api/now/table/incident",96 "request_headers": {97 "Content-Type": "application/json", "Accept": "application/json"98 },99 "query_parameters": {100 "sysparm_query": f'active=true^stateNOT IN6,7,8^short_description={local["ticket_details"]["ticket_subject"]}',101 "sysparm_limit": "1"102 },103 "request_body": {},104 "ticket_type": "INCIDENT",105 "method": "QUERY"106 }107 response = requests.post(url=url, headers=headers, json=request_body)108 if response.status_code == 200 :109 if "result" in response.json() :110 if response.json()["result"] is not None :111 if len(response.json()["result"]) != 0 :112 local["servicenow_ticket"] = response.json()["result"][0]113 local["incident_state"] = 1114 local["sys_id"] = response.json()["result"][0]["sys_id"]115 return 0116 local["incident_state"] = 0117 return 0118 else :119 temp_dict = {120 "tool": "site24x7",121 "message": "failed to get incident state",122 "source": "cloud function | incident-processing-function",123 "response_code": response.status_code,124 "response_headers": dict(response.headers),125 "response_body": response.text,126 "ticket_details": raw_request127 }128 logging_client.write_entry(logger_name=init.log_name, log_struct=temp_dict, severity="ERROR")129 return 1130def actOnIncident () :131 raw_request = init.webhook_payload132 if local["incident_state"] == 0 :133 auth_req = google.auth.transport.requests.Request()134 url = init.servicenow_client_url135 id_token = google.oauth2.id_token.fetch_id_token(auth_req, url)136 headers = {137 "Authorization": "Bearer {}".format(id_token), 138 "Content-Type": "application/json"139 }140 path_params = {141 "tableName": "incident"142 }143 api_path = "api/now/table/{tableName}".format(**path_params)144 request_body = {145 "api_path": api_path,146 "request_headers": {147 "Content-Type": "application/json", "Accept": "application/json"148 },149 "query_parameters": None,150 "request_body": local["ticket_details"]["servicenow_attribs"],151 "ticket_type": "INCIDENT",152 "method": "POST"153 }154 response = requests.post(url=url, headers=headers, json=request_body)155 if response.status_code == 200 :156 temp_dict = {157 "tool": "site24x7",158 "source": "cloud function | incident-processing-function",159 "message": f"incident posted | {local['ticket_details']['servicenow_attribs']['short_description']}",160 "ticket_details": raw_request161 }162 logging_client.write_entry(logger_name=init.log_name, log_struct=temp_dict, severity="INFO")163 return 0164 else :165 temp_dict = {166 "tool": "site24x7",167 "message": "failed to post incident",168 "source": "cloud function | incident-processing-function",169 "response_code": response.status_code,170 "response_headers": dict(response.headers),171 "response_body": response.text,172 "ticket_details": raw_request173 }174 logging_client.write_entry(logger_name=init.log_name, log_struct=temp_dict, severity="ERROR")175 return 1176 177 if local['incident_state'] == 1 :178 auth_req = google.auth.transport.requests.Request()179 url = init.servicenow_client_url180 id_token = google.oauth2.id_token.fetch_id_token(auth_req, url)181 headers = {182 "Authorization": "Bearer {}".format(id_token), 183 "Content-Type": "application/json"184 }185 path_params = {186 "tableName": "incident",187 "sys_id": local['sys_id']188 }189 api_path = "api/now/table/{tableName}/{sys_id}".format(**path_params)190 # temp_dict = self.ticket_details["servicenow_attribs"]191 temp_dict = {}192 temp_dict["work_notes"] = local['ticket_details']["ticket_content"]193 # temp_dict.pop('priority', None)194 request_body = {195 "api_path": api_path,196 "request_headers": {197 "Content-Type": "application/json", "Accept": "application/json"198 },199 "query_parameters": None,200 "request_body": temp_dict,201 "ticket_type": "INCIDENT",202 "method": "PUT"203 }204 response = requests.post(url=url, headers=headers, json=request_body)205 if response.status_code == 200 :206 temp_dict = {207 "tool": "site24x7",208 "source": "cloud function | incident-processing-function",209 "message": f"incident updated | {local['ticket_details']['servicenow_attribs']['short_description']}",210 "ticket_details": raw_request211 }212 logging_client.write_entry(logger_name=init.log_name, log_struct=temp_dict, severity="INFO")213 return 0214 else :215 temp_dict = {216 "tool": "site24x7",217 "message": "failed to update incident",218 "source": "cloud function | incident-processing-function",219 "response_code": response.status_code,220 "response_headers": dict(response.headers),221 "response_body": response.text,222 "ticket_details": raw_request223 }224 logging_client.write_entry(logger_name=init.log_name, log_struct=temp_dict, severity="ERROR")...
client.py
Source:client.py
1import json2try:3 import requests4 from requests.exceptions import RequestException5 from requests.packages.urllib3.exceptions import InsecureRequestWarning6except ImportError: # pragma: no cover7 pass8from . import exceptions, utils9from .models import (10 Epics,11 History,12 IssueAttachments,13 IssueAttributes,14 Issues,15 IssueStatuses,16 IssueTypes,17 Milestones,18 Points,19 Priorities,20 Projects,21 Roles,22 Severities,23 TaskAttachments,24 TaskAttributes,25 Tasks,26 TaskStatuses,27 Users,28 UserStories,29 UserStoryAttachments,30 UserStoryAttributes,31 UserStoryStatuses,32 Webhooks,33 WikiLinks,34 WikiPages,35)36from .requestmaker import RequestMaker37class SearchResult:38 count = 039 tasks = []40 issues = []41 user_stories = []42 wiki_pages = []43class TaigaAPI:44 """45 TaigaAPI class46 :param host: the host of your Taiga.io instance47 :param token: the token you may provide48 :param token_type: the token type49 :param tls_verify: verify server certificate50 :param auth_type: authentication type identifier51 """52 def __init__(53 self, host="https://api.taiga.io", token=None, token_type="Bearer", tls_verify=True, auth_type="normal"54 ):55 self.host = host56 self.token = token57 self.token_type = token_type58 self.tls_verify = tls_verify59 self.auth_type = auth_type60 if not self.tls_verify:61 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)62 if token:63 self.raw_request = RequestMaker("/api/v1", self.host, self.token, self.token_type, self.tls_verify)64 self._init_resources()65 def _init_resources(self):66 self.projects = Projects(self.raw_request)67 self.user_stories = UserStories(self.raw_request)68 self.user_story_attachments = UserStoryAttachments(self.raw_request)69 self.users = Users(self.raw_request)70 self.issues = Issues(self.raw_request)71 self.issue_attachments = IssueAttachments(self.raw_request)72 self.tasks = Tasks(self.raw_request)73 self.task_attachments = TaskAttachments(self.raw_request)74 self.milestones = Milestones(self.raw_request)75 self.severities = Severities(self.raw_request)76 self.roles = Roles(self.raw_request)77 self.points = Points(self.raw_request)78 self.issue_statuses = IssueStatuses(self.raw_request)79 self.issue_types = IssueTypes(self.raw_request)80 self.issue_attributes = IssueAttributes(self.raw_request)81 self.task_attributes = TaskAttributes(self.raw_request)82 self.user_story_attributes = UserStoryAttributes(self.raw_request)83 self.task_statuses = TaskStatuses(self.raw_request)84 self.priorities = Priorities(self.raw_request)85 self.user_story_statuses = UserStoryStatuses(self.raw_request)86 self.wikipages = WikiPages(self.raw_request)87 self.wikilinks = WikiLinks(self.raw_request)88 self.history = History(self.raw_request)89 self.webhooks = Webhooks(self.raw_request)90 self.epics = Epics(self.raw_request)91 def me(self):92 """93 Get a :class:`taiga.models.models.User` representing me94 """95 return self.users.get("me")96 def search(self, project, text=""):97 """98 Search in your Taiga.io instance99 :param project: the project id100 :param text: the query of your search101 """102 result = self.raw_request.get("search", query={"project": project, "text": text})103 result = result.json()104 search_result = SearchResult()105 search_result.tasks = self.tasks.parse_list(result["tasks"])106 search_result.issues = self.issues.parse_list(result["issues"])107 search_result.user_stories = self.user_stories.parse_list(result["userstories"])108 search_result.wikipages = self.wikipages.parse_list(result["wikipages"])109 search_result.epics = self.epics.parse_list(result["epics"])110 return search_result111 def auth(self, username, password):112 """113 Authenticate you114 :param username: your username115 :param password: your password116 """117 headers = {"Content-type": "application/json"}118 payload = {"type": self.auth_type, "username": username, "password": password}119 try:120 full_url = utils.urljoin(self.host, "/api/v1/auth")121 response = requests.post(full_url, data=json.dumps(payload), headers=headers, verify=self.tls_verify)122 except RequestException:123 raise exceptions.TaigaRestException(full_url, 400, "NETWORK ERROR", "POST")124 if response.status_code != 200:125 raise exceptions.TaigaRestException(full_url, response.status_code, response.text, "POST")126 self.token = response.json()["auth_token"]127 self.raw_request = RequestMaker("/api/v1", self.host, self.token, "Bearer", self.tls_verify)128 self._init_resources()129 def auth_app(self, app_id, app_secret, auth_code, state=""):130 """131 Authenticate an app132 :param app_id: the app id133 :param app_secret: the app secret134 :param auth_code: the app auth code135 """136 headers = {"Content-type": "application/json"}137 payload = {"application": app_id, "auth_code": auth_code, "state": state}138 try:139 full_url = utils.urljoin(self.host, "/api/v1/application-tokens/validate")140 response = requests.post(full_url, data=json.dumps(payload), headers=headers, verify=self.tls_verify)141 except RequestException:142 raise exceptions.TaigaRestException(full_url, 400, "NETWORK ERROR", "POST")143 if response.status_code != 200:144 raise exceptions.TaigaRestException(full_url, response.status_code, response.text, "POST")145 cyphered_token = response.json().get("cyphered_token", "")146 if cyphered_token:147 from jwkest.jwe import JWE148 from jwkest.jwk import SYMKey149 sym_key = SYMKey(key=app_secret, alg="A128KW")150 data, success = JWE().decrypt(cyphered_token, keys=[sym_key]), True151 if isinstance(data, tuple):152 data, success = data153 try:154 self.token = json.loads(data.decode("utf-8")).get("token", None)155 except ValueError: # pragma: no cover156 self.token = None157 if not success:158 self.token = None159 else:160 self.token = None161 if self.token is None:162 raise exceptions.TaigaRestException(full_url, 400, "INVALID TOKEN", "POST")163 self.raw_request = RequestMaker("/api/v1", self.host, self.token, "Application", self.tls_verify)...
Request.py
Source:Request.py
1from abc import ABCMeta, abstractmethod2from UVMPMException import InvalidRequestSyntax3from Client import Client4class Request:5 __metaclass__ = ABCMeta6 def __init__(self, client: Client, raw_request: str):7 self.client = client8 self.raw_request = raw_request9 @staticmethod10 @abstractmethod11 def is_of_type(to_match: str):12 pass13class Handshake(Request):14 def __init__(self, client: Client, raw_request: str):15 super().__init__(client, raw_request)16 @staticmethod17 def is_of_type(to_match: str):18 return to_match == "HELLO"19class Authentication(Request):20 def __init__(self, client: Client, raw_request: str):21 super().__init__(client, raw_request)22 split = self.raw_request.split(":")23 if len(split) != 3:24 raise InvalidRequestSyntax(self.raw_request)25 self.username = split[1]26 self.password = split[2]27 @staticmethod28 def is_of_type(to_match: str):29 return to_match.startswith("AUTH:")30class ListUsers(Request):31 def __init__(self, client: Client, raw_request: str):32 super().__init__(client, raw_request)33 @staticmethod34 def is_of_type(to_match: str):35 return to_match == "LIST"36class SendMessage(Request):37 def __init__(self, client: Client, raw_request: str):38 super().__init__(client, raw_request)39 split = self.raw_request.split(":")40 if len(split) != 3:41 raise InvalidRequestSyntax(self.raw_request)42 self.receiving_username = split[1]43 self.message = split[2]44 @staticmethod45 def is_of_type(to_match: str):46 return to_match.startswith("To:")47class Logout(Request):48 def __init__(self, client: Client, raw_request: str):49 super().__init__(client, raw_request)50 @staticmethod51 def is_of_type(to_match: str):52 return to_match == "BYE"53class Unknown(Request):54 def __init__(self, client: Client, raw_request: str):55 super().__init__(client, raw_request)56 @staticmethod57 def is_of_type(to_match: str):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!