How to use raw_request_data method in autotest

Best Python code snippet using autotest_python

hunter_handler.py

Source: hunter_handler.py Github

copy

Full Screen

1#!/​ usr/​bin/​env2# coding=utf-83#4# Copyright 2019 ztosec & https:/​/​sec.zto.com/​5#6# Licensed under the Apache License, Version 2.0 (the "License"); you may7# not use this file except in compliance with the License. You may obtain8# a copy of the License at9#10# http:/​/​www.apache.org/​licenses/​LICENSE-2.011#12# Unless required by applicable law or agreed to in writing, software13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the15# License for the specific language governing permissions and limitations16# under the License.17"""18author: b5mali419"""20import os21import re22import base6423import json24import sys25import tornado.httpserver26import tornado.ioloop27import tornado.iostream28import tornado.web29import tornado.curl_httpclient30try:31 from common import log32except (ModuleNotFoundError, ImportError):33 HUNTER_PATH = "{}/​../​".format(os.path.dirname(os.path.abspath(__file__)))34 sys.path.insert(0, HUNTER_PATH)35finally:36 from common import log37 from hunter_celery import scan_celery38 from networkproxy import CACERT_FILE39 from networkproxy import CERTKEY_FILE40 from networkproxy import CAKEY_FILE41 from networkproxy.socket_wrapper import wrap_socket42 from model.network_proxy import NetWorkProxyConfig, NetWorkProxyConfigService43 from networkproxy.authentication import auth_login44 from api.service.redis_service import RedisService45 from model.default_value import TaskStatus46 from common.path import HUNTER_PATH47 from common.http_util import StatusCode48 from networkproxy.proxy_handler import ProxyHandler49 from networkproxy import get_http_server50 from networkproxy import set_http_server51logger = log.get_default_logger()52class HunterHandler(ProxyHandler):53 def request_handler(self, request, user_info):54 """55 将请求发送到MQ中56 :param request: 57 :return: 58 Simple example code:59 print(request.body_arguments)60 print(request.headers)61 print(request.body)62 print(request.cookies)63 print(request.version)64 print(request.protocol)65 print(request.host_name)66 print(request.uri)67 print(request.method)68 """69 if not user_info:70 return71 task_id = user_info.current_task_id72 current_user_name = user_info.user_name73 raw_request_data = self.wrap_request(request, user_info)74 # 是否为满足条件的请求75 current_task = RedisService.get_task(task_id)76 if current_task and "hook_rule" in current_task:77 # *.xx.com78 hook_rule = str(current_task.hook_rule).replace("*", ".*")79 if not str(raw_request_data["data"]["url"]).startswith(hook_rule) and re.match(r'' + hook_rule,80 raw_request_data["data"][81 "url"], re.S) is None:82 return83 if RedisService.create_urlclassifications(task_id, raw_request_data):84 logger.info("满足正则条件,发送流量到MQ中")85 scan_celery.delay(raw_request_data["data"], task_id, current_user_name, TaskStatus.NONE)86 def wrap_request(self, request, user_info):87 """88 转换请求数据格式89 :param request: 90 :return: 91 """92 from parser.base_traffic_parser import BaseTrafficParser93 raw_request_data = dict()94 url = request.uri95 if url is None or not url.startswith("http"):96 url = request.protocol + ":/​/​" + request.host_name + request.uri97 method = request.method98 headers = request.headers._dict99 request_wraper = {"data": request.body.decode("utf-8"), "type": "hunter-proxy", "url": url, "method": method,100 "parser": BaseTrafficParser.DEAFAULT_PARSER,101 "headers": json.dumps(headers), "requestid": None}102 raw_request_data["data"] = request_wraper103 return raw_request_data104 def retrieve_credentials(self):105 """106 弹出账号密码基础认证,成功则写session107 :return: 108 """109 auth_header = self.request.headers.get('Authorization', None)110 proxy_session_id = self.get_cookie('proxy_sessionid', None)111 if auth_header is not None:112 # Basic Zm9vOmJhcg==113 auth_mode, auth_base64 = auth_header.split(' ', 1)114 assert auth_mode == 'Basic'115 auth_username, auth_password = base64.b64decode(auth_base64).decode("UTF-8").split(':', 1)116 status, user_info = auth_login(auth_username, auth_password, proxy_session_id)117 # 认证失败118 if not status:119 self.write("认证失败,请确认账号密码是否正确")120 self.set_status(401)121 self.set_header('WWW-Authenticate', 'Basic realm="hunter"')122 else:123 self.set_cookie("proxy_sessionid", user_info["proxy_sessionid"])124 # 任务状态为关闭,或者任务不存在125 if "current_task_id" not in user_info or ("current_task_id" in user_info126 and user_info["current_task_id"] != ""127 and RedisService.get_task(128 user_info.current_task_id).status != str(TaskStatus.WORKING)):129 self.write("后台无正在运行的任务,你需要重建一个新任务")130 self.set_status(400)131 self.finish()132 status = False133 return status, user_info134 else:135 self.set_status(401)136 self.set_header('WWW-Authenticate', 'Basic realm="hunter"')137 self.finish()138 return False, None139 def show_cacert_page(self):140 """141 下载CA证书142 :return: 143 """144 html_content = """145 <html><head><title>Burp Suite Professional</​title>146 <style type="text/​css">147 body { background: #dedede; font-family: Arial, sans-serif; color: #404042; -webkit-font-smoothing: antialiased; }148 #container { padding: 0 15px; margin: 10px auto; background-color: #ffffff; }149 a { word-wrap: break-word; }150 a:link, a:visited { color: #e06228; text-decoration: none; }151 a:hover, a:active { color: #404042; text-decoration: underline; }152 h1 { font-size: 1.6em; line-height: 1.2em; font-weight: normal; color: #404042; }153 h2 { font-size: 1.3em; line-height: 1.2em; padding: 0; margin: 0.8em 0 0.3em 0; font-weight: normal; color: #404042;}154 .title, .navbar { color: #ffffff; background: #70BAFE; padding: 10px 15px; margin: 0 -15px 10px -15px; overflow: hidden; }155 .title h1 { color: #ffffff; padding: 0; margin: 0; font-size: 1.8em; }156 div.navbar {position: absolute; top: 18px; right: 25px;}div.navbar ul {list-style-type: none; margin: 0; padding: 0;}157 div.navbar li {display: inline; margi-left: 20px;}158 div.navbar a {color: white; padding: 10px}159 div.navbar a:hover, div.navbar a:active {text-decoration: none; background: #404042;}160 </​style>161 </​head>162 <body>163 <div id="container">164 <div class="title"><h1>Hunter Proxy</​h1></​div>165 <div class="navbar"><ul>166 <li><a href="/​cert">CA Certificate</​a></​li>167 </​ul></​div>168 <p>Welcome to Hunter Proxy.</​p><p>&nbsp;</​p>169 </​div>170 </​body>171 </​html>172 """173 self.set_status(200)174 self.write(html_content)175 self.finish()176 def download_cacert(self):177 """178 下载证书179 :return: 180 """181 self.set_header('Content-Type', 'application/​octet-stream')182 self.set_header('Content-Disposition', 'attachment; filename=ca.crt')183 # 读取的模式需要根据实际情况进行修改184 with open(CACERT_FILE, 'rb') as f:185 while True:186 data = f.read(1)187 if not data:188 break189 self.write(data)190 self.finish()191 def handle_hunter_cacert_page(self):192 """193 处理证书页面194 :return: 195 """196 if self.request.host == "hunterca":197 if self.request.uri == "http:/​/​hunterca/​cert":198 self.download_cacert()199 else:200 self.show_cacert_page()201 return202 def handle_hunter_authentication_record(self):203 """204 处理认证,保存流量到mq205 :return: 206 """207 user_info = None208 # 只对非白名单其属于要测试站点等我 开启401认证209 if not NetWorkProxyConfigService.is_white_hosts(self.request.host):210 status, user_info = self.retrieve_credentials()211 if not status or user_info is None:212 return213 # Hook request214 self.request_handler(self.request, user_info)215 @tornado.web.asynchronous216 def get(self):217 """218 下载证书页面219 :return: 220 """221 self.handle_hunter_cacert_page()222 self.handle_hunter_authentication_record()...

