Best Python code snippet using hypothesis
toolbox-migrate.py
Source:toolbox-migrate.py
#!/usr/bin/env python
"""Back up and restore the customisations of a toolbox container.

Three kinds of state are handled: non-Fedora yum repo files, the names of
the installed RPMs, and CA certificates.  ``backup`` copies them into a
backup directory, ``restore`` (run as superuser) copies them back and
reinstalls the RPMs with dnf.
"""
# TODO: Implement cleanup()
# TODO: Better error handling during copy/install operations
# TODO: Diff backup RPMs and currently installed RPMs

import argparse
import logging
import os
import re
import shutil
import subprocess

#: Default directory to backup in
BACKUP_DIR = ".local/share/toolbox-backup/"
#: Location to use for yum repos
YUM_REPOS_DIR = "/etc/yum.repos.d/"
#: Location to use for CA/Certs
CA_CERT_DIR = "/etc/pki/ca-trust/source/anchors/"


class _Base:
    """Common helpers shared by the backup/restore/cleanup operations.

    Instances are callable; the call is forwarded to ``_execute`` which
    every concrete operation must implement.
    """

    def __call__(self, dirpath=None, repos=None, rpms=None, certs=None):
        return self._execute(dirpath, repos, rpms, certs)

    def _execute(self, dirpath, repos, rpms, certs):
        raise NotImplementedError('_execute must be implemented')

    def require_superuser(self):
        """Exit with status 1 unless the process runs with uid 0."""
        if os.getuid() != 0:
            logging.error("Must run restore operation as superuser")
            raise SystemExit(1)

    def ls(self, dirpath, description):
        """Return the entries of ``dirpath``; warn and return [] if empty."""
        entries = os.listdir(dirpath)
        if not entries:
            logging.warning(
                f"Did not find any {description} to restore at {dirpath}")
            return []
        return entries

    def check_file_exists(self, fpath):
        """Exit with status 1 if ``fpath`` is not an existing regular file."""
        if not os.path.isfile(fpath):
            logging.error(f"Unable to find {fpath}")
            raise SystemExit(1)

    def check_dir_exists(self, dirpath, description, create=False):
        """Ensure ``dirpath`` is a directory.

        When it is missing and ``create`` is true the directory is created
        (including missing parents); otherwise the process exits with
        status 1.
        """
        # Expand once so the existence check and the creation look at the
        # same path.
        dirpath = os.path.expanduser(dirpath)
        if os.path.isdir(dirpath):
            return
        err = f"Unable to find {description} dir at {dirpath}"
        if not create:
            logging.error(err)
            raise SystemExit(1)
        logging.warning(err)
        logging.debug(f"Making {description} dir at {dirpath}")
        # makedirs: the default backup location is nested several levels
        # deep and its parents may not exist yet.
        os.makedirs(dirpath, exist_ok=True)

    def run_command(self, cmd, description):
        """Run ``cmd`` and return the CompletedProcess.

        stdout/stderr are captured as text.  On a non-zero exit status the
        failure and the command's stderr are logged and the process exits
        with status 1.
        """
        command = subprocess.run(
            cmd, capture_output=True, text=True)
        if command.returncode != 0:
            logging.error(f"Failed to {description}")
            logging.error(command.stderr)
            raise SystemExit(1)
        return command

    def copy_file(self, src_parts, dst_parts, description):
        """Copy the file at join(*src_parts) to join(*dst_parts)."""
        src = os.path.join(*src_parts)
        dst = os.path.join(*dst_parts)
        logging.debug(f"Copying {description}: {src}->{dst}")
        shutil.copy(src, dst)


class Backup(_Base):
    """Save repo files, installed RPM names and CA certs to a directory."""

    def _execute(self, dirpath, repos, rpms, certs):
        if dirpath is None:
            dirpath = os.path.join(os.environ['HOME'], BACKUP_DIR)
        backup_dir_path = os.path.expanduser(dirpath)
        self.check_dir_exists(backup_dir_path, "toolbox backup", True)

        # Backup everything by default; if one of the options is set
        # then we'll only backup that part
        backup_all = not (repos or rpms or certs)

        if backup_all or repos:
            fedora_re = re.compile(r'^fedora.*')
            yum_repo_backup_dir = os.path.join(backup_dir_path, "repos")
            self.check_dir_exists(yum_repo_backup_dir, "yum repo", True)
            for repo in os.listdir(YUM_REPOS_DIR):
                # Files whose name starts with "fedora" are not backed up.
                if not fedora_re.match(repo):
                    self.copy_file(
                        [YUM_REPOS_DIR, repo],
                        [yum_repo_backup_dir, repo],
                        "yum repo file")

        if backup_all or rpms:
            # Imported here because this is the only code path that needs
            # the RPM Python bindings.
            import rpm

            rpm_backup = os.path.join(backup_dir_path, "toolbox-rpms.backup")
            logging.debug(
                f"Backing up names of installed RPMs to {rpm_backup}")
            txn_set = rpm.TransactionSet()
            with open(rpm_backup, 'w') as f:
                # One space-separated package name per installed header.
                for header in txn_set.dbMatch():
                    f.write(f"{header['name']} ")

        if backup_all or certs:
            cert_backup_dir = os.path.join(backup_dir_path, "certs")
            self.check_dir_exists(cert_backup_dir, "CA cert", True)
            for cert in os.listdir(CA_CERT_DIR):
                self.copy_file(
                    [CA_CERT_DIR, cert],
                    [cert_backup_dir, cert],
                    "CA cert")

        logging.debug("Backup of toolbox config complete")


