Best Python code snippet using localstack_python
httpd.py
Source:httpd.py
#
# Mailpile's built-in HTTPD
#
###############################################################################
import cgi
import Cookie
import hashlib
import mimetypes
import os
import random
import socket
import SocketServer
import sys
import time
import threading
import traceback
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
from urllib import quote, unquote
from urlparse import parse_qs, urlparse

import mailpile.util
from mailpile.commands import Action
from mailpile.i18n import gettext as _
from mailpile.i18n import ngettext as _n
from mailpile.urlmap import UrlMap
from mailpile.util import *
from mailpile.ui import *


global WORD_REGEXP, STOPLIST, BORING_HEADERS, DEFAULT_PORT

DEFAULT_PORT = 33411

# Lock taken by every request before it is counted as "live"; Idle_HTTPD()
# holds it while waiting for the number of live requests to drop.
BLOCK_HTTPD_LOCK = UiRLock()
LIVE_HTTP_REQUESTS = 0


def Idle_HTTPD(allowed=1):
    """
    Wait for the HTTP server to become (mostly) idle.

    While holding BLOCK_HTTPD_LOCK, polls every 0.05s (at most 100 times)
    until no more than `allowed` requests are live or the app is quitting.

    Keyword arguments:
    allowed -- The number of live HTTP requests to tolerate

    Returns BLOCK_HTTPD_LOCK, so callers can hold it themselves.
    """
    with BLOCK_HTTPD_LOCK:
        sleep = 100
        while (sleep and
                not mailpile.ui.QUITTING and
                LIVE_HTTP_REQUESTS > allowed):
            time.sleep(0.05)
            sleep -= 1
        return BLOCK_HTTPD_LOCK


class HttpRequestHandler(SimpleXMLRPCRequestHandler):
    """Request handler serving static files and URL-mapped commands."""

    # We always recognize these extensions, no matter what the Python
    # mimetype module thinks.
    _MIMETYPE_MAP = dict([(ext, 'text/plain') for ext in (
        'c', 'cfg', 'conf', 'cpp', 'csv', 'h', 'hpp', 'log', 'md', 'me',
        'py', 'rb', 'rc', 'txt'
    )] + [(ext, 'application/x-font') for ext in (
        'pfa', 'pfb', 'gsf', 'pcf'
    )] + [
        ('css', 'text/css'),
        ('eot', 'application/vnd.ms-fontobject'),
        ('gif', 'image/gif'),
        ('html', 'text/html'),
        ('htm', 'text/html'),
        ('ico', 'image/x-icon'),
        ('jpg', 'image/jpeg'),
        ('jpeg', 'image/jpeg'),
        ('js', 'text/javascript'),
        ('json', 'application/json'),
        ('otf', 'font/otf'),
        ('png', 'image/png'),
        ('rss', 'application/rss+xml'),
        ('tif', 'image/tiff'),
        ('tiff', 'image/tiff'),
        ('ttf', 'font/ttf'),
        ('svg', 'image/svg+xml'),
        ('svgz', 'image/svg+xml'),
        ('woff', 'application/font-woff'),
    ])

    # Template variables used when rendering an error page before a real
    # per-request session exists.  (This used to carry a trailing comma,
    # which silently turned it into a one-element tuple.)
    _ERROR_CONTEXT = {'lastq': '', 'csrf': '', 'path': ''}

    def http_host(self):
        """Return the current server host, e.g. 'localhost'"""
        # rsplit removes port
        return self.headers.get('host', 'localhost').rsplit(':', 1)[0]

    def http_session(self):
        """Fetch the session ID from a cookie, or assign a new one"""
        cookies = Cookie.SimpleCookie(self.headers.get('cookie'))
        session_id = cookies.get(self.server.session_cookie)
        if session_id:
            session_id = session_id.value
        else:
            session_id = self.server.make_session_id(self)
        return session_id

    def server_url(self):
        """Return the current server URL, e.g. 'http://localhost:33411/'"""
        return '%s://%s' % (self.headers.get('x-forwarded-proto', 'http'),
                            self.headers.get('host', 'localhost'))

    def send_http_response(self, code, msg):
        """Send the HTTP response header"""
        self.wfile.write('HTTP/1.1 %s %s\r\n' % (code, msg))

    def send_http_redirect(self, destination):
        """
        Send a 302 redirect to `destination`, with a small HTML body
        linking to the same place.
        """
        # The destination may be derived from the request, so strip CR/LF
        # (which would end the Location header early) and HTML-escape the
        # value before embedding it in the body.
        location = destination.replace('\r', '').replace('\n', '')
        self.send_http_response(302, 'Found')
        self.wfile.write(('Location: %s\r\n\r\n'
                          '<h1><a href="%s">Please look here!</a></h1>\n'
                          ) % (location, cgi.escape(location, quote=True)))

    def send_standard_headers(self,
                              header_list=None,
                              cachectrl='private',
                              mimetype='text/html'):
        """
        Send common HTTP headers plus a list of custom headers:
        - Cache-Control
        - Content-Type

        This function does not send the HTTP/1.1 header, so
        ensure self.send_http_response() was called before

        Keyword arguments:
        header_list -- A list of custom headers to send, containing
                       key-value tuples
        cachectrl   -- The value of the 'Cache-Control' header field
        mimetype    -- The MIME type to send as 'Content-Type' value
        """
        if mimetype.startswith('text/') and ';' not in mimetype:
            mimetype += ('; charset = utf-8')
        self.send_header('Cache-Control', cachectrl)
        self.send_header('Content-Type', mimetype)
        for header in (header_list or ()):
            self.send_header(header[0], header[1])
        # If the session carries an HTTP session ID, (re)issue it as a
        # cookie valid for 24 hours and keep caches from storing it.
        session_id = self.session.ui.html_variables.get('http_session')
        if session_id:
            cookies = Cookie.SimpleCookie()
            cookies[self.server.session_cookie] = session_id
            cookies[self.server.session_cookie]['path'] = '/'
            cookies[self.server.session_cookie]['max-age'] = 24 * 3600
            self.send_header(*cookies.output().split(': ', 1))
            self.send_header('Cache-Control', 'no-cache="set-cookie"')
        self.end_headers()

    def send_full_response(self, message,
                           code=200, msg='OK',
                           mimetype='text/html', header_list=None,
                           cachectrl=None,
                           suppress_body=False):
        """
        Sends the HTTP header and a response list

        message       -- The body of the response to send
        header_list   -- A list of custom headers to send,
                         containing key-value tuples
        code          -- The HTTP response code to send
        msg           -- The HTTP reason phrase to send with the code
        mimetype      -- The MIME type to send as 'Content-Type' value
        cachectrl     -- The 'Cache-Control' value ("no-cache" if unset)
        suppress_body -- Set this to True to ignore the message parameter
                         and not send any response body
        """
        message = unicode(message).encode('utf-8')
        self.log_request(code, message and len(message) or '-')
        # Send HTTP/1.1 header
        self.send_http_response(code, msg)
        # Send all headers
        if code == 401:
            self.send_header('WWW-Authenticate',
                             'Basic realm = MP%d' % (time.time() / 3600))
        # If suppress_body == True, we don't know the content length
        all_headers = list(header_list or [])
        if not suppress_body:
            all_headers.append(('Content-Length', len(message or '')))
        self.send_standard_headers(header_list=all_headers,
                                   mimetype=mimetype,
                                   cachectrl=(cachectrl or "no-cache"))
        # Response body
        if not suppress_body:
            self.wfile.write(message or '')

    def guess_mimetype(self, fpath):
        """
        Return a MIME type for `fpath`: our own extension map first, then
        the mimetypes module, then 'application/octet-stream'.
        """
        ext = os.path.basename(fpath).rsplit('.')[-1]
        return (self._MIMETYPE_MAP.get(ext.lower()) or
                mimetypes.guess_type(fpath, strict=False)[0] or
                'application/octet-stream')

    def _mk_etag(self, *args):
        # This ETag varies by whatever args we give it (e.g. size, mtime,
        # etc), but is unique per Mailpile instance and should leak nothing
        # about the actual server configuration.
        data = '%s-%s' % (self.server.secret, '-'.join((str(a) for a in args)))
        return hashlib.md5(data).hexdigest()

    def send_file(self, config, filename, suppress_body=False):
        """
        Send a static file from the current HTML theme.

        config        -- The configuration used to locate and open the file
        filename      -- Path of the file, relative to the theme
        suppress_body -- If True, send the headers only (HEAD requests)

        Responds 403 for paths containing '..' or unreadable files, 404
        for missing files and 500 for any other IOError.
        """
        # Defaults for every failure path.  Without these, the '..' branch
        # below reached the response code with these names unbound and
        # died with a NameError instead of sending the 403.
        mimetype, message, msg_size = 'text/plain', None, 0

        # FIXME: Do we need more security checks?
        if '..' in filename:
            code, msg = 403, "Access denied"
        else:
            try:
                tpl = config.sys.path.get(self.http_host(), 'html_theme')
                fpath, fd, mt = config.open_file(tpl, filename)
                with fd:
                    mimetype = mt or self.guess_mimetype(fpath)
                    msg_size = os.path.getsize(fpath)
                    if not suppress_body:
                        message = fd.read()
                    else:
                        message = None
                code, msg = 200, "OK"
            except IOError as e:
                # Reset anything the try block managed to assign.
                mimetype, message, msg_size = 'text/plain', None, 0
                if e.errno == 2:
                    code, msg = 404, "File not found"
                elif e.errno == 13:
                    code, msg = 403, "Access denied"
                else:
                    code, msg = 500, "Internal server error"

        # Note: We assume the actual static content almost never varies
        #       on a given Mailpile instance, thus the long TTL and no
        #       ETag for conditional loads.
        self.log_request(code, msg_size if (message is not None) else '-')
        self.send_http_response(code, msg)
        self.send_standard_headers(header_list=[('Content-Length', msg_size)],
                                   mimetype=mimetype,
                                   cachectrl='must-revalidate, max-age=36000')
        self.wfile.write(message or '')

    def csrf(self):
        """
        Generate a hashed token from the current timestamp
        and the server secret to avoid CSRF attacks
        """
        ts = '%x' % int(time.time() / 60)
        return '%s-%s' % (ts, b64w(sha1b64('-'.join([self.server.secret,
                                                     ts]))))

    def do_POST(self, method='POST'):
        """
        Parse the request body and hand the request over to do_GET().

        Accepts 'multipart/form-data' and URL-encoded forms (the latter
        limited to 5MB); anything else, or an I/O failure while reading,
        yields a rendered error page with status 500.

        Raises ValueError for XMLRPC paths, which are disabled.
        """
        (scheme, netloc, path, params, query, frag) = urlparse(self.path)
        if path.startswith('/::XMLRPC::/'):
            raise ValueError(_('XMLRPC has been disabled for now.'))
            #return SimpleXMLRPCRequestHandler.do_POST(self)

        # Update thread name for debugging purposes
        threading.current_thread().name = 'POST:%s' % self.path.split('?')[0]

        self.session, config = self.server.session, self.server.session.config
        post_data = {}
        try:
            ue = 'application/x-www-form-urlencoded'
            clength = int(self.headers.get('content-length', 0))
            ctype, pdict = cgi.parse_header(self.headers.get('content-type',
                                                             ue))
            if ctype == 'multipart/form-data':
                post_data = cgi.FieldStorage(
                    fp=self.rfile,
                    headers=self.headers,
                    environ={'REQUEST_METHOD': method,
                             'CONTENT_TYPE': self.headers['Content-Type']}
                )
            elif ctype == ue:
                if clength > 5 * 1024 * 1024:
                    raise ValueError(_('OMG, input too big'))
                # urlparse.parse_qs replaces the deprecated cgi.parse_qs;
                # the second argument keeps blank values.
                post_data = parse_qs(self.rfile.read(clength), 1)
            else:
                raise ValueError(_('Unknown content-type'))
        except (IOError, ValueError) as e:
            self.send_full_response(self.server.session.ui.render_page(
                config, self._ERROR_CONTEXT,
                body='POST geborked: %s' % e,
                title=_('Internal Error')
            ), code=500)
            return None
        return self.do_GET(post_data=post_data, method=method)

    def do_GET(self, *args, **kwargs):
        """
        Count this request as live, then process it with _real_do_GET().

        The thread is renamed 'WAIT:<path>' while waiting for
        BLOCK_HTTPD_LOCK and 'WORK:<path>' once it has been counted.
        """
        global LIVE_HTTP_REQUESTS
        path = self.path.split('?')[0]
        threading.current_thread().name = 'WAIT:%s' % path
        with BLOCK_HTTPD_LOCK:
            LIVE_HTTP_REQUESTS += 1
            threading.current_thread().name = 'WORK:%s' % path
        # Only start the try/finally once the counter has actually been
        # incremented, so a failure above cannot make it drift negative.
        try:
            return self._real_do_GET(*args, **kwargs)
        finally:
            LIVE_HTTP_REQUESTS -= 1

    def _real_do_GET(self, post_data=None, suppress_body=False, method='GET'):
        """
        Serve one request: static files directly, everything else by
        mapping the URL to commands, running them and rendering the result.

        post_data     -- Parsed form data, if any
        suppress_body -- If True, send headers only (HEAD requests)
        method        -- The HTTP method being handled
        """
        global LIVE_HTTP_REQUESTS
        if post_data is None:
            post_data = {}

        (scheme, netloc, path, params, query, frag) = urlparse(self.path)
        query_data = parse_qs(query)
        opath = path = unquote(path)

        # HTTP is stateless, so we create a new session for each request.
        self.session, config = self.server.session, self.server.session.config
        server_session = self.server.session

        if 'httpdata' in config.sys.debug:
            self.wfile = DebugFileWrapper(sys.stderr, self.wfile)

        # Static things!
        if path == '/favicon.ico':
            path = '/static/favicon.ico'
        if path.startswith('/_/'):
            path = path[2:]
        if path.startswith('/static/'):
            return self.send_file(config, path[len('/static/'):],
                                  suppress_body=suppress_body)

        self.session = session = Session(config)
        session.ui = HttpUserInteraction(self, config,
                                         log_parent=server_session.ui)

        if 'context' in post_data:
            session.load_context(post_data['context'][0])
        elif 'context' in query_data:
            session.load_context(query_data['context'][0])

        if 'http' in config.sys.debug:
            session.ui.warning = server_session.ui.warning
            session.ui.notify = server_session.ui.notify
            session.ui.error = server_session.ui.error
            session.ui.debug = server_session.ui.debug
            session.ui.debug('%s: %s qs = %s post = %s'
                             % (method, opath, query_data, post_data))

        idx = session.config.index
        if session.config.loaded_config:
            name = session.config.get_profile().get('name', 'Chelsea Manning')
        else:
            name = 'Chelsea Manning'

        session.ui.html_variables = {
            'csrf': self.csrf(),
            'http_host': self.headers.get('host', 'localhost'),
            'http_hostname': self.http_host(),
            'http_method': method,
            'http_session': self.http_session(),
            'message_count': (idx and len(idx.INDEX) or 0),
            'name': name,
            'title': 'Mailpile dummy title',
            'url_protocol': self.headers.get('x-forwarded-proto', 'http'),
            'mailpile_size': idx and len(idx.INDEX) or 0
        }

        try:
            try:
                commands = UrlMap(session).map(
                    self, method, path, query_data, post_data,
                    authenticate=(not mailpile.util.TESTING))
            except UsageError:
                # Maybe the client just forgot the trailing slash: if the
                # path maps with one appended, redirect there.
                if (not path.endswith('/') and
                        not session.config.sys.debug and
                        method == 'GET'):
                    commands = UrlMap(session).map(self, method, path + '/',
                                                   query_data, post_data)
                    url = quote(path) + '/'
                    if query:
                        url += '?' + query
                    return self.send_http_redirect(url)
                else:
                    raise

            http_headers = []
            cachectrl = None
            if 'http' not in config.sys.debug:
                etag_data = []
                max_ages = []
                have_ed = 0
                for c in commands:
                    max_ages.append(c.max_age())
                    ed = c.etag_data()
                    have_ed += 1 if ed else 0
                    etag_data.extend(ed)
                # Only use an ETag if every command contributed to it.
                if have_ed == len(commands):
                    etag = self._mk_etag(*etag_data)
                    conditional = self.headers.get('if-none-match')
                    if conditional == etag:
                        # A 304 response must not carry a message body.
                        self.send_full_response('OK', code=304,
                                                msg='Unmodified',
                                                suppress_body=True)
                        return None
                    else:
                        http_headers.append(('ETag', etag))
                max_age = min(max_ages) if max_ages else 10
                cachectrl = 'must-revalidate, max-age=%d' % max_age

            # Requests flagged as hanging activity are not counted as
            # live while their commands run.
            hang_fix = 1 if any(c.IS_HANGING_ACTIVITY for c in commands) else 0
            try:
                LIVE_HTTP_REQUESTS -= hang_fix
                results = [cmd.run() for cmd in commands]
                session.ui.display_result(results[-1])
            finally:
                LIVE_HTTP_REQUESTS += hang_fix

        except UrlRedirectException as e:
            return self.send_http_redirect(e.url)
        except SuppressHtmlOutput:
            return None
        except AccessError:
            self.send_full_response(_('Access Denied'),
                                    code=403, mimetype='text/plain')
            return None
        except Exception:
            # Request boundary: log the traceback, and only show it to the
            # client when debugging is enabled.
            e = traceback.format_exc()
            session.ui.debug(e)
            if not session.config.sys.debug:
                e = _('Internal error')
            self.send_full_response(e, code=500, mimetype='text/plain')
            return None

        mimetype, content = session.ui.render_response(session.config)
        self.send_full_response(content,
                                mimetype=mimetype,
                                header_list=http_headers,
                                cachectrl=cachectrl,
                                suppress_body=suppress_body)

    def do_PUT(self):
        """Handle PUT like POST."""
        return self.do_POST(method='PUT')

    def do_UPDATE(self):
        """Handle UPDATE like POST."""
        return self.do_POST(method='UPDATE')

    def do_HEAD(self):
        """Handle HEAD like GET, but without a response body."""
        return self.do_GET(suppress_body=True, method='HEAD')

    def log_message(self, fmt, *args):
        """Send log lines to the server session's UI, not stderr."""
        self.server.session.ui.notify(self.server_url() +
                                      ' ' + (fmt % args))


class HttpServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
    """Threaded HTTP server holding the secret and session cookie name."""

    # Secrets, cookie names and session IDs are security sensitive, so
    # draw their random component from the OS instead of the default PRNG.
    _rng = random.SystemRandom()

    def __init__(self, session, sspec, handler):
        """
        session -- The server-wide session
        sspec   -- A (host, port) tuple to listen on
        handler -- The request handler class
        """
        SimpleXMLRPCServer.__init__(self, sspec, handler)
        self.daemon_threads = True
        self.session = session
        self.sessions = {}
        self.session_cookie = None
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Record the port we actually got (sspec may have asked for any).
        self.sspec = (sspec[0] or 'localhost', self.socket.getsockname()[1])

        # This hash includes the index ofuscation master key, which means
        # it should be very strongly unguessable.
        self.secret = b64w(sha512b64(
            '-'.join([str(x) for x in [session, time.time(),
                                       self._rng.randint(0, 0xfffffff),
                                       session.config]])))

        # Generate a new unguessable session cookie name on startup
        while not self.session_cookie:
            rn = str(self._rng.randint(0, 0xfffffff))
            self.session_cookie = CleanText(sha512b64(self.secret, rn),
                                            banned=CleanText.NONALNUM
                                            ).clean[:8].lower()

    def make_session_id(self, request):
        """Generate an unguessable and unauthenticated new session ID."""
        session_id = None
        while session_id in self.sessions or session_id is None:
            session_id = b64w(sha1b64('%s %s %x %s' % (
                self.secret,
                request and request.headers,
                self._rng.randint(0, 0xffffffff),
                time.time())))
        return session_id

    def finish_request(self, request, client_address):
        """Handle one request, ignoring socket errors; stop on quit."""
        try:
            SimpleXMLRPCServer.finish_request(self, request, client_address)
        except socket.error:
            # Deliberate best-effort: socket errors are not reported.
            pass
        if mailpile.util.QUITTING:
            self.shutdown()


class HttpWorker(threading.Thread):
    """Daemon thread running an HttpServer."""

    def __init__(self, session, sspec):
        threading.Thread.__init__(self)
        self.httpd = HttpServer(session, sspec, HttpRequestHandler)
        self.daemon = True
        self.session = session

    def run(self):
        self.httpd.serve_forever()

    def quit(self, join=False):
        if self.httpd:
            self.httpd.shutdown()
            self.httpd.server_close()
        # NOTE(review): the snippet is truncated at this point in the
        # source ("..."); any further statements are not visible here.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!