Full Screen

Full Screen

event_manager.py

Source: event_manager.py Github

copy

Full Screen

1from constants import *2import threading 3class EventManager:4 def __init__(self, game, network_manager):5 self.game = game6 self.network_manager = network_manager7 self.request_queue = []8 self.response_queue = []9 self.request_handler={10 1: self.game.join_game_handler,11 4: self.game.receive_guest_handler12 }13 14 self.lock = threading.Lock()15 def push_request(self, raw_request_data):16 request = self.extract_raw_request(raw_request_data)17 if request:18 self.lock.acquire()19 self.request_queue.append(request)20 self.lock.release()21 else:22 print("Invalid event data: ", data)23 def push_response(self, response):24 self.response_queue.append(response)25 def process_request_queue(self):26 if len(self.request_queue) == 0:27 return28 self.lock.acquire()29 for request in self.request_queue:30 print("Handling request: ", request)31 handler = self.request_handler.get(request.type, lambda : 'Not register handler')32 handler(request.data)33 self.request_queue.clear()34 self.lock.release()35 def process_response_queue(self):36 if len(self.response_queue) == 0:37 return38 for response in self.response_queue:39 self.network_manager.send(response)40 self.response_queue.clear()41 def post_process_response_queue(self):42 self.process_response_queue()43 def extract_raw_request(self, raw_request_data):44 try:45 contents = raw_request_data.raw_content.strip().split("\n")46 event_type = int(contents[0])47 return Request(event_type, RequestData(contents[1:], self.network_manager.get_socket_id(raw_request_data.sock)))48 except Exception as e:49 print("Exception in extract_raw_request: ", e)...