class Restore(_Base):
    """Restore CA certs, repo files and RPMs from a backup directory."""

    def _execute(self, dirpath, repos, rpms, certs):
        self.require_superuser()
        if dirpath is None:
            sudo_user = os.environ.get('SUDO_USER')
            if not sudo_user:
                # Without SUDO_USER the invoking user's home is unknown.
                logging.error(
                    "SUDO_USER is not set; run with sudo or pass --dir")
                raise SystemExit(1)
            dirpath = os.path.join("/var/home/", sudo_user, BACKUP_DIR)
        backup_dir_path = os.path.expanduser(dirpath)
        self.check_dir_exists(backup_dir_path, "toolbox", False)

        restore_all = not (repos or rpms or certs)

        if restore_all or certs:
            self._restore_certs(backup_dir_path)
        if restore_all or repos:
            self._restore_repos(backup_dir_path)
        if restore_all or rpms:
            self._restore_rpms(backup_dir_path)
        if restore_all or certs:
            # NOTE(review): the original restores the certs a second time
            # after the RPM step; it looks redundant -- confirm before
            # removing.
            self._restore_certs(backup_dir_path)

    def _restore_certs(self, backup_dir_path):
        """Copy backed-up CA certs into place and refresh the CA trust."""
        cert_backup_dir = os.path.join(backup_dir_path, "certs")
        self.check_dir_exists(cert_backup_dir, "CA cert", False)
        backup_certs = self.ls(cert_backup_dir, "CA cert")
        for cert in backup_certs:
            self.copy_file(
                [cert_backup_dir, cert],
                [CA_CERT_DIR, cert],
                "CA cert")
        if backup_certs:
            self.run_command(
                ['update-ca-trust'], "update the CA trust")

    def _restore_repos(self, backup_dir_path):
        """Copy backed-up yum repo files into YUM_REPOS_DIR."""
        yum_repo_backup_dir = os.path.join(backup_dir_path, "repos")
        self.check_dir_exists(yum_repo_backup_dir, "repo", False)
        for repo in self.ls(yum_repo_backup_dir, 'yum repo'):
            self.copy_file(
                [yum_repo_backup_dir, repo],
                [YUM_REPOS_DIR, repo],
                "repo file")

    def _restore_rpms(self, backup_dir_path):
        """Install every RPM named in the backup list with dnf."""
        rpms_backup_file = os.path.join(
            backup_dir_path, "toolbox-rpms.backup")
        self.check_file_exists(rpms_backup_file)
        dnf_install = ['dnf', '-y', '--skip-broken', 'install']
        with open(rpms_backup_file, 'r') as f:
            dnf_install = dnf_install + f.read().split()
        logging.debug("Starting restore of RPMs")
        install_cp = self.run_command(
            dnf_install, "restore RPMS from backup list")
        # Collect the package names dnf reported as having no match.
        nomatch_re = re.compile(r"^No match for argument: (.*)")
        nomatch_rpms = [
            m.group(1)
            for m in map(nomatch_re.match, install_cp.stdout.split("\n"))
            if m
        ]
        if nomatch_rpms:
            logging.debug(
                f"Unable to install following RPMs: {nomatch_rpms}")
        logging.debug("Finished restoring RPMs")


# TODO: Implement
class CleanUp(_Base):
    """Placeholder; calling it raises NotImplementedError via _Base."""
    pass


