How to use create_endpoint method in localstack

Best Python code snippet using localstack_python

gui.py

Source:gui.py Github

copy

Full Screen

1#!/usr/bin/python32#-*- encoding: Utf-8 -*-3from PyQt5.QtWidgets import QApplication, QListWidgetItem, QDesktopWidget, QFileDialog, QInputDialog, QProgressDialog, QMessageBox, QFileSystemModel, QHeaderView4from PyQt5.QtCore import Qt, QUrl, pyqtSignal, QThread5from PyQt5.QtGui import QDesktopServices, QTextOption6from PyQt5.uic import loadUi7from signal import signal, SIGINT, SIG_DFL8from os.path import dirname, realpath9from collections import defaultdict10from urllib.parse import urlparse11from os import listdir, remove12from functools import partial13from json import load, dump14from binascii import crc3215from pathlib import Path16from sys import argv17from utils.common import extractors, transports, BASE_PATH, assert_installed, extractor_save, insert_endpoint, load_proto_msgs18from views.fuzzer import ProtobufItem, ProtocolDataItem19from utils.transports import *20from extractors import *21"""22 This script runs the main code for the PBTK GUI, and essentially23 links loaded QtDesigner *.uis within themselves using signals, and24 with the other parts of code composing PBTK. The order of methods25 more or less follows the action flow of a graphical user.26"""27class PBTKGUI(QApplication):28 def __init__(self):29 super().__init__(argv)30 signal(SIGINT, SIG_DFL)31 32 views = dirname(realpath(__file__)) + '/views/'33 34 self.welcome = loadUi(views + 'welcome.ui')35 self.choose_extractor = loadUi(views + 'choose_extractor.ui')36 self.choose_proto = loadUi(views + 'choose_proto.ui')37 self.create_endpoint = loadUi(views + 'create_endpoint.ui')38 self.choose_endpoint = loadUi(views + 'choose_endpoint.ui')39 self.fuzzer = loadUi(views + 'fuzzer.ui')40 self.welcome.step1.clicked.connect(self.load_extractors)41 self.choose_extractor.rejected.connect(partial(self.set_view, self.welcome))42 self.choose_extractor.extractors.itemClicked.connect(self.prompt_extractor)43 44 self.welcome.step2.clicked.connect(self.load_protos)45 self.proto_fs = QFileSystemModel()46 self.choose_proto.protos.setModel(self.proto_fs)47 self.proto_fs.directoryLoaded.connect(self.choose_proto.protos.expandAll)48 49 for i in range(1, self.proto_fs.columnCount()):50 self.choose_proto.protos.hideColumn(i)51 self.choose_proto.protos.setRootIndex(self.proto_fs.index(str(BASE_PATH / 'protos')))52 self.choose_proto.rejected.connect(partial(self.set_view, self.welcome))53 self.choose_proto.protos.clicked.connect(self.new_endpoint)54 55 self.create_endpoint.transports.itemClicked.connect(self.pick_transport)56 self.create_endpoint.loadRespPbBtn.clicked.connect(self.load_another_pb)57 self.create_endpoint.rejected.connect(partial(self.set_view, self.choose_proto))58 self.create_endpoint.buttonBox.accepted.connect(self.write_endpoint)59 60 self.welcome.step3.clicked.connect(self.load_endpoints)61 self.choose_endpoint.rejected.connect(partial(self.set_view, self.welcome))62 self.choose_endpoint.endpoints.itemClicked.connect(self.launch_fuzzer)63 64 self.fuzzer.rejected.connect(partial(self.set_view, self.choose_endpoint))65 self.fuzzer.fuzzFields.clicked.connect(self.fuzz_endpoint)66 self.fuzzer.deleteThis.clicked.connect(self.delete_endpoint)67 self.fuzzer.comboBox.activated.connect(self.launch_fuzzer)68 self.fuzzer.getAdd.clicked.connect(self.add_tab_data)69 self.fuzzer.urlField.setWordWrapMode(QTextOption.WrapAnywhere)70 71 for tree in (self.fuzzer.pbTree, self.fuzzer.getTree):72 tree.itemEntered.connect(lambda item, _: item.edit() if hasattr(item, 'edit') else None)73 tree.itemClicked.connect(lambda item, col: item.update_check(col=col))74 tree.itemExpanded.connect(lambda item: item.expanded() if hasattr(item, 'expanded') else None)75 tree.header().setSectionResizeMode(QHeaderView.ResizeToContents)76 77 self.welcome.mydirLabel.setText(self.welcome.mydirLabel.text() % BASE_PATH)78 self.welcome.mydirBtn.clicked.connect(partial(QDesktopServices.openUrl, QUrl.fromLocalFile(str(BASE_PATH))))79 80 self.set_view(self.welcome)81 self.exec_()82 83 """84 Step 1 - Extract .proto structures from apps85 """86 87 def load_extractors(self):88 self.choose_extractor.extractors.clear()89 90 for name, meta in extractors.items():91 item = QListWidgetItem(meta['desc'], self.choose_extractor.extractors)92 item.setData(Qt.UserRole, name)93 94 self.set_view(self.choose_extractor)95 96 def prompt_extractor(self, item):97 extractor = extractors[item.data(Qt.UserRole)]98 inputs = []99 if not assert_installed(self.view, **extractor.get('depends', {})):100 return101 102 if not extractor.get('pick_url', False):103 files, mime = QFileDialog.getOpenFileNames()104 for path in files:105 inputs.append((path, Path(path).stem))106 else:107 text, good = QInputDialog.getText(self.view, ' ', 'Input an URL:')108 if text:109 url = urlparse(text)110 inputs.append((url.geturl(), url.netloc))111 112 if inputs:113 wait = QProgressDialog('Extracting .proto structures...', None, 0, 0)114 wait.setWindowTitle(' ')115 self.set_view(wait)116 117 self.worker = Worker(inputs, extractor)118 self.worker.progress.connect(self.extraction_progress)119 self.worker.finished.connect(self.extraction_done)120 self.worker.start()121 122 def extraction_progress(self, info, progress):123 self.view.setLabelText(info)124 125 if progress is not None:126 self.view.setRange(0, 100)127 self.view.setValue(progress * 100)128 else:129 self.view.setRange(0, 0)130 131 def extraction_done(self, outputs):132 nb_written_all, wrote_endpoints = 0, False133 134 for folder, output in outputs.items():135 nb_written, wrote_endpoints = extractor_save(BASE_PATH, folder, output)136 nb_written_all += nb_written137 138 if wrote_endpoints:139 self.set_view(self.welcome)140 QMessageBox.information(self.view, ' ', '%d endpoints and their <i>.proto</i> structures have been extracted! You can now reuse the <i>.proto</i>s or fuzz the endpoints.' % nb_written_all)141 142 elif nb_written_all:143 self.set_view(self.welcome)144 QMessageBox.information(self.view, ' ', '%d <i>.proto</i> structures have been extracted! You can now reuse the <i>.protos</i> or define endpoints for them to fuzz.' % nb_written_all)145 146 else:147 self.set_view(self.choose_extractor)148 QMessageBox.warning(self.view, ' ', 'This extractor did not find Protobuf structures in the corresponding format for specified files.')149 150 """151 Step 2 - Link .protos to endpoints152 """153 154 # Don't load .protos from the filesystem until asked to, in order155 # not to slow down startup.156 157 def load_protos(self):158 self.proto_fs.setRootPath(str(BASE_PATH / 'protos'))159 self.set_view(self.choose_proto)160 161 def new_endpoint(self, path):162 if not self.proto_fs.isDir(path):163 path = self.proto_fs.filePath(path)164 165 if not getattr(self, 'only_resp_combo', False):166 self.create_endpoint.pbRequestCombo.clear()167 self.create_endpoint.pbRespCombo.clear()168 169 has_msgs = False170 for name, cls in load_proto_msgs(path):171 has_msgs = True172 if not getattr(self, 'only_resp_combo', False):173 self.create_endpoint.pbRequestCombo.addItem(name, (path, name))174 self.create_endpoint.pbRespCombo.addItem(name, (path, name))175 if not has_msgs:176 QMessageBox.warning(self.view, ' ', 'There is no message defined in this .proto.')177 return178 179 self.create_endpoint.reqDataSubform.hide()180 if not getattr(self, 'only_resp_combo', False):181 self.create_endpoint.endpointUrl.clear()182 self.create_endpoint.transports.clear()183 self.create_endpoint.sampleData.clear()184 self.create_endpoint.pbParamKey.clear()185 self.create_endpoint.parsePbCheckbox.setChecked(False)186 187 for name, meta in transports.items():188 item = QListWidgetItem(meta['desc'], self.create_endpoint.transports)189 item.setData(Qt.UserRole, (name, meta.get('ui_data_form')))190 191 elif getattr(self, 'saved_transport_choice'):192 self.create_endpoint.transports.setCurrentItem(self.saved_transport_choice)193 self.pick_transport(self.saved_transport_choice)194 self.saved_transport_choice = None195 196 self.only_resp_combo = False197 self.set_view(self.create_endpoint)198 199 def pick_transport(self, item):200 name, desc = item.data(Qt.UserRole)201 self.has_pb_param = desc and 'regular' in desc202 self.create_endpoint.reqDataSubform.show()203 if self.has_pb_param:204 self.create_endpoint.pbParamSubform.show()205 else:206 self.create_endpoint.pbParamSubform.hide()207 self.create_endpoint.sampleDataLabel.setText('Sample request data, one per line (in the form of %s):' % desc)208 209 def load_another_pb(self):210 self.only_resp_combo = True211 self.saved_transport_choice = self.create_endpoint.transports.currentItem()212 self.set_view(self.choose_proto)213 214 def write_endpoint(self):215 request_pb = self.create_endpoint.pbRequestCombo.itemData(self.create_endpoint.pbRequestCombo.currentIndex())216 url = self.create_endpoint.endpointUrl.text()217 transport = self.create_endpoint.transports.currentItem()218 sample_data = self.create_endpoint.sampleData.toPlainText()219 pb_param = self.create_endpoint.pbParamKey.text()220 has_resp_pb = self.create_endpoint.parsePbCheckbox.isChecked()221 resp_pb = self.create_endpoint.pbRespCombo.itemData(self.create_endpoint.pbRespCombo.currentIndex())222 223 if not (request_pb and urlparse(url).netloc and transport and (not self.has_pb_param or pb_param) and (not has_resp_pb or resp_pb)):224 QMessageBox.warning(self.view, ' ', 'Please fill all relevant information fields.')225 226 else:227 json = {228 'request': {229 'transport': transport.data(Qt.UserRole)[0],230 'proto_path': request_pb[0].replace(str(BASE_PATH / 'protos'), '').strip('/\\'),231 'proto_msg': request_pb[1],232 'url': url233 }234 }235 if self.has_pb_param:236 json['request']['pb_param'] = pb_param237 238 sample_data = list(filter(None, sample_data.split('\n')))239 if sample_data:240 transport_obj = transports[transport.data(Qt.UserRole)[0]]241 transport_obj = transport_obj['func'](pb_param, url)242 243 for sample_id, sample in enumerate(sample_data):244 try:245 sample = transport_obj.serialize_sample(sample)246 except Exception:247 return QMessageBox.warning(self.view, ' ', 'Some of your sample data is not in the specified format.')248 if not sample:249 return QMessageBox.warning(self.view, ' ', "Some of your sample data didn't contain the Protobuf parameter key you specified.")250 sample_data[sample_id] = sample251 252 json['request']['samples'] = sample_data253 254 if has_resp_pb:255 json['response'] = {256 'format': 'raw_pb',257 'proto_path': resp_pb[0].replace(str(BASE_PATH / 'protos'), '').strip('/\\'),258 'proto_msg': resp_pb[1]259 }260 insert_endpoint(BASE_PATH / 'endpoints', json)261 262 QMessageBox.information(self.view, ' ', 'Endpoint created successfully.')263 self.set_view(self.welcome)264 265 """266 Step 3: Fuzz and test endpoints live267 """268 269 def load_endpoints(self):270 self.choose_endpoint.endpoints.clear()271 272 for name in listdir(str(BASE_PATH / 'endpoints')):273 if name.endswith('.json'):274 item = QListWidgetItem(name.split('.json')[0], self.choose_endpoint.endpoints)275 item.setFlags(item.flags() & ~Qt.ItemIsEnabled)276 277 pb_msg_to_endpoints = defaultdict(list)278 with open(str(BASE_PATH / 'endpoints' / name)) as fd:279 for endpoint in load(fd, object_pairs_hook=OrderedDict):280 pb_msg_to_endpoints[endpoint['request']['proto_msg'].split('.')[-1]].append(endpoint)281 282 for pb_msg, endpoints in pb_msg_to_endpoints.items():283 item = QListWidgetItem(' ' * 4 + pb_msg, self.choose_endpoint.endpoints)284 item.setFlags(item.flags() & ~Qt.ItemIsEnabled)285 286 for endpoint in endpoints:287 path_and_qs = '/' + endpoint['request']['url'].split('/', 3).pop()288 item = QListWidgetItem(' ' * 8 + path_and_qs, self.choose_endpoint.endpoints)289 item.setData(Qt.UserRole, endpoint)290 291 self.set_view(self.choose_endpoint)292 293 def launch_fuzzer(self, item):294 if type(item) == int:295 data, sample_id = self.fuzzer.comboBox.itemData(item)296 else:297 data, sample_id = item.data(Qt.UserRole), 0298 299 if data:300 self.current_req_proto = BASE_PATH / 'protos' / data['request']['proto_path']301 302 self.pb_request = load_proto_msgs(self.current_req_proto)303 self.pb_request = dict(self.pb_request)[data['request']['proto_msg']]()304 305 if data.get('response') and data['response']['format'] == 'raw_pb':306 self.pb_resp = load_proto_msgs(BASE_PATH / 'protos' / data['response']['proto_path'])307 self.pb_resp = dict(self.pb_resp)[data['response']['proto_msg']]308 self.pb_param = data['request'].get('pb_param')309 self.base_url = data['request']['url']310 self.endpoint = data311 312 self.transport_meta = transports[data['request']['transport']]313 self.transport = self.transport_meta['func'](self.pb_param, self.base_url)314 315 sample = ''316 if data['request'].get('samples'):317 sample = data['request']['samples'][sample_id]318 self.get_params = self.transport.load_sample(sample, self.pb_request)319 320 # Get initial data into the Protobuf tree view321 self.fuzzer.pbTree.clear()322 323 self.ds_items = defaultdict(dict)324 self.ds_full_names = {}325 326 for ds in self.pb_request.DESCRIPTOR.fields:327 ProtobufItem(self.fuzzer.pbTree, ds, self, [ds.full_name])328 self.parse_fields(self.pb_request)329 330 # Do the same for transport-specific data331 self.fuzzer.getTree.clear()332 self.fuzzer.tabs.setTabText(1, self.transport_meta.get('ui_tab', ''))333 if self.get_params:334 for key, val in self.get_params.items():335 ProtocolDataItem(self.fuzzer.getTree, key, val, self)336 337 # Fill the request samples combo box if we're loading a new338 # endpoint.339 if type(item) != int:340 if len(data['request'].get('samples', [])) > 1:341 self.fuzzer.comboBox.clear()342 for sample_id, sample in enumerate(data['request']['samples']):343 self.fuzzer.comboBox.addItem(sample[self.pb_param] if self.pb_param else str(sample), (data, sample_id))344 self.fuzzer.comboBoxLabel.show()345 self.fuzzer.comboBox.show()346 else:347 self.fuzzer.comboBoxLabel.hide()348 self.fuzzer.comboBox.hide()349 350 self.set_view(self.fuzzer)351 352 self.fuzzer.frame.setUrl(QUrl("about:blank"))353 self.update_fuzzer()354 """355 Parsing and rendering the Protobuf message to a tree view:356 Every Protobuf field is fed to ProtobufItem (a class inheriting357 QTreeWidgetItem), and the created object is saved in the ds_items358 entry for the corresponding descriptor.359 """360 361 def parse_fields(self, msg, base_path=[]):362 for ds, val in msg.ListFields():363 path = base_path + [ds.full_name]364 365 if ds.label == ds.LABEL_REPEATED:366 for val_index, val_value in enumerate(val):367 if ds.cpp_type == ds.CPPTYPE_MESSAGE:368 self.ds_items[id(ds)][tuple(path)].setExpanded(True)369 self.ds_items[id(ds)][tuple(path)].setDefault(parent=msg, msg=val, index=val_index)370 self.parse_fields(val_value, path)371 372 else:373 self.ds_items[id(ds)][tuple(path)].setDefault(val_value, parent=msg, msg=val, index=val_index)374 375 self.ds_items[id(ds)][tuple(path)].duplicate(True)376 377 else:378 if ds.cpp_type == ds.CPPTYPE_MESSAGE:379 self.ds_items[id(ds)][tuple(path)].setExpanded(True)380 self.ds_items[id(ds)][tuple(path)].setDefault(parent=msg, msg=val)381 self.parse_fields(val, path)382 383 else:384 self.ds_items[id(ds)][tuple(path)].setDefault(val, parent=msg, msg=val)385 386 def update_fuzzer(self):387 resp = self.transport.perform_request(self.pb_request, self.get_params)388 389 data, text, url, mime = resp.content, resp.text, resp.url, resp.headers['Content-Type'].split(';')[0]390 391 meta = '%s %d %08x\n%s' % (mime, len(data), crc32(data) & 0xffffffff, resp.url)392 self.fuzzer.urlField.setText(meta)393 394 self.fuzzer.frame.update_frame(data, text, url, mime, getattr(self, 'pb_resp', None))395 396 def fuzz_endpoint(self):397 QMessageBox.information(self.view, ' ', 'Automatic fuzzing is not implemented yet.')398 399 def delete_endpoint(self):400 if QMessageBox.question(self.view, ' ', 'Delete this endpoint?') == QMessageBox.Yes:401 path = str(BASE_PATH / 'endpoints' / (urlparse(self.base_url).netloc + '.json'))402 403 with open(path) as fd:404 json = load(fd, object_pairs_hook=OrderedDict)405 json.remove(self.endpoint)406 407 with open(path, 'w') as fd:408 dump(json, fd, ensure_ascii=False, indent=4)409 if not json:410 remove(path)411 412 self.load_endpoints()413 414 def add_tab_data(self):415 text, good = QInputDialog.getText(self.view, ' ', 'Field name:')416 if text:417 ProtocolDataItem(self.fuzzer.getTree, text, '', self).edit()418 419 """420 Utility methods follow421 """422 423 def set_view(self, view):424 if hasattr(self, 'view'):425 self.view.hide()426 view.show()427 self.view = view428 429 resolution = QDesktopWidget().screenGeometry()430 view.move((resolution.width() / 2) - (view.frameSize().width() / 2),431 (resolution.height() / 2) - (view.frameSize().height() / 2))432"""433 Simple wrapper for running extractors in background.434"""435class Worker(QThread):436 finished = pyqtSignal(object)437 progress = pyqtSignal(object, object)438 def __init__(self, inputs, extractor):439 super().__init__()440 self.inputs = inputs441 self.extractor = extractor442 443 def run(self):444 output = defaultdict(list)445 for input_, folder in self.inputs:446 # Extractor is runned here447 for name, contents in self.extractor['func'](input_):448 if name == '_progress':449 self.progress.emit(*contents)450 else:451 output[folder].append((name, contents))452 453 self.finished.emit(output)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful