Best Python code snippet using autotest_python
host_scheduler.py
Source: host_scheduler.py
...
    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])

    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()

    @_timer.decorate
    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.

        @return a list of hosts that are released.
        """
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts

    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job:%s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manage all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()

    def acquire_hosts(self, host_jobs):
        """Acquire hosts for given jobs.

        This method sends jobs that need hosts to the rdb.
        Child classes can override this method to pipe more args
        to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
            for each host acquired on behalf of a queue_entry,
            or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)

    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A list of tuples of the form (host, queue_entry) for each
            valid host-queue_entry assignment.
        """
        jobs_with_hosts = []
        hosts = self.acquire_hosts(host_jobs)
        for host, job in zip(hosts, host_jobs):
            if host:
                jobs_with_hosts.append(self.host_assignment(host, job))
        return jobs_with_hosts

    @_timer.decorate
    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""

    _timer = autotest_stats.Timer('host_scheduler')

    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keeping track of how many hosts each suite is holding
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)

    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Record:
            1. How long it takes to assign a host to a job in the metadata db.
            2. The host assignment of a suite.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)
        self._suite_recorder.record_assignment(queue_entry)

    @_timer.decorate
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""
        key = 'host_scheduler.jobs_per_tick'
        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if not unverified_host_jobs:
            return
        for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
            self.schedule_host_job(acquisition.host, acquisition.job)
            self._record_host_assignment(acquisition.host, acquisition.job)
            new_jobs_with_hosts += 1
        autotest_stats.Gauge(key).send('new_jobs_with_hosts',
                                       new_jobs_with_hosts)
        autotest_stats.Gauge(key).send('new_jobs_without_hosts',
                                       len(unverified_host_jobs) -
                                       new_jobs_with_hosts)

    @_timer.decorate
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing a leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)

    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in the parent class.
        It figures out the set of suites that |host_jobs| belong to,
        gets the min_duts requirement for each suite,
        and pipes min_duts for each suite to the rdb.
        """
        parent_job_ids = set([q.job.parent_job_id
                              for q in host_jobs if q.job.parent_job_id])
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)

    @_timer.decorate
    def tick(self):
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        pass

    def tick(self):
        pass


def handle_signal(signum, frame):
    """Sigint handler so we don't crash mid-tick."""
    global _shutdown
    _shutdown = True
...
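The heart of this scheduler is find_hosts_for_jobs: it zips the hosts returned by the RDB back onto the pending queue entries and keeps only the pairings where a host was actually acquired. A minimal, self-contained sketch of that pattern follows; the Job and Host stand-ins and fake_acquire_hosts are hypothetical illustrations, not autotest APIs.

import collections

# Hypothetical stand-ins for autotest's queue entries and RDB host wrappers.
Job = collections.namedtuple('Job', ['job_id'])
Host = collections.namedtuple('Host', ['hostname'])
host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])

def fake_acquire_hosts(host_jobs):
    """Yield a Host for every other job and None otherwise, mimicking an
    RDB that cannot satisfy every request."""
    for i, job in enumerate(host_jobs):
        yield Host('host%d' % i) if i % 2 == 0 else None

def find_hosts_for_jobs(host_jobs):
    # Same shape as BaseHostScheduler.find_hosts_for_jobs: zip the acquired
    # hosts back onto the jobs and keep only the successful pairings.
    jobs_with_hosts = []
    hosts = fake_acquire_hosts(host_jobs)
    for host, job in zip(hosts, host_jobs):
        if host:
            jobs_with_hosts.append(host_assignment(host, job))
    return jobs_with_hosts

pending = [Job(job_id=n) for n in range(4)]
for assignment in find_hosts_for_jobs(pending):
    print('job %s -> %s' % (assignment.job.job_id, assignment.host.hostname))

Jobs that come back without a host simply stay queued and are retried on a later tick, which is why the real scheduler also reports a new_jobs_without_hosts gauge.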
atlas.py
Source: atlas.py
#!/usr/bin/env python
import subprocess, os, sys, publicsuffix, inspect, json, shutil, re, pystache
from lxml import etree

HTTPS_E = "https://github.com/EFForg/https-everywhere.git"
release_branch = "release"
stable_branch = "master"
ps = publicsuffix.PublicSuffixList()
index_template = open("templates/index.mustache").read()
letter_template = open("templates/letter.mustache").read()
ruleset_template = open("templates/ruleset.mustache").read()
redirect_template = open("templates/redirect.mustache").read()
domain_rulesets = {}
stable_rulesets = {}
release_rulesets = {}
renderer = pystache.Renderer(string_encoding='utf-8')

def clone_or_update():
    if os.path.isdir("https-everywhere"):
        os.chdir("https-everywhere/src/chrome/content/rules")
        stable()
        result = subprocess.call(["git", "pull", "--no-edit", "-q"])
        if result != 0:
            raise Exception("Could not pull updates")
    else:
        result = subprocess.call(["./clone-https-everywhere.sh", HTTPS_E, stable_branch, release_branch])
        os.chdir("https-everywhere/src/chrome/content/rules")
        if result != 0:
            raise Exception("Could not clone {}".format(HTTPS_E))

def stable():
    if subprocess.call(["git", "checkout", "-q", stable_branch]) != 0:
        raise Exception("Could not switch to branch {}".format(stable_branch))
    if subprocess.call(["git", "pull", "--no-edit", "-q", "origin", stable_branch]) != 0:
        raise Exception("Could not pull from origin on branch {}".format(stable_branch))
    return subprocess.Popen(["git", "log", "-1", "--pretty=format:%h %ai"], stdout=subprocess.PIPE, stderr=None).stdout.read()

def release():
    if subprocess.call(["git", "checkout", "-q", release_branch]) != 0:
        raise Exception("Could not switch to branch {}".format(release_branch))
    if subprocess.call(["git", "pull", "--no-edit", "-q", "origin", release_branch]) != 0:
        raise Exception("Could not pull from origin on branch {}".format(release_branch))
    return subprocess.Popen(["git", "log", "-1", "--pretty=format:%h %ai"], stdout=subprocess.PIPE, stderr=None).stdout.read()

def public_suffix_wrapper(domain):
    if re.match("^([0-9]{1,3}\.){3}[0-9]{1,3}$", domain):
        return domain
    else:
        return ps.get_public_suffix(domain)

def get_names(branch):
    if branch == stable_branch:
        rulesets = stable_rulesets
    else:
        rulesets = release_rulesets
    for fi in sorted(os.listdir(".")):
        if fi[-4:] == ".xml":
            try:
                tree = etree.parse(fi)
            except:
                # Parsing this ruleset failed for some reason.
                continue
            if tree.xpath("/ruleset"):
                dfo = bool(tree.xpath("/ruleset/@default_off"))
                name = tree.xpath("/ruleset/@name")[0]
                current_ruleset = [name, dfo, etree.tostring(tree, encoding='utf-8')]
                rulesets[fi] = current_ruleset
                for host in set(map(public_suffix_wrapper, tree.xpath("/ruleset/target/@host"))):
                    host = host.encode("idna").decode('utf-8')
                    if host == "*":
                        # This is a problem about wildcards at the end of
                        # target hosts. Currently, we exclude such targets
                        # from having their own listings in the atlas.
                        continue
                    if host[:2] == "*.":
                        # A very small minority of rules apply to the entirety
                        # of something that the public suffix list considers
                        # a top-level domain, like blogspot.de (because every
                        # blogspot blog can perhaps be accessed via HTTPS, but
                        # individual users constrain the content of each
                        # subdomain). In this unusual case, just list the
                        # higher level domain, without the *. part.
                        host = host[2:]
                    domain_rulesets.setdefault(host, set())
                    domain_rulesets[host].add(fi)
                    rulesets.setdefault(fi, [])
                    rulesets[fi].append(host)
                if dfo: out = "([file %s] %s %s)"
                else: out = "[file %s] %s %s"

clone_or_update()
release_as_of = release()
get_names(release_branch)
stable_as_of = stable()
get_names(stable_branch)
os.chdir("../../../../..")

def hosts_to_filenames(host):
    rulesets_for_host = len(domain_rulesets[host])
    if rulesets_for_host != 1:
        return [host + '-' + str(current) for current in range(1, rulesets_for_host + 1)]
    else:
        return [host]

domains_nested = list(map(hosts_to_filenames, sorted(domain_rulesets.keys())))
domains = [item for sublist in domains_nested for item in sublist]
first_letters_list = sorted(set(n[0] for n in domains))
first_letters = []
for l in first_letters_list:
    first_letters.append({ 'letter': l })
output = pystache.render(index_template, { 'letters': first_letters, 'domains': domains})
open("output/index.html", "w").write(output)

def letter_domain_pairs(domains):
    last_letter = domains[0][0]
    domains_index = []
    for n in domains:
        if n[0] != last_letter:
            yield last_letter, domains_index
            last_letter = n[0]
            domains_index = []
        domains_index.append({ 'domain': n})
    yield last_letter, domains_index

redirect_output = pystache.render(redirect_template, { 'redirect': '../' })
if os.path.exists('output/domains'):
    shutil.rmtree("output/domains")
os.mkdir('output/domains')
open("output/domains/index.html", "w").write(redirect_output)
if not os.path.exists('output/letters'):
    os.mkdir('output/letters')
open("output/letters/index.html", "w").write(redirect_output)
for letter, domains_index in letter_domain_pairs(domains):
    output = pystache.render(letter_template, { 'letters': first_letters,
                                                'first_letter': letter,
                                                'domains': domains_index })
    open("output/letters/%s.html" % letter, "w").write(output)
for domain in domain_rulesets:
    if len(domain_rulesets[domain]) > 1:
        num = 1
        for ruleset_filename in domain_rulesets[domain]:
            os.symlink("../rulesets/" + ruleset_filename + ".html", "output/domains/" + domain + "-" + str(num) + ".html")
            num += 1
    else:
        os.symlink("../rulesets/" + domain_rulesets[domain].pop() + ".html", "output/domains/" + domain + ".html")
if not os.path.exists('output/rulesets'):
    os.mkdir('output/rulesets')
for ruleset in set(list(stable_rulesets.keys()) + list(release_rulesets.keys())):
    d = {}
    d["stable_as_of"] = stable_as_of
    d["release_as_of"] = release_as_of
    d["stable_affected"] = False
    d["release_affected"] = False
    d["stable_hosts"] = []
    d["release_hosts"] = []
    if ruleset in stable_rulesets:
        d["stable_hosts"] = json.dumps(stable_rulesets[ruleset][3:])
        name, dfo, xml = stable_rulesets[ruleset][:3]
        d["stable_enabled"] = False
        d["stable_disabled"] = False
        if dfo:
            d["stable_disabled"] = {"rule_text": xml, "git_link": ruleset}
        else:
            d["stable_enabled"] = {"rule_text": xml, "git_link": ruleset}
        if d["stable_disabled"]: d["stable_has_disabled"] = True
        if d["stable_enabled"]: d["stable_has_enabled"] = True
    if ruleset in release_rulesets:
        d["release_hosts"] = json.dumps(release_rulesets[ruleset][3:])
        name, dfo, xml = release_rulesets[ruleset][:3]
        d["release_enabled"] = False
        d["release_disabled"] = False
        if dfo:
            d["release_disabled"] = {"rule_text": xml, "git_link": ruleset}
        else:
            d["release_enabled"] = {"rule_text": xml, "git_link": ruleset}
        if d["release_disabled"]: d["release_has_disabled"] = True
        if d["release_enabled"]: d["release_has_enabled"] = True
    d['stable_branch'] = stable_branch
    d['release_branch'] = release_branch
    output = renderer.render(ruleset_template, d)
...
models.py
Source: models.py
from django.db import models
from datetime import *
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import *
from django.utils import timezone


class Host(models.Model):
    ip = models.GenericIPAddressField()
    env = models.CharField(max_length=10)

    def __str__(self):
        return self.ip


class ProjectGroup(models.Model):
    name = models.CharField(max_length=80, unique=True)
    comment = models.CharField(max_length=160, blank=True, null=True)

    def __str__(self):
        return self.name


class Project(models.Model):
    name = models.CharField(max_length=100, null=False, unique=True)
    start_cmd = models.CharField(max_length=100, null=False, unique=True)
    stop_cmd = models.CharField(max_length=100, null=False, unique=True)
    target = models.CharField(max_length=100)
    repos = models.URLField(max_length=200, null=False, unique=True)
    test_env = models.ManyToManyField(Host, related_name='test_ip')
    production_env = models.ManyToManyField(Host, related_name='production_ip')
    staging_env = models.ManyToManyField(Host, related_name='staging_ip')
    port = models.PositiveIntegerField(null=False)
    proxys = models.ManyToManyField(Host, related_name='proxy_ip')
    description = models.CharField(max_length=10)
    group = models.ManyToManyField(ProjectGroup, related_name='group_name')

    def __str__(self):
        return self.name

    class Meta:
        permissions = (
            ("release_test_project", "Can release test project"),
            ("release_staging_project", "Can release staging project"),
            ("release_production_project", "Can release production project"),
        )

# class ReleaseHistory(models.Model):
#     release_time = models.DateTimeField(auto_now_add=True)
#     release_project = models.CharField(max_length=100, null=False, unique=False)
#     release_user = models.CharField(max_length=100, null=False, unique=False)
#     release_hosts = models.CharField(max_length=100, null=False, unique=False)
#     release_ver = models.CharField(max_length=100, null=False, unique=False)
#     release_status = models.CharField(max_length=50, null=False, unique=False)
#     release_env = models.CharField(max_length=20, null=False, unique=False)
#     def __str__(self):
#         return self.release_project
...
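The interesting part of these models is that each Project points at Host rows through several ManyToManyFields (test_env, staging_env, production_env, proxys). A hedged usage sketch, assuming the models live in an installed Django app (the app name "deploy" and all data below are hypothetical, and migrations are assumed to have been applied):

# Hypothetical app name and sample data; not part of the source project.
from deploy.models import Host, Project, ProjectGroup

web = ProjectGroup.objects.create(name='web')
h1 = Host.objects.create(ip='10.0.0.11', env='test')
h2 = Host.objects.create(ip='10.0.0.21', env='staging')

api = Project.objects.create(
    name='api',
    start_cmd='systemctl start api',
    stop_cmd='systemctl stop api',
    target='/srv/api',
    repos='https://example.com/git/api.git',
    port=8080,
    description='api svc',   # note: this field is capped at max_length=10
)
# ManyToManyField rows are attached after the Project instance exists.
api.test_env.add(h1)
api.staging_env.add(h2)
api.group.add(web)

# All projects that can be deployed to a given test host:
print(Project.objects.filter(test_env__ip='10.0.0.11'))

The custom permissions in Project.Meta (release_test_project and friends) can then gate these actions per environment via user.has_perm('deploy.release_test_project'), again assuming the hypothetical app label.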