Best Python code snippet using autotest_python
libvirt_helper.py
Source:libvirt_helper.py
1import sys2import libvirt3from xml.etree import ElementTree4import time5import ebt_system6class Libvirt(object):7 def __init__(self, uri='qemu:///system'):8 self.conn = libvirt.open(uri)9 def list_domains(self):10 return self.conn.listAllDomains()11 @staticmethod12 def get_domain_disks(domain):13 assert isinstance(domain, libvirt.virDomain), '{1}.{2}: variable "{0}" has wrong type.' \14 .format('domain', __name__, sys._getframe().f_code.co_name)15 domain_xml = domain.XMLDesc(0)16 root = ElementTree.fromstring(domain_xml)17 disks = root.findall('./devices/disk')18 disks_list = list()19 for disk in disks:20 disk_info = {}21 if disk.attrib['device'] in ('disk',):22 if (disk.find('source') is not None) and (disk.find('source').get('dev') is not None):23 disk_info['path'] = disk.find('source').get('dev')24 disk_info['target'] = disk.find('target').get('dev')25 disk_info['source_type'] = 'dev'26 disk_info['snapshot_path'] = None27 disks_list.append(disk_info)28 elif (disk.find('source') is not None) and (disk.find('source').get('file') is not None):29 disk_info['path'] = disk.find('source').get('file')30 disk_info['target'] = disk.find('target').get('dev')31 disk_info['source_type'] = 'file'32 disk_info['snapshot_path'] = None33 disks_list.append(disk_info)34 return disks_list35 def filter_domain_list(self, domains, include=list(), exclude=list()):36 assert isinstance(include, list), '{1}.{2}: variable "{0}" has wrong type.' \37 .format('include', __name__, sys._getframe().f_code.co_name)38 assert isinstance(exclude, list), '{1}.{2}: variable "{0}" has wrong type.' \39 .format('exclude', __name__, sys._getframe().f_code.co_name)40 assert isinstance(domains, list) and isinstance(domains[0],41 libvirt.virDomain), '{1}.{2}: variable "{0}" has wrong type.' 
\42 .format('domains', __name__, sys._getframe().f_code.co_name)43 filtered_list = list()44 for domain in domains:45 if (domain.name() in include) or (('all' not in exclude) and (domain.name() not in exclude)):46 filtered_list.append(domain)47 return filtered_list48 @staticmethod49 def export_xml(domain, path):50 domain_xml = domain.XMLDesc(0)51 xml_file = open(path, mode='w')52 xml_file.write(domain_xml)53 xml_file.close()54 @staticmethod55 def device_size(domain, path):56 return int(domain.blockInfo(path)[0])57 def restore(self, path):58 self.conn.restore(path)59 @staticmethod60 def create_snapshot_xml(disks, memory_path=None):61 assert isinstance(disks, list) and isinstance(disks[0], dict), '{1}.{2}: variable "{0}" has wrong type.' \62 .format('disks', __name__, sys._getframe().f_code.co_name)63 assert (memory_path is None) or isinstance(memory_path, str), '{1}.{2}: variable "{0}" has wrong type.' \64 .format('memory_path', __name__, sys._getframe().f_code.co_name)65 snap_xml = ElementTree.Element('domainsnapshot')66 disks_xml = ElementTree.SubElement(snap_xml, 'disks')67 if memory_path is None:68 ElementTree.SubElement(snap_xml, 'memory', {'snapshot': 'no'})69 else:70 ElementTree.SubElement(snap_xml, 'memory', {'snapshot': 'external', 'file': memory_path})71 for disk in disks:72 if disk['snapshot_path'] is None:73 ElementTree.SubElement(disks_xml, 'disk', {'name': disk['target'], 'snapshot': 'no'})74 else:75 disk_xml = ElementTree.SubElement(disks_xml, 'disk', {'name': disk['target'], 'snapshot': 'external'})76 ElementTree.SubElement(disk_xml, 'source', {'file': disk['snapshot_path']})77 snap_xml_str = ElementTree.tostring(snap_xml, encoding='utf8', method='xml')78 return snap_xml_str79 def create_vm_snapshot(self, domain, disks, memory_path=None, atomic=True, quiesce=False):80 assert isinstance(domain, libvirt.virDomain), '{1}.{2}: variable "{0}" has wrong type.' 
\81 .format('domain', __name__, sys._getframe().f_code.co_name)82 assert isinstance(disks, list) and isinstance(disks[0], dict), '{1}.{2}: variable "{0}" has wrong type.' \83 .format('disks', __name__, sys._getframe().f_code.co_name)84 assert (memory_path is None) or isinstance(memory_path, str), '{1}.{2}: variable "{0}" has wrong type.' \85 .format('memory_path', __name__, sys._getframe().f_code.co_name)86 assert isinstance(atomic, bool), '{1}.{2}: variable "{0}" has wrong type.' \87 .format('atomic', __name__, sys._getframe().f_code.co_name)88 assert isinstance(quiesce, bool), '{1}.{2}: variable "{0}" has wrong type.' \89 .format('quiesce', __name__, sys._getframe().f_code.co_name)90 flags = 091 if memory_path is None:92 flags |= libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_DISK_ONLY93 else:94 flags |= libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_LIVE95 if atomic:96 flags |= libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_ATOMIC97 if quiesce:98 flags |= libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_QUIESCE99 snap_xml = self.create_snapshot_xml(disks, memory_path)100 snap = domain.snapshotCreateXML(snap_xml, flags)101 return snap102 @staticmethod103 def remove_vm_snapshot(domain, disks):104 assert isinstance(domain, libvirt.virDomain), '{1}.{2}: variable "{0}" has wrong type.' \105 .format('domain', __name__, sys._getframe().f_code.co_name)106 assert isinstance(disks, list) and isinstance(disks[0], dict), '{1}.{2}: variable "{0}" has wrong type.' \107 .format('disks', __name__, sys._getframe().f_code.co_name)108 flags = libvirt.VIR_DOMAIN_BLOCK_COMMIT_ACTIVE109 for disk in disks:110 if disk['snapshot_path'] is not None:111 domain.blockCommit(disk=disk['target'], base=None, top=None, flags=flags)112 while True:113 status = domain.blockJobInfo(disk['target'])114 if status['cur'] == status['end']:115 domain.blockJobAbort(disk=disk['target'], flags=libvirt.VIR_DOMAIN_BLOCK_JOB_ABORT_PIVOT)116 ebt_system.rm(disk['snapshot_path'])117 break118 else:...
rl_config.py
Source:rl_config.py
1import os2import numpy as np3from datetime import datetime4TIMESTAMP = "{0:%Y-%m-%d-Time%H-%M-%S}".format(datetime.now())5class DebugConfig:6 """7 This is parameters for experiment debug.8 """9 # Training parameters10 total_episodes = 10011 noised_episodes = 2012 max_steps = 30013 batch_size = 1024 # 25614 train_frequency = 500 # 215 # NN architecture16 ego_feature_num = 417 npc_num = 518 npc_feature_num = 419 state_size = ego_feature_num + npc_num * npc_feature_num20 action_size = 221 lra = 2e-522 lrc = 1e-423 # Fixed Q target hyper parameters24 tau = 1e-325 # exploration hyperparamters for ep. greedy. startegy26 explore_start = 0.75 # exploration probability at start27 explore_stop = 0.01 # minimum exploration probability28 explore_step = 40000 # 40k steps29 decay_rate = (explore_start - explore_stop) / explore_step # exponential decay rate for exploration prob30 # Q LEARNING hyperparameters31 gamma = 0.99 # Discounting rate32 pretrain_length = 500 # Number of experiences stored in the Memory when initialized for the first time --INTIALLY 100k33 memory_size = 200000 # Number of experiences the Memory can keep --INTIALLY 100k34 load_memory = False # If True load memory, otherwise fill the memory with new data35 # ==================================================36 # output paths37 tag = 'debug'38 output_path = os.path.join('./outputs', tag, TIMESTAMP)39 memory_path = os.path.join(output_path, 'rl_replay_memory')40 # os.makedirs(memory_path, exist_ok=True)41 memory_load_path = os.path.join(memory_path, 'memory.pkl')42 memory_save_path = os.path.join(memory_path, 'memory.pkl')43 # model saving44 model_save_frequency = 2 # frequency to save the model. 
0 means not to save45 model_save_frequency_no_paste = 500 # ???46 # frequency to check best models47 model_save_frequency_high_success = 1048 model_test_frequency = 1049 model_test_eps = 10 # ???50 # final model save path51 model_save_path = os.path.join(output_path, 'final_model', 'final_model.ckpt')52 # checkpoint save path53 model_ckpt_path = os.path.join(output_path, 'checkpoints')54 # best model55 best_model_path = os.path.join(output_path, 'best_models')56class hyperParameters:57 """58 Hyperparameters for RL agent59 """60 # Training parameters61 total_episodes = 1000062 noised_episodes = 200063 # todo add setter for env64 max_steps = 30065 batch_size = 1024 # 256, 512, 102466 train_frequency = 2 # 267 # td368 policy_delayed = 269 # NN architecture70 ego_feature_num = 4 # 9 for abs_all, 4 for sumo and sumo_171 npc_num = 572 npc_feature_num = 673 state_size = ego_feature_num + npc_num * npc_feature_num74 action_size = 275 # Fixed Q target hyper parameters76 tau = 1e-377 # exploration hyper-parameters for epsilon-greedy strategy78 explore_start = 0.5 # exploration probability at start79 explore_stop = 0.05 # minimum exploration probability80 explore_step = 20000 # 40k, 4000081 # decay_rate = (explore_start - explore_stop) / explore_step # exponential decay rate for exploration prob82 # Q LEARNING hyperparameters83 gamma = 0.99 # Discounting rate84 pretrain_length = 10000 # Number of experiences stored in the Memory when initialized for the first time --INTIALLY 100k85 memory_size = 500000 # Number of experiences the Memory can keep --INTIALLY 100k86 load_memory = False # If True load memory, otherwise fill the memory with new data87 # ==================================================88 # output paths89 # tag = 'CarlaEnv3'90 tag = 'CarlaEnv4'91 # model saving92 model_save_frequency = 50 # frequency to save the model. 
0 means not to save93 model_save_frequency_no_paste = 500 # ???94 # frequency to check best models95 model_save_frequency_high_success = 2096 model_test_frequency = 1097 model_test_eps = 10 # ???98 # ================ Decay learning rate ================99 lra = 2e-5 # 2e-5100 lrc = 5e-5 # 1e-4101 # todo this number is determined by downsample factor102 guessing_episode_length = 200103 # decay after certain number of episodes104 decay_episodes = 1500105 decay_steps = guessing_episode_length / train_frequency * decay_episodes106 decay_rate = 1 / 2.15 # 2.15 = 10^(1/3)107 def __init__(self, args=None):108 # todo need to fix api of the rl_utils109 self.state_size = self.ego_feature_num + self.npc_num * self.npc_feature_num110 self.generate_output_path(args)111 def generate_output_path(self, args=None):112 if args:113 if args.tag:114 output_path = os.path.join('./outputs', args.route_option, self.tag, args.tag, TIMESTAMP)115 else:116 output_path = os.path.join('./outputs', args.route_option, self.tag, TIMESTAMP)117 else:118 output_path = os.path.join('./outputs/please_check', self.tag, TIMESTAMP)119 self.output_path = output_path120 self.memory_path = os.path.join(output_path, 'rl_replay_memory')121 # os.makedirs(memory_path, exist_ok=True)122 self.memory_load_path = os.path.join(self.memory_path, 'memory.pkl')123 self.memory_save_path = os.path.join(self.memory_path, 'memory.pkl')124 # checkpoint save path125 self.model_ckpt_path = os.path.join(output_path, 'checkpoints')126 # best model127 self.best_model_path = os.path.join(output_path, 'best_models')128 # final model save path...
memories.py
Source:memories.py
import os
import random as rng