Full Screen

Full Screen

decode_request.py

Source: decode_request.py Github

copy

Full Screen

1import json2def decode_request(req):3 raw_request_data = req.get_data()4 charset = req.mimetype_params.get('charset') or 'UTF-8'5 request_dic = json.loads(raw_request_data.decode(charset, 'replace'))...

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

April 2020 Platform Updates: New Browser, Better Performance &#038; Much Much More!

Howdy testers! If you’re reading this article I suggest you keep a diary & a pen handy because we’ve added numerous exciting features to our cross browser testing cloud and I am about to share them with you right away!

Putting Together a Testing Team

As part of one of my consulting efforts, I worked with a mid-sized company that was looking to move toward a more agile manner of developing software. As with any shift in work style, there is some bewilderment and, for some, considerable anxiety. People are being challenged to leave their comfort zones and embrace a continuously changing, dynamic working environment. And, dare I say it, testing may be the most ‘disturbed’ of the software roles in agile development.

Webinar: Move Forward With An Effective Test Automation Strategy [Voices of Community]

The key to successful test automation is to focus on tasks that maximize the return on investment (ROI), ensuring that you are automating the right tests and automating them in the right way. This is where test automation strategies come into play.

Appium: Endgame and What&#8217;s Next? [Testμ 2022]

The automation backend architecture of Appium has undergone significant development along with the release of numerous new capabilities. With the advent of Appium, test engineers can cover mobile apps, desktop apps, Flutter apps, and more.

QA Management &#8211; Tips for leading Global teams

The events over the past few years have allowed the world to break the barriers of traditional ways of working. This has led to the emergence of a huge adoption of remote working and companies diversifying their workforce to a global reach. Even prior to this many organizations had already had operations and teams geographically dispersed.

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful