Best Python code snippet using devtools-proxy_python
main.py
Source:main.py
1# Copyright 2012 the V8 project authors. All rights reserved.2# Redistribution and use in source and binary forms, with or without3# modification, are permitted provided that the following conditions are4# met:5#6# * Redistributions of source code must retain the above copyright7# notice, this list of conditions and the following disclaimer.8# * Redistributions in binary form must reproduce the above9# copyright notice, this list of conditions and the following10# disclaimer in the documentation and/or other materials provided11# with the distribution.12# * Neither the name of Google Inc. nor the names of its13# contributors may be used to endorse or promote products derived14# from this software without specific prior written permission.15#16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.27import multiprocessing28import os29import shutil30import subprocess31import threading32import time33from . import daemon34from . import local_handler35from . import presence_handler36from . import signatures37from . import status_handler38from . import work_handler39from ..network import perfdata40class Server(daemon.Daemon):41 def __init__(self, pidfile, root, stdin="/dev/null",42 stdout="/dev/null", stderr="/dev/null"):43 super(Server, self).__init__(pidfile, stdin, stdout, stderr)44 self.root = root45 self.local_handler = None46 self.local_handler_thread = None47 self.work_handler = None48 self.work_handler_thread = None49 self.status_handler = None50 self.status_handler_thread = None51 self.presence_daemon = None52 self.presence_daemon_thread = None53 self.peers = []54 self.jobs = multiprocessing.cpu_count()55 self.peer_list_lock = threading.Lock()56 self.perf_data_lock = None57 self.presence_daemon_lock = None58 self.datadir = os.path.join(self.root, "data")59 pubkey_fingerprint_filename = os.path.join(self.datadir, "mypubkey")60 with open(pubkey_fingerprint_filename) as f:61 self.pubkey_fingerprint = f.read().strip()62 self.relative_perf_filename = os.path.join(self.datadir, "myperf")63 if os.path.exists(self.relative_perf_filename):64 with open(self.relative_perf_filename) as f:65 try:66 self.relative_perf = float(f.read())67 except:68 self.relative_perf = 1.069 else:70 self.relative_perf = 1.071 def run(self):72 os.nice(20)73 self.ip = presence_handler.GetOwnIP()74 self.perf_data_manager = perfdata.PerfDataManager(self.datadir)75 self.perf_data_lock = threading.Lock()76 self.local_handler = local_handler.LocalSocketServer(self)77 self.local_handler_thread = threading.Thread(78 target=self.local_handler.serve_forever)79 self.local_handler_thread.start()80 self.work_handler = work_handler.WorkSocketServer(self)81 self.work_handler_thread = threading.Thread(82 target=self.work_handler.serve_forever)83 self.work_handler_thread.start()84 self.status_handler = status_handler.StatusSocketServer(self)85 self.status_handler_thread = threading.Thread(86 target=self.status_handler.serve_forever)87 self.status_handler_thread.start()88 self.presence_daemon = presence_handler.PresenceDaemon(self)89 self.presence_daemon_thread = threading.Thread(90 target=self.presence_daemon.serve_forever)91 self.presence_daemon_thread.start()92 self.presence_daemon.FindPeers()93 time.sleep(0.5) # Give those peers some time to reply.94 with self.peer_list_lock:95 for p in self.peers:96 if p.address == self.ip: continue97 status_handler.RequestTrustedPubkeys(p, self)98 while True:99 try:100 self.PeriodicTasks()101 time.sleep(60)102 except Exception, e:103 print("MAIN LOOP EXCEPTION: %s" % e)104 self.Shutdown()105 break106 except KeyboardInterrupt:107 self.Shutdown()108 break109 def Shutdown(self):110 with open(self.relative_perf_filename, "w") as f:111 f.write("%s" % self.relative_perf)112 self.presence_daemon.shutdown()113 self.presence_daemon.server_close()114 self.local_handler.shutdown()115 self.local_handler.server_close()116 self.work_handler.shutdown()117 self.work_handler.server_close()118 self.status_handler.shutdown()119 self.status_handler.server_close()120 def PeriodicTasks(self):121 # If we know peers we don't trust, see if someone else trusts them.122 with self.peer_list_lock:123 for p in self.peers:124 if p.trusted: continue125 if self.IsTrusted(p.pubkey):126 p.trusted = True127 status_handler.ITrustYouNow(p)128 continue129 for p2 in self.peers:130 if not p2.trusted: continue131 status_handler.TryTransitiveTrust(p2, p.pubkey, self)132 # TODO: Ping for more peers waiting to be discovered.133 # TODO: Update the checkout (if currently idle).134 def AddPeer(self, peer):135 with self.peer_list_lock:136 for p in self.peers:137 if p.address == peer.address:138 return139 self.peers.append(peer)140 if peer.trusted:141 status_handler.ITrustYouNow(peer)142 def DeletePeer(self, peer_address):143 with self.peer_list_lock:144 for i in xrange(len(self.peers)):145 if self.peers[i].address == peer_address:146 del self.peers[i]147 return148 def MarkPeerAsTrusting(self, peer_address):149 with self.peer_list_lock:150 for p in self.peers:151 if p.address == peer_address:152 p.trusting_me = True153 break154 def UpdatePeerPerformance(self, peer_address, performance):155 with self.peer_list_lock:156 for p in self.peers:157 if p.address == peer_address:158 p.relative_performance = performance159 def CopyToTrusted(self, pubkey_filename):160 with open(pubkey_filename, "r") as f:161 lines = f.readlines()162 fingerprint = lines[-1].strip()163 target_filename = self._PubkeyFilename(fingerprint)164 shutil.copy(pubkey_filename, target_filename)165 with self.peer_list_lock:166 for peer in self.peers:167 if peer.address == self.ip: continue168 if peer.pubkey == fingerprint:169 status_handler.ITrustYouNow(peer)170 else:171 result = self.SignTrusted(fingerprint)172 status_handler.NotifyNewTrusted(peer, result)173 return fingerprint174 def _PubkeyFilename(self, pubkey_fingerprint):175 return os.path.join(self.root, "trusted", "%s.pem" % pubkey_fingerprint)176 def IsTrusted(self, pubkey_fingerprint):177 return os.path.exists(self._PubkeyFilename(pubkey_fingerprint))178 def ListTrusted(self):179 path = os.path.join(self.root, "trusted")180 if not os.path.exists(path): return []181 return [ f[:-4] for f in os.listdir(path) if f.endswith(".pem") ]182 def SignTrusted(self, pubkey_fingerprint):183 if not self.IsTrusted(pubkey_fingerprint):184 return []185 filename = self._PubkeyFilename(pubkey_fingerprint)186 result = signatures.ReadFileAndSignature(filename) # Format: [key, sig].187 return [pubkey_fingerprint, result[0], result[1], self.pubkey_fingerprint]188 def AcceptNewTrusted(self, data):189 # The format of |data| matches the return value of |SignTrusted()|.190 if not data: return191 fingerprint = data[0]192 pubkey = data[1]193 signature = data[2]194 signer = data[3]195 if not self.IsTrusted(signer):196 return197 if self.IsTrusted(fingerprint):198 return # Already trust this guy.199 filename = self._PubkeyFilename(fingerprint)200 signer_pubkeyfile = self._PubkeyFilename(signer)201 if not signatures.VerifySignature(filename, pubkey, signature,202 signer_pubkeyfile):203 return204 return # Nothing more to do.205 def AddPerfData(self, test_key, duration, arch, mode):206 data_store = self.perf_data_manager.GetStore(arch, mode)207 data_store.RawUpdatePerfData(str(test_key), duration)208 def CompareOwnPerf(self, test, arch, mode):209 data_store = self.perf_data_manager.GetStore(arch, mode)210 observed = data_store.FetchPerfData(test)211 if not observed: return212 own_perf_estimate = observed / test.duration213 with self.perf_data_lock:214 kLearnRateLimiter = 9999215 self.relative_perf *= kLearnRateLimiter216 self.relative_perf += own_perf_estimate...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!