Best Python code snippet using molecule_python
install.py
Source:install.py
...37 )38 # After parsing check if a token is mandatory or advised39 if (any(d['type'] == 'gitlab' for d in requirements) and40 not args['gitlab_token']):41 util.sysexit_with_message(42 "Gitlab repo in requirements but no token passed, \43 use --gitlab-token"44 )45 if (any(d['type'] == 'github' for d in requirements) and46 not args['github_token']):47 util.sysexit_with_message(48 "Github repo in requirements but no token passed, \49 use --github-token"50 )51 # Init lockfile52 locks_path = pathlib.Path(args['lock_file']).resolve()53 # Init current_lock list54 current_lock = {}55 if locks_path.is_file():56 current_lock = _parse_and_validate(57 path=locks_path,58 type='locks'59 )60 else:61 LOG.info("Lock file does not exist, will be created at {path}".format(62 path=locks_path)63 )64 # Init new_lock list65 new_lock = {}66 # Loop requirements for downloading67 for requirement in requirements:68 if requirement['name'] in current_lock and not upgrade:69 LOG.info("{requirement} already installed, skipping".format(70 requirement=requirement['name'])71 )72 else:73 try:74 if requirement['type'] == 'gitlab':75 # Init Gitlab object76 source = sources.gitlab.Gitlab(77 repo=requirement['repo'],78 version=requirement.get('version'),79 token=args['gitlab_token']80 )81 elif requirement['type'] == 'github':82 # Init Github object83 source = sources.github.Github(84 repo=requirement['repo'],85 version=requirement.get('version'),86 token=args['github_token']87 )88 except (exceptions.RepoNotFound,89 exceptions.HttpError,90 exceptions.AuthenticationError) as e:91 util.sysexit_with_message(str(e))92 try:93 commit_sha = source.get_commit_hash()94 except exceptions.CommitHashNotFound as e:95 util.sysexit_with_message(str(e))96 if commit_sha == current_lock.get(requirement['name']):97 LOG.info(98 "{requirement} already the current version, "99 "skipping".format(100 requirement=requirement['name'])101 )102 else:103 # Convert dest to absolute path104 dest = pathlib.Path(requirement.get('dest', '')).resolve()105 # Create archive name (ex. ansible-role-plex.tar.gz)106 archive_name = "{}.tar.gz".format(requirement['name'])107 # Append name to destination directory108 archive_dest = dest.joinpath(archive_name)109 # Remove old versions if existing110 test = dest.joinpath(requirement['name'])111 if test.is_dir():112 shutil.rmtree(test)113 # Get the archive114 try:115 source.get_archive(116 dest=archive_dest117 )118 except (exceptions.DirectoryDoesNotExist,119 exceptions.AuthenticationError,120 exceptions.ArchiveNotFound) as e:121 # Write current state to lock122 _write_lock_file(123 path=locks_path,124 current_lock=current_lock,125 new_lock=new_lock126 )127 util.sysexit_with_message(str(e))128 # Extract archive to location129 try:130 source.extract_archive(131 src=archive_dest,132 dest=dest,133 name=requirement['name']134 )135 except (exceptions.DirectoryDoesNotExist,136 TypeError,137 FileNotFoundError) as e:138 # Write current state to lock139 _write_lock_file(140 path=locks_path,141 current_lock=current_lock,142 new_lock=new_lock143 )144 # Cleanup archive145 util.remove_file(archive_dest)146 # Exit with message147 util.sysexit_with_message(str(e))148 # No exceptions add to new_lock since succesfull download149 if requirement['name'] in current_lock:150 LOG.success("{requirement} successfully updated to \151 {version}".format(152 requirement=requirement['name'],153 version=source.version154 )155 )156 else:157 LOG.success("{requirement} successfully installed".format(158 requirement=requirement['name'])159 )160 try:161 new_lock[requirement['name']] = source.get_commit_hash()162 except exceptions.CommitHashNotFound as e:163 util.sysexit_with_message(str(e))164 # End for loop165 _write_lock_file(166 path=locks_path,167 current_lock=current_lock,168 new_lock=new_lock169 )170def _write_lock_file(path, current_lock, new_lock):171 """172 Write current state to lock file173 :param path: path of the current lock file and where to write to174 :param current_lock: dict containing the lock before executing gip175 :param new_lock: dict container the new lock176 """177 # Check if new_lock has data178 if new_lock:179 # No current lock just write to file180 if not current_lock:181 util.write_yaml(182 path=path,183 data=new_lock184 )185 else:186 # Current lock, merge the two and write to file187 util.write_yaml(188 path=path,189 data=util.merge_dicts(current_lock, new_lock)190 )191def _parse_and_validate(path, type):192 """193 Parse and validate file against Cerberus scheme194 :param path: path to file195 :param type: one of the modeltypes available in model.scheme196 :return: parsed and validated data197 """198 try:199 data = util.read_yaml(path=path)200 except exceptions.ParserError as e:201 util.sysexit_with_message(str(e))202 # Validate requirements file203 try:204 model.scheme.validate(205 type=type,206 data=data207 )208 except exceptions.ValidationError as e:209 util.sysexit_with_message(210 "Parsing failed for {file} due to {errors}".format(211 file=path,212 errors=e.errors213 )214 )215 # Return parsed and validated data...
login.py
Source:login.py
...64 """65 c = self._config66 if (not c.state.created) and c.driver.managed:67 msg = "Instances not created. Please create instances first."68 util.sysexit_with_message(msg)69 hosts = [d["name"] for d in self._config.platforms.instances]70 hostname = self._get_hostname(hosts)71 self._get_login(hostname)72 def _get_hostname(self, hosts):73 hostname = self._config.command_args.get("host")74 host_list = "\n".join(sorted(hosts))75 if hostname is None:76 if len(hosts) == 1:77 hostname = hosts[0]78 else:79 msg = (80 f"There are {len(hosts)} running hosts. Please specify "81 "which with --host.\n\n"82 f"Available hosts:\n{host_list}"83 )84 util.sysexit_with_message(msg)85 match = [x for x in hosts if x.startswith(hostname)]86 if len(match) == 0:87 msg = (88 f"There are no hosts that match '{hostname}'. You "89 "can only login to valid hosts."90 )91 util.sysexit_with_message(msg)92 elif len(match) != 1:93 # If there are multiple matches, but one of them is an exact string94 # match, assume this is the one they're looking for and use it.95 if hostname in match:96 match = [hostname]97 else:98 msg = (99 f"There are {len(match)} hosts that match '{hostname}'. You "100 "can only login to one at a time.\n\n"101 f"Available hosts:\n{host_list}"102 )103 util.sysexit_with_message(msg)104 return match[0]105 def _get_login(self, hostname): # pragma: no cover106 lines, columns = os.popen("stty size", "r").read().split()107 login_options = self._config.driver.login_options(hostname)108 login_options["columns"] = columns109 login_options["lines"] = lines110 login_cmd = self._config.driver.login_cmd_template.format(**login_options)111 cmd = shlex.split(f"/usr/bin/env {login_cmd}")112 subprocess.run(cmd)113@base.click_command_ex()114@click.pass_context115@click.option("--host", "-h", help="Host to access.")116@click.option(117 "--scenario-name",...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!