hunter_handler.py
Source:hunter_handler.py
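#!/ usr/bin/env
# coding=utf-8
#
# Copyright 2019 ztosec & https://sec.zto.com/
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
author: b5mali4
"""
import os
import re
import base64
import json
import sys
import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.web
import tornado.curl_httpclient

try:
    from common import log
except (ModuleNotFoundError, ImportError):
    # Running from inside the package directory: put the project root on sys.path.
    HUNTER_PATH = "{}/../".format(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, HUNTER_PATH)
finally:
    from common import log
    from hunter_celery import scan_celery
    from networkproxy import CACERT_FILE
    from networkproxy import CERTKEY_FILE
    from networkproxy import CAKEY_FILE
    from networkproxy.socket_wrapper import wrap_socket
    from model.network_proxy import NetWorkProxyConfig, NetWorkProxyConfigService
    from networkproxy.authentication import auth_login
    from api.service.redis_service import RedisService
    from model.default_value import TaskStatus
    from common.path import HUNTER_PATH
    from common.http_util import StatusCode
    from networkproxy.proxy_handler import ProxyHandler
    from networkproxy import get_http_server
    from networkproxy import set_http_server

logger = log.get_default_logger()

# NOTE(review): the source page lost all indentation; the nesting below is
# reconstructed from syntax and should be checked against the upstream file.


class HunterHandler(ProxyHandler):
    """Proxy handler that authenticates the proxy user and queues proxied requests for scanning."""

    def request_handler(self, request, user_info):
        """
        Send the proxied request to the MQ (celery) when it matches the current task.

        :param request: the proxied request object
        :param user_info: authenticated user record; nothing is queued when it is falsy
        :return: None

        Simple example code:
            print(request.body_arguments)
            print(request.headers)
            print(request.body)
            print(request.cookies)
            print(request.version)
            print(request.protocol)
            print(request.host_name)
            print(request.uri)
            print(request.method)
        """
        if not user_info:
            return
        task_id = user_info.current_task_id
        current_user_name = user_info.user_name
        raw_request_data = self.wrap_request(request, user_info)
        request_url = raw_request_data["data"]["url"]
        # Does the request satisfy the task's hook rule?
        current_task = RedisService.get_task(task_id)
        if current_task and "hook_rule" in current_task:
            # Rule format example: *.xx.com
            # NOTE(review): only '*' is rewritten; '.' stays a regex wildcard and the
            # resulting leading '.*' lets the rule match anywhere in the URL (path or
            # query string included). URLs outside the intended host scope can therefore
            # be queued for scanning. Consider matching the parsed hostname against an
            # escaped pattern, and handling re.error for malformed rules.
            hook_rule = str(current_task.hook_rule).replace("*", ".*")
            matches_prefix = str(request_url).startswith(hook_rule)
            if not matches_prefix and re.match(r'' + hook_rule, request_url, re.S) is None:
                return
        if RedisService.create_urlclassifications(task_id, raw_request_data):
            logger.info("满足正则条件,发送流量到MQ中")
            scan_celery.delay(raw_request_data["data"], task_id, current_user_name, TaskStatus.NONE)

    def wrap_request(self, request, user_info):
        """
        Convert the proxied request into the dict format handed to the scanner.

        :param request: the proxied request object
        :param user_info: unused here
        :return: {"data": {...}} holding body, url, method, parser and JSON-encoded headers
        """
        from parser.base_traffic_parser import BaseTrafficParser
        url = request.uri
        if url is None or not url.startswith("http"):
            # Relative request line: rebuild an absolute URL from protocol and host.
            url = request.protocol + "://" + request.host_name + request.uri
        # NOTE(review): a request body that is not valid UTF-8 raises UnicodeDecodeError here.
        wrapped = {
            "data": request.body.decode("utf-8"),
            "type": "hunter-proxy",
            "url": url,
            "method": request.method,
            "parser": BaseTrafficParser.DEAFAULT_PARSER,
            "headers": json.dumps(request.headers._dict),
            "requestid": None,
        }
        return {"data": wrapped}

    def _reject_with_basic_challenge(self):
        """Answer 401 with a Basic challenge and finish the response."""
        self.set_status(401)
        self.set_header('WWW-Authenticate', 'Basic realm="hunter"')
        self.finish()

    def retrieve_credentials(self):
        """
        Prompt for HTTP Basic credentials; on success write the proxy session cookie.

        A missing or malformed Authorization header is answered with a 401 challenge
        instead of raising, because the header is client-controlled input.

        :return: (status, user_info); status is False when authentication failed or the
                 user has no running task
        """
        auth_header = self.request.headers.get('Authorization', None)
        proxy_session_id = self.get_cookie('proxy_sessionid', None)
        if auth_header is None:
            self._reject_with_basic_challenge()
            return False, None
        try:
            # Expected shape: "Basic Zm9vOmJhcg=="
            auth_mode, auth_base64 = auth_header.split(' ', 1)
            if auth_mode.lower() != 'basic':
                raise ValueError("unsupported authorization scheme")
            auth_username, auth_password = base64.b64decode(auth_base64).decode("UTF-8").split(':', 1)
        except ValueError:
            # Covers a missing separator, bad base64 (binascii.Error) and bad UTF-8
            # (UnicodeDecodeError); both are ValueError subclasses.
            self._reject_with_basic_challenge()
            return False, None
        status, user_info = auth_login(auth_username, auth_password, proxy_session_id)
        if not status:
            # Authentication failed.
            # NOTE(review): unlike the other rejection paths this branch does not call
            # finish(); confirm the response is completed elsewhere.
            self.write("认证失败,请确认账号密码是否正确")
            self.set_status(401)
            self.set_header('WWW-Authenticate', 'Basic realm="hunter"')
            return status, user_info
        self.set_cookie("proxy_sessionid", user_info["proxy_sessionid"])
        # Reject when the task is closed or does not exist.
        # NOTE(review): an empty current_task_id passes this check, as in the original.
        no_running_task = "current_task_id" not in user_info
        if not no_running_task and user_info["current_task_id"] != "":
            current_task = RedisService.get_task(user_info.current_task_id)
            # A task missing from Redis is treated like a non-running one rather than
            # failing on the attribute access.
            no_running_task = not current_task or current_task.status != str(TaskStatus.WORKING)
        if no_running_task:
            self.write("后台无正在运行的任务,你需要重建一个新任务")
            self.set_status(400)
            self.finish()
            status = False
        return status, user_info

    def show_cacert_page(self):
        """
        Render the landing page that links to the CA certificate download.

        :return: None
        """
        html_content = """
        <html><head><title>Burp Suite Professional</title>
        <style type="text/css">
        body { background: #dedede; font-family: Arial, sans-serif; color: #404042; -webkit-font-smoothing: antialiased; }
        #container { padding: 0 15px; margin: 10px auto; background-color: #ffffff; }
        a { word-wrap: break-word; }
        a:link, a:visited { color: #e06228; text-decoration: none; }
        a:hover, a:active { color: #404042; text-decoration: underline; }
        h1 { font-size: 1.6em; line-height: 1.2em; font-weight: normal; color: #404042; }
        h2 { font-size: 1.3em; line-height: 1.2em; padding: 0; margin: 0.8em 0 0.3em 0; font-weight: normal; color: #404042;}
        .title, .navbar { color: #ffffff; background: #70BAFE; padding: 10px 15px; margin: 0 -15px 10px -15px; overflow: hidden; }
        .title h1 { color: #ffffff; padding: 0; margin: 0; font-size: 1.8em; }
        div.navbar {position: absolute; top: 18px; right: 25px;}div.navbar ul {list-style-type: none; margin: 0; padding: 0;}
        div.navbar li {display: inline; margi-left: 20px;}
        div.navbar a {color: white; padding: 10px}
        div.navbar a:hover, div.navbar a:active {text-decoration: none; background: #404042;}
        </style>
        </head>
        <body>
        <div id="container">
        <div class="title"><h1>Hunter Proxy</h1></div>
        <div class="navbar"><ul>
        <li><a href="/cert">CA Certificate</a></li>
        </ul></div>
        <p>Welcome to Hunter Proxy.</p><p> </p>
        </div>
        </body>
        </html>
        """
        self.set_status(200)
        self.write(html_content)
        self.finish()

    def download_cacert(self):
        """
        Send the proxy's CA certificate as a file download.

        :return: None
        """
        self.set_header('Content-Type', 'application/octet-stream')
        self.set_header('Content-Disposition', 'attachment; filename=ca.crt')
        # Read in 64 KiB chunks instead of one byte per write call.
        with open(CACERT_FILE, 'rb') as f:
            for chunk in iter(lambda: f.read(64 * 1024), b''):
                self.write(chunk)
        self.finish()

    def handle_hunter_cacert_page(self):
        """
        Serve the certificate pages when the request targets the "hunterca" host.

        :return: None
        """
        if self.request.host == "hunterca":
            if self.request.uri == "http://hunterca/cert":
                self.download_cacert()
            else:
                self.show_cacert_page()
        return

    def handle_hunter_authentication_record(self):
        """
        Authenticate the user where required and hand the request to request_handler.

        :return: None
        """
        user_info = None
        # 401 authentication is only enabled for hosts that are not whitelisted,
        # i.e. the sites under test.
        # NOTE(review): request.host comes from the client, so the whitelist also
        # decides which requests skip authentication.
        if not NetWorkProxyConfigService.is_white_hosts(self.request.host):
            status, user_info = self.retrieve_credentials()
            if not status or user_info is None:
                return
        # Hook request
        self.request_handler(self.request, user_info)

    # NOTE(review): tornado.web.asynchronous was removed in Tornado 6.0.
    @tornado.web.asynchronous
    def get(self):
        """
        Handle GET: certificate pages first, then authentication and recording.

        :return: None
        """
        # NOTE(review): both steps always run. For host "hunterca" the response is
        # already finished by the first call, so unless that host is whitelisted the
        # second call writes to a finished request.
        self.handle_hunter_cacert_page()
        self.handle_hunter_authentication_record()
        # ... (the listing is truncated here on the source page)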
event_manager.py
Source:event_manager.py
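from constants import *
import threading


class EventManager:
    """Queue incoming network requests and outgoing responses for a game loop."""

    def __init__(self, game, network_manager):
        """Store collaborators and register one handler per numeric request type."""
        self.game = game
        self.network_manager = network_manager
        self.request_queue = []
        self.response_queue = []
        self.request_handler = {
            1: self.game.join_game_handler,
            4: self.game.receive_guest_handler,
        }
        # Guards request_queue.
        self.lock = threading.Lock()

    def push_request(self, raw_request_data):
        """Parse raw network data and enqueue it; unparsable data is reported and dropped."""
        request = self.extract_raw_request(raw_request_data)
        if request:
            with self.lock:
                self.request_queue.append(request)
        else:
            # The original printed an undefined name here, which raised NameError.
            print("Invalid event data: ", raw_request_data)

    def push_response(self, response):
        """Enqueue a response for the next process_response_queue call."""
        self.response_queue.append(response)

    def process_request_queue(self):
        """Dispatch every queued request to its registered handler, then empty the queue."""
        if not self.request_queue:
            return
        # 'with' releases the lock even if a handler raises; a bare acquire/release
        # pair would leave it held and block every later push_request.
        with self.lock:
            for request in self.request_queue:
                print("Handling request: ", request)
                handler = self.request_handler.get(request.type)
                if handler is None:
                    # Request types come from the network; an unregistered one is
                    # skipped instead of failing on a zero-argument fallback.
                    print("Not register handler for request type: ", request.type)
                    continue
                handler(request.data)
            self.request_queue.clear()

    def process_response_queue(self):
        """Send every queued response through the network manager, then empty the queue."""
        if not self.response_queue:
            return
        for response in self.response_queue:
            self.network_manager.send(response)
        self.response_queue.clear()

    def post_process_response_queue(self):
        """Flush the response queue."""
        self.process_response_queue()

    def extract_raw_request(self, raw_request_data):
        """
        Build a Request from raw data: the first line is the integer event type, the
        remaining lines are the payload.

        Returns None when the data cannot be parsed.
        """
        try:
            contents = raw_request_data.raw_content.strip().split("\n")
            event_type = int(contents[0])
            socket_id = self.network_manager.get_socket_id(raw_request_data.sock)
            return Request(event_type, RequestData(contents[1:], socket_id))
        except Exception as e:
            print("Exception in extract_raw_request: ", e)
            return None
    # ... (the listing is truncated here on the source page)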
decode_request.py
Source:decode_request.py
1import json2def decode_request(req):3 raw_request_data = req.get_data()4 charset = req.mimetype_params.get('charset') or 'UTF-8'5 request_dic = json.loads(raw_request_data.decode(charset, 'replace'))...