Best Python code snippet using autotest_python
scm.py
Source:scm.py
...62def set_git_config(name,email):63 cfg = git.GitConfigParser([os.path.normpath(os.path.expanduser("~/.gitconfig"))], read_only=False)64 cfg.set_value('user','name',name)65 cfg.set_value('user','email',email)66def repo_check(repo, require_remote=False):67 if repo is None:68 raise TypeError('Not a git repository.')69 # TODO: no remote fail70 if not repo.remotes and require_remote:71 raise ValueError('No git remotes configured. Please add one.')72 # TODO: You're in a merge state.73 return repo74def get_current_branch_name(repo):75 """Returns current branch name"""76 repo_check(repo)77 return repo.head.ref.name78def clone_from(repourl, targetdir):79 try:80 return _repo.clone_from(repourl, targetdir)81 except (GitCommandError,) as e:82 raise CloneFailure('Cloning {0} failed'.format(repourl),e)83def Repo(path):84 """Returns the current Repo, based on path."""85 try:86 return _repo(path, search_parent_directories=True)87 except InvalidGitRepositoryError:88 pass89# def get_remote(repo):90# repo_check(repo, require_remote=True)91# reader = repo.config_reader()92# # If there is no remote option in legit section, return default93# if not reader.has_option('legit', 'remote'):94# return repo.remotes[0]95# remote_name = reader.get('legit', 'remote')96# if not remote_name in [r.name for r in repo.remotes]:97# raise ValueError('Remote "{0}" does not exist! Please update your git '98# 'configuration.'.format(remote_name))99# return repo.remote(remote_name)100def branch_name(repo):101 """Gets current branch name"""102 repo_check(repo)103 return repo.head.ref.name104def branches(repo, local=True, remote=True, excl=forbidden_branches):105 """Returns a list of local and remote branches."""106 repo_check(repo)107 r = [n for n in map(del_remote,map(gname,repo.remote().refs)) if n not in excl] if remote and repo.remotes else []108 l = [n for n in map(gname,repo.heads) if n not in excl] if local else []109 return [Branch(n, is_published=n in r, is_local=n in l) for n in sorted(set(r+l))]110def branch_names(repo, local=True, remote=True, excl=forbidden_branches):111 """Returns a list of local and remote branch names."""112 return [b.name for b in branches(repo, local=local, remote=remote, excl=excl)]113def get_branch(repo, branch=None, local=True, remote=True, excl=forbidden_branches):114 """Returns a list of local and remote branches."""115 branch = branch_name(repo) if branch is None else branch116 return {b.name:b for b in branches(repo, local=local, remote=remote, excl=excl)}.get(branch)117def get_branch_state(b):118 return 'tracked' if b.is_published and b.is_local else 'local' if b.is_local else 'remote'119def get_branch_items(branches,branch=None):120 """Get quick panel items from a list of branches"""121 return [[[' ','* '][branch==b.name]+b.name,' '+get_branch_state(b)] for b in branches]122class RepoHelper(object):123 @property124 def repo(self):125 """discover the repo for a sublime command"""126 if hasattr(self, 'view'):127 window = self.view.window()128 view = self.view129 elif hasattr(self, 'window'):130 window = self.window131 view = self.window.active_view()132 else:133 return134 folders = []135 view_repo = view.settings().get('git_repo')136 if view_repo:137 folders.append(view_repo)138 cur_file = view.file_name()139 if cur_file:140 folders.append(os.path.dirname(cur_file))141 142 folders.extend(window.folders())143 for folder in folders:144 if os.path.exists(folder):145 repo = Repo(folder)146 if repo:147 return repo148# def get_branches(repo, local=True, remote_branches=True):149# """Returns a list of local and remote branches."""150# repo_check(repo)151# # print local152# branches = []153# if remote_branches:154# # Remote refs.155# try:156# for b in repo.remote('origin').refs:157# name = b.name.split('/',1)[-1]158# if name not in forbidden_branches:159# branches.append(Branch(name, is_published=True))160# except (IndexError, AssertionError):161# pass162# if local:163# # Local refs.164# for b in [h.name for h in repo.heads]:165# if b not in [br.name for br in branches] or not remote_branches:166# if b not in forbidden_branches:167# branches.append(Branch(b, is_published=False))168# return sorted(branches, key=attrgetter('name'))169# def get_branch_names(repo, local=True, remote_branches=True):170# repo_check(repo)171# branches = get_branches(repo, local=local, remote_branches=remote_branches)172# return [b.name for b in branches]173def stash_it(repo, sync=False):174 repo_check(repo)175 msg = 'syncing branch' if sync else 'switching branches'176 return repo.git.stash('save', '--include-untracked', STASH_TEMPLATE.format(msg))177def unstash_index(repo, sync=False, branch=None):178 """Returns an unstash index if one is available."""179 repo_check(repo)180 stash_list = repo.git.stash('list')181 branch = branch_name(repo) if branch is None else branch182 intent = 'syncing branch' if sync else 'switching branches'183 legit_msg = STASH_TEMPLATE.format(intent)184 # stash_name_re = "^stash@{([0-9]+)}: On "+branch+": Legit: stashing before "+intent+"\.$"185 for stash in stash_list.splitlines():186 stash_id, on_branch, msg = stash.split(': ',2)187 # stash_match = re.match(stash_name_re, stash)188 if on_branch[3:]==branch and msg==legit_msg:189 return stash_id[7:-1]190 # return stash_match.groups()[0]191def unstash_it(repo, sync=False, branch=None):192 """Unstashes changes from current branch for branch sync."""193 repo_check(repo)194 stash_index = unstash_index(repo, sync=sync, branch=branch)195 if stash_index is not None:196 return repo.git.stash('pop', 'stash@{{{0}}}'.format(stash_index))197def fetch(repo):198 repo_check(repo)199 return repo.git.fetch('origin')200def is_empty(repo):201 """Check to see if a repo is empty"""202 203 repo_check(repo)204 return not (repo.head.is_valid() or any(repo.index.iter_blobs()))205def is_upstream_ahead(repo,branch=None):206 repo_check(repo, require_remote=True)207 branch = branch_name(repo) if branch is None else branch208 return any(repo.iter_commits(branch+'..'+branch+'@{u}'))209def merged(repo, branch, on_src=False):210 '''Checks to see if a branch is merged'''211 repo_check(repo)212 src, dst = ('HEAD^{}',branch+'^{}') if on_src else (branch+'^{}','HEAD^{}')213 return (repo.merge_base(src,dst) or [None])[0] == repo.rev_parse(src)214def smart_merge(repo, branch, allow_rebase=True, force_theirs=None):215 repo_check(repo)216 from_branch = branch_name(repo)217 merges = repo.git.log('--merges', '{0}..{1}'.format(branch, from_branch))218 if allow_rebase:219 verb = 'merge' if merges.count('commit') else 'rebase'220 else:221 verb = 'merge'222 try:223 if verb == 'merge' and force_theirs is not None:224 if force_theirs:225 # Overwrite our conflicting differences with theirs226 if not allow_rebase:227 return repo.git.merge(branch,no_ff=True,X='theirs')228 return getattr(repo.git, verb)(branch,X='theirs')229 else:230 # Discard their conflicting changes being merged in231 if not allow_rebase:232 return repo.git.merge(branch,no_ff=True,X='ours')233 return getattr(repo.git, verb)(branch,X='ours')234 elif not allow_rebase:235 return repo.git.merge(branch,no_ff=True)236 else:237 # Merge and abort if there are conflicts238 return getattr(repo.git, verb)(branch)239 except GitCommandError as why:240 log = getattr(repo.git, verb)('--abort')241 abort('Merge failed. Reverting.', log='{0}\n{1}'.format(why, log), type='merge')242 raise why243def smart_pull(repo):244 'git log --merges origin/master..master'245 repo_check(repo)246 branch = branch_name(repo)247 fetch(repo)248 return smart_merge(repo, '{0}/{1}'.format('origin', branch))249def commit(repo, message, *files, all_files=True, untracked_files=True):250 repo_check(repo)251 if repo.is_dirty() or repo.untracked_files:252 if all_files:253 if untracked_files:254 repo.git.add(all=True)255 else:256 repo.git.add(update=True)257 elif files:258 repo.index.add(files)259 else:260 return261 # Check that something is to be committed262 if repo.head.is_valid():263 if not repo.index.diff("HEAD"):264 return265 else:266 if not list(repo.index.iter_blobs()):267 return268 return repo.index.commit(message)269def undo(repo):270 repo_check(repo)271 repo.head.reset('HEAD^')272def is_upstream_behind(repo,branch=None):273 repo_check(repo, require_remote=True)274 branch = branch_name(repo) if branch is None else branch275 return any(repo.iter_commits(branch+'@{u}..'+branch))276def push(repo, branch=None):277 repo_check(repo, require_remote=True)278 branch = branch_name(repo) if branch is None else branch279 if branch in repo.heads:280 try:281 repo.remotes.origin.push('{0}:{0}'.format(branch,branch))282 except GitCommandError:283 raise PushFailure("")284 if not repo.heads[branch].tracking_branch():285 repo.heads[branch].set_tracking_branch(repo.remotes.origin.refs[branch])286def branch_on_remote(repo, branch):287 # Determine if a branch is in a remote repository288 repo_check(repo, require_remote=True)289 try:290 return repo.git.ls_remote('origin', branch,heads=True)291 except GitCommandError:292 raise RemoteFailure('Cannot contact remote repository',next(repo.remotes.origin.urls))293def checkout_branch(repo, branch):294 """Checks out given branch."""295 repo_check(repo)296 repo.heads[branch].checkout()297def track_branch(repo, branch):298 "Creates a new branch that tracks a remote branch"299 repo_check(repo, require_remote=True)300 remote = repo.remotes.origin.refs[branch]301 return repo.create_head(branch, remote).set_tracking_branch(remote)302def sprout_branch(repo, branch, off_branch=None):303 """Creates branch from current unless provided."""304 repo_check(repo)305 off_branch = branch_name(repo) if off_branch is None else off_branch306 return repo.create_head(branch, repo.heads[off_branch])307def destroy_branch(repo, branch):308 '''Delete a branch both locally and remotely'''309 repo_check(repo)310 head = get_branch(repo, branch)311 if head is None:312 return313 if head.name == branch_name(repo):314 return315 index = unstash_index(repo, branch=head.name)316 if index:317 repo.git.stash('drop', index)318 if head.is_published:319 repo.git.push('origin',head.name,delete=True)320 if head.is_local:321 repo.git.branch(head.name,D=True)322# def graft_branch(repo, branch):323# """Merges branch into current branch, and deletes it."""324# repo_check(repo)325# log = []326# try:327# msg = repo.git.merge('--no-ff', branch)328# log.append(msg)329# except GitCommandError as why:330# log = repo.git.merge('--abort')331# abort('Merge failed. Reverting.', log='{0}\n{1}'.format(why, log), type='merge')332# out = repo.git.branch('-D', branch)333# log.append(out)334# return '\n'.join(log)335# def unpublish_branch(repo, branch):336# """Unpublishes given branch."""337# repo_check(repo)338# try:339# return repo.git.push('origin', ':{0}'.format(branch))340# except GitCommandError:341# _, _, log = repo.git.fetch('origin', '--prune', with_extended_output=True)342# abort('Unpublish failed. Fetching.', log=log, type='unpublish')343# def publish_branch(repo, branch):344# """Publishes given branch."""345# repo_check(repo)...
test_git_commit_one_file.py
Source:test_git_commit_one_file.py
1# -*- coding: utf-8 -*-2import os3from vilya.models.project import CodeDoubanProject4from vilya.models import git5from tests.base import TestCase6from tests.utils import mkdtemp7from vilya.libs import gyt8from vilya.libs.permdir import get_repo_root9class TestGit(TestCase):10 @property11 def u(self):12 return self.addUser()13 def _path(self, name):14 return os.path.join(get_repo_root(), '%s.git' % name)15 def _path_work_tree(self, name):16 return os.path.join(get_repo_root(), '%s.work_tree' % name)17 def _repo(self, name, bare=True):18 git_path = self._path(name)19 if bare:20 work_tree_path = None21 else:22 work_tree_path = self._path_work_tree(name)23 if not os.path.exists(work_tree_path):24 os.mkdir(work_tree_path)25 try:26 CodeDoubanProject.create_git_repo(git_path)27 except:28 pass29 repo = git.GitRepo(git_path, work_tree=work_tree_path)30 return repo31 def _commit(self, repo, filename, content='testcontent',32 message='testmessage'):33 # TODO allow commiting more than one file34 assert os.path.exists(repo.work_tree), \35 "repo.work_tree must exist, check if repo has been created with bare=False" # noqa36 path = os.path.join(repo.work_tree, filename)37 dir_ = os.path.dirname(path)38 if not os.path.exists(dir_):39 os.makedirs(os.path.dirname(path))40 f = open(path, 'w')41 f.write(content)42 f.close()43 rep2 = gyt.repo(repo.path, repo.work_tree, bare=False)44 rep2.call(['add', filename])45 rep2.call(['commit', filename, '-m', message], _env=self.env_for_git)46 return gyt.repo(repo.path).sha()47 def test_simple_commit(self):48 repo = self._repo('test', bare=False)49 self._commit(repo, 'testfile1', 'content1', 'msg1')50 src = repo.get_src('testfile1')51 assert src == ('blob', u'content1')52 repo.commit_one_file('testfile1', 'content1 modified',53 'change1', self.u, orig_hash=hash('content1'))54 src = repo.get_src('testfile1')55 assert src == ('blob', u'content1 modified')56 def test_simple_commit_do_not_delete_other_files(self):57 repo = self._repo('test', bare=False)58 self._commit(repo, 'testfile1', 'content1', 'msg1')59 self._commit(repo, 'testfile2', 'content2', 'msg2')60 repo.commit_one_file('testfile1', 'content1 modified',61 'change1', self.u, orig_hash=hash('content1'))62 src = repo.get_src('testfile1')63 assert src == ('blob', u'content1 modified')64 type_, files = repo.get_src('')65 assert any(d['path'] == 'testfile2' for d in files), \66 "testfile2 should exists in root tree"67 src = repo.get_src('testfile2')68 assert src == ('blob', u'content2')69 def test_commit_in_inner_directory(self):70 repo = self._repo('test', bare=False)71 self._commit(repo, 'test/file1', 'content1', 'msg1')72 src = repo.get_src('test/file1')73 assert src == ('blob', u'content1')74 repo.commit_one_file('test/file1', 'content1 modified',75 'change1', self.u, orig_hash=hash('content1'))76 src = repo.get_src('test/file1')77 assert src == ('blob', u'content1 modified')78 def test_create_file(self):79 repo = self._repo('test', bare=False)80 self._commit(repo, 'file1', 'content1', 'msg1')81 repo.commit_one_file(82 'file2', 'content2 created', 'create1', self.u)83 assert repo.cat('HEAD:file1') == 'content1'84 assert repo.cat('HEAD:file2') == 'content2 created'85 def test_create_first_file(self):86 repo = self._repo('test', bare=False)87 repo.commit_one_file(88 'file1', 'content1 created', 'create1', self.u)89 assert repo.cat('HEAD:file1') == 'content1 created'90 def test_create_first_file_and_more(self):91 repo = self._repo('test', bare=False)92 repo.commit_one_file(93 'file1', 'content1 created', 'create1', self.u)94 repo.commit_one_file(95 'file2', 'content2 created', 'create2', self.u)96 repo.commit_one_file(97 'file3', 'content3 created', 'create3', self.u)98 repo.commit_one_file(99 'file4', 'content4 created', 'create4', self.u)100 assert repo.cat('HEAD:file1') == 'content1 created'101 assert repo.cat('HEAD:file2') == 'content2 created'102 assert repo.cat('HEAD:file3') == 'content3 created'103 assert repo.cat('HEAD:file4') == 'content4 created'104 def test_commit_file_on_dirty_index(self):105 repo = self._repo('test', bare=False)106 repo.commit_one_file(107 'file1', 'content1 created', 'create1', self.u)108 repo.commit_one_file(109 'file2', 'content2 created', 'create2', self.u)110 repo.commit_one_file(111 'file1', 'content1 modified', 'modify1', self.u)112 # Now artificially rewind the index tree state113 repo.call('read-tree HEAD^')114 repo.commit_one_file(115 'file2', 'content2 modified', 'modify2', self.u)116 # the latest commit should not have anything related to file1117 assert 'file1' not in repo.call('log -p -n1')118 def test_create_file_in_dir(self):119 repo = self._repo('test', bare=False)120 self._commit(repo, 'test/file1', 'content1', 'msg1')121 repo.commit_one_file(122 'test/file2', 'content2 created', 'create1', self.u)123 assert repo.cat('HEAD:test/file1') == 'content1'124 assert repo.cat('HEAD:test/file2') == 'content2 created'125 def test_simple_commit_in_branch(self):126 repo = self._repo('test', bare=False)127 self._commit(repo, 'testfile1', 'content1', 'msg1')128 tmp_branch = repo.temp_branch_name()129 repo.commit_one_file('testfile1', 'content1 modified', 'change1',130 self.u, orig_hash=hash('content1'),131 branch=tmp_branch)132 with mkdtemp() as tmpdir:133 gyt.call(['git', 'clone', repo.path, tmpdir])134 repo_check = gyt.repo(tmpdir, bare=False)135 src = repo_check.call('show HEAD:testfile1')136 assert src == u'content1'137 repo_check.call('checkout master')138 src = repo_check.call('show HEAD:testfile1')139 assert src == u'content1'140 repo_check.call('checkout %s' % tmp_branch)141 src = repo_check.call('show HEAD:testfile1')142 assert src == u'content1 modified'143 repo_check.call('checkout master')144 src = repo_check.call('show HEAD:testfile1')145 assert src == u'content1'146 def test_simple_commit_in_branch_in_subdir(self):147 repo = self._repo('test', bare=False)148 self._commit(repo, 'test/file1', 'content1', 'msg1')149 tmp_branch = repo.temp_branch_name()150 repo.commit_one_file('test/file1', 'content1 modified', 'change1',151 self.u, orig_hash=hash('content1'),152 branch=tmp_branch)153 with mkdtemp() as tmpdir:154 gyt.call(['git', 'clone', repo.path, tmpdir])155 repo_check = gyt.repo(tmpdir, bare=False)156 src = repo_check.call('show HEAD:test/file1')157 assert src == u'content1'158 repo_check.call('checkout master')159 src = repo_check.call('show HEAD:test/file1')160 assert src == u'content1'161 repo_check.call('checkout %s' % tmp_branch)162 src = repo_check.call('show HEAD:test/file1')163 assert src == u'content1 modified'164 repo_check.call('checkout master')165 src = repo_check.call('show HEAD:test/file1')166 assert src == u'content1'167 def test_simple_commit_in_branch_creates_branch(self):168 repo = self._repo('test', bare=False)169 self._commit(repo, 'testfile1', 'content1', 'msg1')170 assert repo.get_branches() == ['master']171 tmp_branch = repo.temp_branch_name()172 repo.commit_one_file('testfile1', 'content1 modified', 'change1',173 self.u, orig_hash=hash('content1'),174 branch=tmp_branch)175 assert repo.get_branches() == ['master', tmp_branch]176 def test_simple_commit_in_branch_and_delete_branch(self):177 repo = self._repo('test', bare=False)178 self._commit(repo, 'testfile1', 'content1', 'msg1')179 tmp_branch = repo.temp_branch_name()180 repo.commit_one_file('testfile1', 'content1 modified', 'change1',181 self.u, orig_hash=hash('content1'),182 branch=tmp_branch)183 assert tmp_branch in repo.get_branches()184 repo.remove_temp_branch(tmp_branch)185 assert tmp_branch not in repo.get_branches()186 assert repo.get_branches() == ['master']187 def test_simple_commit_in_another_branch(self):188 repo = self._repo('test', bare=False)189 self._commit(repo, 'testfile1', 'content1', 'msg1')190 branch = 'mybranch'191 repo.commit_one_file('testfile1', 'content1 modified', 'change1',192 self.u, orig_hash=hash('content1'), branch=branch)193 assert branch in repo.get_branches()...
CollectMapping.py
Source:CollectMapping.py
1from NoteBookMapping import mapping,group2from CheckRepo import RepoCheck3from RepoNotebooks import RepoNotebook4from Data import TEST_REPO,REPOS5from CollectNotebook import name6from Config import CURRENT_FILE,Notebook,CellMapping,SPLIT,SAVE_FOLDER7mapping_name = lambda x,y,index : f'{index}{SPLIT}{x}{SPLIT}{y}.txt'8class CollectMapping:9 def __init__(self,repo_check:RepoCheck):10 self.repo_check = repo_check11 def saveMapping(self,output,old_path,new_path,index):12 name = lambda x: x.split("/")[-1].split(".")[0][:63]13 mname = mapping_name(name(old_path),name(new_path),index)14 with open(f'{CURRENT_FILE}/mapping_cache/{mname}','w') as file:15 for i in output:16 if type(i[1]) != list:17 file.write(f'{i[0]},{i[1]}\n')18 else:19 if len(i[1]) == 1:20 file.write(f'{i[0]},{i[1][0]}m\n')21 else:22 file.write(f'{i[0]},{",".join([str(j) for j in i[1]])}\n')23 def getMapping(self,version):24 i,j = 0,125 index = 026 while j < len(version):27 old_path = f'{self.repo_check.notebook_cache_path}/{name(version[i][1],version[i][0])}'28 new_path = f'{self.repo_check.notebook_cache_path}/{name(version[j][1],version[j][0])}'29 old,new = Notebook(old_path),Notebook(new_path)30 output = CellMapping(old,new)31 self.saveMapping(output,old_path,new_path,index)32 i += 133 j += 134 index += 135 def run(self):36 _,_,output = group(self.repo_check.data())37 for i in output:38 self.getMapping(i)39 40if __name__ == "__main__":41 rpn = RepoCheck('frnsys#ai_notes')42 cm = CollectMapping(rpn)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!