Best Python code snippet using ATX
prompt.py
Source:prompt.py
...65 text = ' '.join('{%s:%s%s}' % (i, align.get(i, ''), lengths[i]) for i in columns)66 for item in objects:67 print(text.format(**item))68 return subdirs69 def _fix_path(self, path, local=False, required=False):70 path = '' if path is None else path.strip()71 rval = []72 if not path and required:73 raise WebHDFSError('%s: path not specified' % required)74 if not path:75 path = getattr(self, 'path', '/user/%s' % self.user) if not local else os.getcwd()76 if not path.startswith('/'):77 path = '%s/%s' % (self.path if not local else os.getcwd(), path)78 for part in path.split('/'):79 if not part or part == '.':80 continue81 if rval and part == '..':82 rval.pop()83 else:84 rval.append(part)85 return '/'+'/'.join(rval)86 def _print_usage(self):87 print(getattr(self, inspect.stack()[1][3]).__doc__.strip().split('\n')[0])88 def _reset_prompt(self):89 self.prompt = '%s@%s r:%s l:%s> ' % (self.user, self.base.netloc, self.path, os.getcwd())90 def _complete_local(self, part, kind):91 path = self._fix_path(part, local=True)92 name = ''93 if part and not part.endswith('/'):94 name = os.path.basename(path)95 path = os.path.dirname(path)96 if kind == 'file':97 pick = lambda x: x.startswith(name) and not stat.S_ISDIR(os.stat('%s/%s' % (path, x)).st_mode)98 elif kind == 'dir':99 pick = lambda x: x.startswith(name) and stat.S_ISDIR(os.stat('%s/%s' % (path, x)).st_mode)100 else:101 pick = lambda x: x.startswith(name)102 return [i + ('/' if stat.S_ISDIR(os.stat('%s/%s' % (path, i)).st_mode) else ' ') for i in os.listdir(path) if pick(i)]103 def _complete_remote(self, part, kind):104 path = self._fix_path(part)105 name = ''106 if part and not part.endswith('/') and not part.endswith('/..'):107 name = os.path.basename(path)108 path = os.path.dirname(path)109 if kind == 'file':110 pick = lambda x: x.name.startswith(name) and not x.is_dir()111 elif kind == 'dir':112 pick = lambda x: x.name.startswith(name) and x.is_dir()113 else:114 pick = lambda x: x.name.startswith(name)115 return [i.name + ('/' if i.is_dir() and not i.is_empty() else ' ') for i in self.hdfs.ls(path, request=pick)]116 def _complete_du(self, part, cache=[]):117 if not cache:118 cache.extend(['dirs', 'files', 'hdfs_usage', 'disk_usage', 'hdfs_quota', 'disk_usage'])119 rval = [i for i in cache if i.startswith(part)]120 return rval if len(rval) != 1 else [rval[0] + ' ']121 def _complete_chown(self, part, cache={}):122 if ':' not in part:123 if 'pwd' not in cache:124 cache['pwd'] = pwd.getpwall()125 rval = [i.pw_name for i in cache['pwd'] if i.pw_name.startswith(part)]126 return rval if len(rval) != 1 else [rval[0] + ':']127 else:128 if 'grp' not in cache:129 cache['grp'] = grp.getgrall()130 rval = [i.gr_name for i in cache['grp'] if i.gr_name.startswith(part.split(':', 1)[-1])]131 return rval if len(rval) != 1 else [rval[0] + ' ']132 def _complete_chmod(self, part):133 mode = int(part, 8) if part else 0134 if len(part) < 4 and (mode << 3) < 0o777:135 return [oct((mode << 3) + i) for i in range(1, 8)]136 else:137 return [oct(mode) + ' ']138 def completedefault(self, part, line, s, e):139 if part == '.' or part == '..':140 return [part + '/']141 args = shlex.split(line[:e])142 if len(args) == 1 or line[e - 1] == ' ':143 args.append('')144 # Extract completion magic from method documentation145 docs = getattr(getattr(self, 'do_'+args[0], object), '__doc__')146 rule = re.findall(r'(?:[<\[](.+?)[>\]])+', docs)[len(args) - 2]147 if re.search(r'(?:local|remote) (?:file/dir|file|dir)', rule):148 kind, dest = rule.split()149 return getattr(self, '_complete_'+kind)(args[-1], dest)150 if re.search(r'\w+ options', rule):151 return getattr(self, '_complete_'+rule.split()[0])(args[-1])152 def emptyline(self):153 pass154 def default(self, arg):155 print('%s: unknown command' % arg)156 def do_cd(self, path=None):157 '''158 Usage: cd <remote dir>159 Changes the shell remote directory160 '''161 try:162 path = self._fix_path(path or '/user/%s' % self.user)163 if not self.hdfs.stat(path).is_dir():164 raise WebHDFSError('%s: not a directory' % path)165 self.path = path166 except WebHDFSError as e:167 self.path = '/'168 print(e)169 finally:170 self._reset_prompt()171 def do_lcd(self, path=None):172 '''173 Usage: lcd <local dir>174 Changes the shell local directory175 '''176 try:177 path = self._fix_path(path or pwd.getpwnam(self.user).pw_dir, local=True)178 os.chdir(path)179 except (KeyError, OSError) as e:180 print(e)181 finally:182 self._reset_prompt()183 def do_ls(self, path=None):184 '''185 Usage: ls <remote file/dir>186 Lists remote file or directory187 '''188 try:189 path = self._fix_path(path)190 self._list_dir(self.hdfs.ls(path))191 except WebHDFSError as e:192 print(e)193 def do_lsr(self, path=None):194 '''195 Usage: ls <remote file/dir>196 Lists remote file or directory recursively197 '''198 try:199 path = self._fix_path(path)200 print(path + ':')201 for name in self._list_dir(self.hdfs.ls(path)):202 print()203 self.do_lsr('%s/%s' % (path, name))204 except WebHDFSError as e:205 print(e)206 def do_glob(self, path=None):207 '''208 Usage: glob <remote file/dir>209 Lists remote file or directory pattern210 '''211 try:212 path = self._fix_path(path, required='glob')213 self._list_dir(self.hdfs.glob(path))214 except WebHDFSError as e:215 print(e)216 def do_lls(self, path=None):217 '''218 Usage: lls <local file/dir>219 Lists local file or directory220 '''221 try:222 path = self._fix_path(path, local=True)223 info = os.stat(path)224 objs = []225 if stat.S_ISDIR(info.st_mode):226 objs = list(LocalFSObject(path, name) for name in os.listdir(path))227 elif stat.S_ISREG(info.st_mode):228 objs = [LocalFSObject(os.path.dirname(path), os.path.basename(path))]229 self._list_dir(objs)230 except OSError as e:231 print(e)232 def do_du(self, args=''):233 '''234 Usage: du <remote file/dir> [du options]235 Options: dirs|files|hdfs_usage|disk_usage|hdfs_quota|disk_quota236 Displays usage for remote file or directory237 '''238 try:239 args = shlex.split(args)240 if len(args) > 2:241 return self._print_usage()242 path = self._fix_path(args[0] if len(args) > 0 else None)243 print(self.hdfs.du(path, args[1] if len(args) == 2 else 'hdfs_usage'))244 except WebHDFSError as e:245 print(e)246 def do_mkdir(self, path):247 '''248 Usage: mkdir <remote dir>249 Creates remote directory250 '''251 try:252 path = self._fix_path(path, required='mkdir')253 if self.hdfs.stat(path, catch=True):254 raise WebHDFSError('%s: already exists' % path)255 self.hdfs.mkdir(path)256 except WebHDFSError as e:257 print(e)258 def do_mv(self, args):259 '''260 Usage: mv <remote file/dir> <remote dir>261 Moves/renames remote file or directory262 '''263 try:264 path, dest = shlex.split(args)265 path = self._fix_path(path, required='mv')266 dest = self._fix_path(dest, required='mv')267 stat = self.hdfs.stat(dest, catch=True) or self.hdfs.stat(os.path.dirname(dest), catch=True)268 if stat and not stat.is_dir():269 raise WebHDFSError('%s: invalid destination' % dest)270 if not self.hdfs.mv(path, dest):271 raise WebHDFSError('%s: failed to move/rename' % path)272 except WebHDFSError as e:273 print(e)274 except ValueError as e:275 self._print_usage()276 def do_rm(self, path):277 '''278 Usage: rm <remote file>279 Removes remote file280 '''281 try:282 path = self._fix_path(path, required='rm')283 if self.hdfs.stat(path).is_dir():284 raise WebHDFSError('%s: cannot remove directory' % path)285 self.hdfs.rm(path)286 except WebHDFSError as e:287 print(e)288 def do_rmdir(self, path):289 '''290 Usage: rm <remote dir>291 Removes remote directory292 '''293 try:294 path = self._fix_path(path, required='rmdir')295 temp = self.hdfs.stat(path)296 if not temp.is_dir():297 raise WebHDFSError('%s: not a directory' % path)298 if not temp.is_empty():299 raise WebHDFSError('%s: directory not empty' % path)300 self.hdfs.rm(path)301 except WebHDFSError as e:302 print(e)303 def do_chown(self, args):304 '''305 Usage: chown <chown options> <remote file/dir>306 Options: [owner][:group]307 Change ownership of remote file or directory308 '''309 try:310 dest, path = shlex.split(args)311 path = self._fix_path(path, required='chown')312 o, g = dest.split(':', 1) if ':' in dest else (dest, '')313 self.hdfs.chown(path, owner=o, group=g)314 except WebHDFSError as e:315 print(e)316 except ValueError:317 self._print_usage()318 def do_chmod(self, args):319 '''320 Usage: chmod <chmod options> <remote file/dir>321 Options: octal mode: 0000 - 0777322 Change permission on remote file or directory323 '''324 try:325 perm, path = shlex.split(args)326 path = self._fix_path(path, required='chmod')327 self.hdfs.chmod(path, perm)328 except WebHDFSError as e:329 print(e)330 except ValueError:331 self._print_usage()332 def do_touch(self, args):333 '''334 Usage touch <remote file> [epoch time]335 Change modification time on remote file, optionally creating it336 '''337 try:338 args = shlex.split(args)339 if len(args) > 2:340 return self._print_usage()341 path = self._fix_path(args[0])342 time = None343 try:344 time = int(args[1])345 except Exception as e:346 if not isinstance(e, (ValueError, IndexError)):347 self._print_usage()348 self.hdfs.touch(path, time)349 except WebHDFSError as e:350 print(e)351 def do_get(self, path):352 '''353 Usage: get <remote file>354 Fetch remote file into current local directory355 '''356 try:357 path = self._fix_path(path, required='get')358 if self.hdfs.stat(path).is_dir():359 raise WebHDFSError('%s: cannot download directory' % path)360 if os.path.exists(os.path.basename(path)):361 raise WebHDFSError('%s: file exists' % path)362 self.hdfs.get(path, data=open('%s/%s' % (os.getcwd(), os.path.basename(path)), 'wb'))363 except (WebHDFSError, OSError) as e:364 print(e)365 def do_put(self, path):366 '''367 Usage: put <local file>368 Upload local file into current remote directory369 '''370 try:371 path = self._fix_path(path, local=True, required='put')372 dest = '%s/%s' % (self.path, os.path.basename(path))373 if stat.S_ISDIR(os.stat(path).st_mode):374 raise WebHDFSError('%s: cannot upload directory' % path)375 if self.hdfs.stat(dest, catch=True):376 raise WebHDFSError('%s: already exists' % dest)377 self.hdfs.put(dest, data=open(path, 'rb'))378 except (WebHDFSError, OSError) as e:379 print(e)380 def do_cat(self, path):381 '''382 Usage: cat <remote file>383 Display contents of remote file384 '''385 try:386 path = self._fix_path(path, required='cat')387 if self.hdfs.stat(path).is_dir():388 raise WebHDFSError('%s: cannot cat directory' % path)389 sys.stdout.write(self.hdfs.get(path))390 except (WebHDFSError, OSError) as e:391 print(e)392 def do_zcat(self, path):393 '''394 Usage: zcat <remote file>395 Display contents of compressed remote file396 '''397 try:398 path = self._fix_path(path, required='zcat')399 if self.hdfs.stat(path).is_dir():400 raise WebHDFSError('%s: cannot cat directory' % path)401 sys.stdout.write(zlib.decompress(self.hdfs.get(path), 16 + zlib.MAX_WBITS))402 except (WebHDFSError, OSError) as e:403 print(e)404 def do_EOF(self, line):405 print()406 return True407for name, func in vars(WebHDFSPrompt).items():408 if name.startswith('do_') and getattr(func, '__doc__'):...
__init__.py
Source:__init__.py
...56from util import TahoeUtil57from connection import Connection58from six import b59logger = fs.getLogger('fs.tahoelafs')60def _fix_path(func):61 """Method decorator for automatically normalising paths."""62 def wrapper(self, *args, **kwds):63 if len(args):64 args = list(args)65 args[0] = _fixpath(args[0])66 return func(self, *args, **kwds)67 return wrapper68def _fixpath(path):69 """Normalize the given path."""70 return abspath(normpath(path))71class _TahoeLAFS(FS):72 """FS providing raw access to a Tahoe-LAFS Filesystem.73 This class implements all the details of interacting with a Tahoe-backed74 filesystem, but you probably don't want to use it in practice. Use the...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!