Best Python code snippet using molecule_python
inventory.py
Source:inventory.py
...73 self.ensure_required_groups(ROLES)74 if changed_hosts:75 changed_hosts = self.range2ips(changed_hosts)76 self.hosts = self.build_hostnames(changed_hosts)77 self.purge_invalid_hosts(self.hosts.keys(), PROTECTED_NAMES)78 self.set_all(self.hosts)79 self.set_k8s_cluster()80 etcd_hosts_count = 3 if len(self.hosts.keys()) >= 3 else 181 self.set_etcd(list(self.hosts.keys())[:etcd_hosts_count])82 if len(self.hosts) >= SCALE_THRESHOLD:83 self.set_kube_master(list(self.hosts.keys())[etcd_hosts_count:5])84 else:85 self.set_kube_master(list(self.hosts.keys())[:2])86 self.set_kube_node(self.hosts.keys())87 if len(self.hosts) >= SCALE_THRESHOLD:88 self.set_calico_rr(list(self.hosts.keys())[:etcd_hosts_count])89 else: # Show help if no options90 self.show_help()91 sys.exit(0)92 self.write_config(self.config_file)93 def write_config(self, config_file):94 if config_file:95 with open(self.config_file, 'w') as f:96 yaml.dump(self.yaml_config, f)97 else:98 print("WARNING: Unable to save config. Make sure you set "99 "CONFIG_FILE env var.")100 def debug(self, msg):101 if DEBUG:102 print("DEBUG: {0}".format(msg))103 def get_ip_from_opts(self, optstring):104 if 'ip' in optstring:105 return optstring['ip']106 else:107 raise ValueError("IP parameter not found in options")108 def ensure_required_groups(self, groups):109 for group in groups:110 if group == 'all':111 self.debug("Adding group {0}".format(group))112 if group not in self.yaml_config:113 all_dict = OrderedDict([('hosts', OrderedDict({})),114 ('children', OrderedDict({}))])115 self.yaml_config = {'all': all_dict}116 else:117 self.debug("Adding group {0}".format(group))118 if group not in self.yaml_config['all']['children']:119 self.yaml_config['all']['children'][group] = {'hosts': {}}120 def get_host_id(self, host):121 '''Returns integer host ID (without padding) from a given hostname.'''122 try:123 short_hostname = host.split('.')[0]124 return int(re.findall("\\d+$", short_hostname)[-1])125 except IndexError:126 raise ValueError("Host name must end in an integer")127 def build_hostnames(self, changed_hosts):128 existing_hosts = OrderedDict()129 highest_host_id = 0130 try:131 for host in self.yaml_config['all']['hosts']:132 existing_hosts[host] = self.yaml_config['all']['hosts'][host]133 host_id = self.get_host_id(host)134 if host_id > highest_host_id:135 highest_host_id = host_id136 except Exception:137 pass138 # FIXME(mattymo): Fix condition where delete then add reuses highest id139 next_host_id = highest_host_id + 1140 all_hosts = existing_hosts.copy()141 for host in changed_hosts:142 if host[0] == "-":143 realhost = host[1:]144 if self.exists_hostname(all_hosts, realhost):145 self.debug("Marked {0} for deletion.".format(realhost))146 all_hosts.pop(realhost)147 elif self.exists_ip(all_hosts, realhost):148 self.debug("Marked {0} for deletion.".format(realhost))149 self.delete_host_by_ip(all_hosts, realhost)150 elif host[0].isdigit():151 if ',' in host:152 ip, access_ip = host.split(',')153 else:154 ip = host155 access_ip = host156 if self.exists_hostname(all_hosts, host):157 self.debug("Skipping existing host {0}.".format(host))158 continue159 elif self.exists_ip(all_hosts, ip):160 self.debug("Skipping existing host {0}.".format(ip))161 continue162 next_host = "{0}{1}".format(HOST_PREFIX, next_host_id)163 next_host_id += 1164 all_hosts[next_host] = {'ansible_host': access_ip,165 'ip': ip,166 'access_ip': access_ip}167 elif host[0].isalpha():168 raise Exception("Adding hosts by hostname is not supported.")169 return all_hosts170 def range2ips(self, hosts):171 reworked_hosts = []172 def ips(start_address, end_address):173 try:174 # Python 3.x175 start = int(ip_address(start_address))176 end = int(ip_address(end_address))177 except:178 # Python 2.7179 start = int(ip_address(unicode(start_address)))180 end = int(ip_address(unicode(end_address)))181 return [ip_address(ip).exploded for ip in range(start, end + 1)]182 for host in hosts:183 if '-' in host and not host.startswith('-'):184 start, end = host.strip().split('-')185 try:186 reworked_hosts.extend(ips(start, end))187 except ValueError:188 raise Exception("Range of ip_addresses isn't valid")189 else:190 reworked_hosts.append(host)191 return reworked_hosts192 def exists_hostname(self, existing_hosts, hostname):193 return hostname in existing_hosts.keys()194 def exists_ip(self, existing_hosts, ip):195 for host_opts in existing_hosts.values():196 if ip == self.get_ip_from_opts(host_opts):197 return True198 return False199 def delete_host_by_ip(self, existing_hosts, ip):200 for hostname, host_opts in existing_hosts.items():201 if ip == self.get_ip_from_opts(host_opts):202 del existing_hosts[hostname]203 return204 raise ValueError("Unable to find host by IP: {0}".format(ip))205 def purge_invalid_hosts(self, hostnames, protected_names=[]):206 for role in self.yaml_config['all']['children']:207 if role != 'k8s-cluster' and self.yaml_config['all']['children'][role]['hosts']: # noqa208 all_hosts = self.yaml_config['all']['children'][role]['hosts'].copy() # noqa209 for host in all_hosts.keys():210 if host not in hostnames and host not in protected_names:211 self.debug(212 "Host {0} removed from role {1}".format(host, role)) # noqa213 del self.yaml_config['all']['children'][role]['hosts'][host] # noqa214 # purge from all215 if self.yaml_config['all']['hosts']:216 all_hosts = self.yaml_config['all']['hosts'].copy()217 for host in all_hosts.keys():218 if host not in hostnames and host not in protected_names:219 self.debug("Host {0} removed from role all".format(host))...
test_inventory.py
Source:test_inventory.py
...172 'ip': '10.90.0.3',173 'access_ip': '10.90.0.3'})])174 self.assertRaisesRegexp(ValueError, "Unable to find host",175 self.inv.delete_host_by_ip, existing_hosts, ip)176 def test_purge_invalid_hosts(self):177 proper_hostnames = ['node1', 'node2']178 bad_host = 'doesnotbelong2'179 existing_hosts = OrderedDict([180 ('node1', {'ansible_host': '10.90.0.2',181 'ip': '10.90.0.2',182 'access_ip': '10.90.0.2'}),183 ('node2', {'ansible_host': '10.90.0.3',184 'ip': '10.90.0.3',185 'access_ip': '10.90.0.3'}),186 ('doesnotbelong2', {'whateveropts=ilike'})])187 self.inv.yaml_config['all']['hosts'] = existing_hosts188 self.inv.purge_invalid_hosts(proper_hostnames)189 self.assertTrue(190 bad_host not in self.inv.yaml_config['all']['hosts'].keys())191 def test_add_host_to_group(self):192 group = 'etcd'193 host = 'node1'194 opts = {'ip': '10.90.0.2'}195 self.inv.add_host_to_group(group, host, opts)196 self.assertEqual(197 self.inv.yaml_config['all']['children'][group]['hosts'].get(host),198 None)199 def test_set_kube_master(self):200 group = 'kube-master'201 host = 'node1'202 self.inv.set_kube_master([host])...
state.py
Source:state.py
...176 logger.warning((177 'Use of `State.limit` is deprecated, '178 'please use normal `if` statements instead.'179 ))180 return self.hosts(hosts)181 @contextmanager182 def hosts(self, hosts):183 logger.warning((184 'Use of `State.hosts` is deprecated, '185 'please use normal `if` statements instead.'186 ))187 hosts = ensure_host_list(hosts, inventory=self.inventory)188 # Store the previous value189 old_limit_hosts = self.limit_hosts190 # Limit the new hosts to a subset of the old hosts if they existed191 if old_limit_hosts is not None:192 hosts = [193 host for host in hosts194 if host in old_limit_hosts195 ]196 # Set the new value197 self.limit_hosts = hosts198 logger.debug('Setting limit to hosts: {0}'.format(hosts))199 yield200 # Restore the old value201 self.limit_hosts = old_limit_hosts202 logger.debug('Reset limit to hosts: {0}'.format(old_limit_hosts))203 @contextmanager204 def when(self, predicate):205 logger.warning((206 'Use of `State.when` is deprecated, '207 'please use normal `if` statements instead.'208 ))209 # Truth-y? Just yield/end, nothing happens here!210 if predicate:211 yield212 return213 # Otherwise limit any operations within to match no hosts214 with self.hosts([]):215 yield216 @contextmanager217 def deploy(self, name, kwargs, data, line_number, in_deploy=True):218 '''219 Wraps a group of operations as a deploy, this should not be used220 directly, instead use ``pyinfra.api.deploy.deploy``.221 '''222 # Handle nested deploy names223 if self.deploy_name:224 name = _make_name(self.deploy_name, name)225 # Store the previous values226 old_in_deploy = self.in_deploy227 old_deploy_name = self.deploy_name228 old_deploy_kwargs = self.deploy_kwargs229 old_deploy_data = self.deploy_data230 old_deploy_line_numbers = self.deploy_line_numbers231 self.in_deploy = in_deploy232 # Limit the new hosts to a subset of the old hosts if they existed233 if (234 old_deploy_kwargs235 and old_deploy_kwargs.get('hosts') is not None236 ):237 # If we have hosts - subset them based on the old hosts238 if 'hosts' in kwargs:239 kwargs['hosts'] = [240 host for host in kwargs['hosts']241 if host in old_deploy_kwargs['hosts']242 ]243 # Otherwise simply carry the previous hosts244 else:245 kwargs['hosts'] = old_deploy_kwargs['hosts']246 # Make new line numbers - note convert from and back to tuple to avoid247 # keeping deploy_line_numbers mutable.248 new_line_numbers = list(self.deploy_line_numbers or [])249 new_line_numbers.append(line_number)250 new_line_numbers = tuple(new_line_numbers)251 # Set the new values252 self.deploy_name = name253 self.deploy_kwargs = kwargs254 self.deploy_data = data255 self.deploy_line_numbers = new_line_numbers256 logger.debug('Starting deploy {0} (args={1}, data={2})'.format(257 name, kwargs, data,258 ))259 yield260 # Restore the previous values261 self.in_deploy = old_in_deploy262 self.deploy_name = old_deploy_name263 self.deploy_kwargs = old_deploy_kwargs264 self.deploy_data = old_deploy_data265 self.deploy_line_numbers = old_deploy_line_numbers266 logger.debug('Reset deploy to {0} (args={1}, data={2})'.format(267 old_deploy_name, old_deploy_kwargs, old_deploy_data,268 ))269 @contextmanager270 def preserve_loop_order(self, items):271 frameinfo = get_caller_frameinfo(frame_offset=1) # escape contextlib272 self.loop_line = frameinfo.lineno273 def item_generator():274 for i, item in enumerate(items, 1):275 self.loop_counter = i276 yield item277 yield item_generator278 self.loop_counter = None279 self.loop_line = None280 def get_op_order(self):281 line_numbers_to_hash = self.op_line_numbers_to_hash282 sorted_line_numbers = sorted(list(line_numbers_to_hash.keys()))283 return [284 line_numbers_to_hash[numbers]285 for numbers in sorted_line_numbers286 ]287 def activate_host(self, host):288 '''289 Flag a host as active.290 '''291 logger.debug('Activating host: {0}'.format(host))292 # Add to *both* activated and active - active will reduce as hosts fail293 # but connected will not, enabling us to track failed %.294 self.activated_hosts.add(host)295 self.active_hosts.add(host)296 # def ready_host(self, host):297 # '''298 # Flag a host as ready, after which facts will not be gathered for it.299 # '''300 # logger.debug('Readying host: {0}'.format(host))301 # self.ready_hosts.add(host)302 def fail_hosts(self, hosts_to_fail, activated_count=None):303 '''304 Flag a ``set`` of hosts as failed, error for ``config.FAIL_PERCENT``.305 '''306 if not hosts_to_fail:307 return308 activated_count = activated_count or len(self.activated_hosts)309 logger.debug('Failing hosts: {0}'.format(', '.join(310 (host.name for host in hosts_to_fail),311 )))312 self.failed_hosts.update(hosts_to_fail)313 self.active_hosts -= hosts_to_fail314 # Check we're not above the fail percent315 active_hosts = self.active_hosts316 # No hosts left!...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!