def main():
    """Parse the command line and run the selected operation."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'operation',
        choices=['backup', 'cleanup', 'restore'],
        help="The operation to perform")
    parser.add_argument(
        '--verbose', action='store_true',
        help="Make the operation more talkative")
    parser.add_argument(
        '--dir',
        help="Specify custom directory location to use for operations")
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '--repos', action='store_true',
        help="Only operate on yum repos")
    group.add_argument(
        '--rpms', action='store_true',
        help="Only operate on installed RPMs")
    group.add_argument(
        '--certs', action='store_true',
        help="Only operate on installed CA certs")
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    # argparse restricts ``operation`` to exactly these keys.
    operations = {
        "backup": Backup,
        "restore": Restore,
        "cleanup": CleanUp,
    }
    cmd = operations[args.operation]()
    cmd(
        dirpath=args.dir, repos=args.repos,
        rpms=args.rpms, certs=args.certs)


if __name__ == "__main__":
    main()
test_bdd_system.py
Source:test_bdd_system.py
...35 S = system([self.cl, self.cl])36 Sbdd = system([self.cl_bdd, self.cl_bdd2])37 Ux, _ = S.gen_safety_scheduler_part(convert_blocks=True)38 Uxbdd = Sbdd.gen_safety_scheduler_basic()39 S.restore_all()40 S.compose()41 self.__compUx(S, Sbdd, Ux, Uxbdd)42 def testRegSchedPartEquivalent(self):43 S = system([self.cl, self.cl])44 Sbdd = system([self.cl_bdd, self.cl_bdd2])45 Ux, _ = S.gen_safety_scheduler_part(convert_blocks=True)46 Uxbdd, _ = Sbdd.gen_safety_scheduler_part(convert_blocks=True)47 S.restore_all()48 S.compose()49 Sbdd.restore_all()50 Sbdd.compose()51 self.__compUx(S, Sbdd, Ux, Uxbdd)52 def testTrapStateSchedEquivalent(self):53 S = system([self.cl, self.cl], trap_state=True)54 Sbdd = system([self.cl_bdd, self.cl_bdd2], trap_state=True)55 Ux, _ = S.gen_safety_scheduler_part(convert_blocks=True)56 Uxbdd = Sbdd.gen_safety_scheduler_basic()57 S.restore_all()58 S.compose()59 self.__compUx(S, Sbdd, Ux, Uxbdd, add_Trap=True)60 S = system([self.cl, self.cl], trap_state=True)61 Sbdd = system([self.cl_bdd, self.cl_bdd2], trap_state=True)62 Ux, _ = S.gen_safety_scheduler_part(convert_blocks=True)63 Uxbdd,_ = Sbdd.gen_safety_scheduler_part(convert_blocks=True)64 S.restore_all()65 S.compose()66 Sbdd.restore_all()67 Sbdd.compose()68 self.__compUx(S, Sbdd, Ux, Uxbdd, add_Trap=True)69 def __compUx(self, S, Sbdd, Ux, Uxbdd, add_Trap=False):70 for (x, uuu) in Ux.items():71 iiencs = [schedbdd.controlloop.enc(a.xvars, b.states[ii]) for (a, b, ii) in72 zip(Sbdd.control_loops, S.control_loops, x)]73 i_enc = ''74 for i in iiencs:75 i_enc += ' & ' + i76 if add_Trap:77 i_enc += f' & !{Sbdd._trapx_var}'78 i_enc = i_enc[3:]79 for uu in uuu:80 uuencs = [schedbdd.controlloop.enc(a.uvars, b.actions[u]) for (a, b, u) in...
restore_endorsements.py
Source:restore_endorsements.py
1# Copyright 2022 UW-IT, University of Washington2# SPDX-License-Identifier: Apache-2.03from django.core.management.base import BaseCommand4from django.utils import timezone5from endorsement.models import Endorser, EndorsementRecord6from endorsement.dao.endorse import set_active_category, activate_subscriptions7from endorsement.services import get_endorsement_service8from datetime import timedelta9import logging10logger = logging.getLogger(__name__)11class Command(BaseCommand):12 help = "Go behind the curtain to restore previously cleared endorsements"13 def add_arguments(self, parser):14 parser.add_argument('endorser', type=str)15 parser.add_argument(16 '-l',17 '--list',18 action='store_true',19 dest='list_endorsements',20 default=False,21 help='List endorsed netids with category codes',22 )23 parser.add_argument(24 '-c',25 '--commit',26 action='store_true',27 dest='restore_all',28 default=False,29 help='Restore all endorsements without confirmation',30 )31 def handle(self, *args, **options):32 endorser_netid = options['endorser']33 list_endorsements = options['list_endorsements']34 restore_all = options['restore_all']35 now = timezone.now()36 endorser = Endorser.objects.get(netid=endorser_netid)37 for er in EndorsementRecord.objects.filter(endorser=endorser):38 service = get_endorsement_service(er.category_code)39 # deleted and previously endorsed with lifespan remaining40 if (er.is_deleted41 and er.datetime_endorsed42 and er.datetime_endorsed > (now - timedelta(43 days=service.endorsement_lifetime))44 and not (45 er.datetime_notice_1_emailed46 and er.datetime_notice_2_emailed47 and er.datetime_notice_3_emailed48 and er.datetime_notice_4_emailed)):49 if list_endorsements:50 print("{} {}".format(er.category_code, er.endorsee.netid))51 continue52 if not restore_all:53 prompt = "Restore {} for {} by {}? 
(y/N): ".format(54 er.category_code, er.endorsee.netid, er.endorser.netid)55 response = input(prompt).strip().lower()56 if response != 'y':57 continue58 self._restore(er, service)59 def _restore(self, er, service):60 print("Activate category {} for {}".format(61 service.category_code, er.endorsee.netid))62 set_active_category(er.endorsee.netid, service.category_code)63 print("Subscribe codes {} for {}".format(64 service.subscription_codes, er.endorsee.netid))65 activate_subscriptions(66 er.endorsee.netid, er.endorser.netid, service.subscription_codes)67 print("Restore endorsement record of {} for {} by {}".format(68 er.category_code, er.endorsee.netid, er.endorser.netid))69 er.is_deleted = None70 er.datetime_expired = None...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!