import discord
from discord.ext import commands

# Directory where memory images are stored; joined to file names by plain
# string concatenation, so the trailing slash is required.
MEMORY_PATH = './resources/memories/'


class Memories(commands.Cog):
    """Cog with commands for saving and recalling files kept in MEMORY_PATH."""

    def __init__(self, client):
        """Keep a reference to the bot/client this cog is attached to."""
        self.client = client

    @commands.command()
    async def memory(self, ctx, selection=None):
        """Send a stored memory to the invoking context.

        With no ``selection``, a randomly chosen file from ``MEMORY_PATH`` is
        uploaded. With a ``selection``, the string
        ``MEMORY_PATH + selection + '.png'`` is sent as a text message.
        """
        if selection:
            # NOTE(review): this sends the path as text, not the image itself;
            # presumably an upload via discord.File was intended -- confirm
            # before changing, since callers may rely on the current output.
            await ctx.send(f'{MEMORY_PATH}{selection}.png')
            return

        memories = [f'{MEMORY_PATH}{filename}' for filename in os.listdir(MEMORY_PATH)]
        if not memories:
            # rng.choice raises IndexError on an empty sequence; report instead.
            await ctx.send('I have no memories saved yet.')
            return
        await ctx.send(file=discord.File(rng.choice(memories)))

    @commands.command(aliases=['addmemories', 'remember'])
    async def addmemory(self, ctx):
        """Save each supported image attachment of the invoking message.

        Every accepted attachment is written to ``MEMORY_PATH`` as
        ``<entry count + 1>.png``; attachments whose file name does not end in
        one of the supported suffixes are rejected with a message.
        """
        await ctx.channel.purge(limit=1)
        # A tuple so that str.endswith can test every suffix in one call.
        img_types = ('.png', '.jpg', '.jpeg', '.gif')
        print('Attempting to save attachment...')
        if not ctx.message.attachments:
            await ctx.send('Please supply a link or attachment to save to memories.')
            return

        for attachment in ctx.message.attachments:
            # The previous check wrapped the endswith calls in a bare generator
            # expression, which is always truthy, so unsupported files were
            # saved and the rejection branch could never run.
            if attachment.filename.lower().endswith(img_types):
                # The name is derived from the current number of directory
                # entries and always uses the .png suffix, whatever the
                # attachment's own suffix is.
                save_file = f'{MEMORY_PATH}{len(os.listdir(MEMORY_PATH)) + 1}.png'
                await ctx.send('Attempting to save memory.')
                await attachment.save(save_file)
                await ctx.send(f'File saved as `{save_file}`.')
            else:
                await ctx.send('Image type not supported.')

    @commands.command()
    async def memories(self, ctx):
        """Report how many entries ``MEMORY_PATH`` currently contains."""
        i = len(os.listdir(MEMORY_PATH))
        await ctx.send(f'I have {i} memories saved.')


def setup(client):
    """Register the Memories cog on ``client`` and print a confirmation."""
    client.add_cog(Memories(client))
    print(f'Loaded {os.path.basename(__file__)} successfully')
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!