Best Python code snippet using slash
loadwsgi.py
Source:loadwsgi.py
1# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)2# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php3from __future__ import with_statement4import os5import sys6import re7import pkg_resources8from paste.deploy.compat import ConfigParser, unquote, iteritems, dictkeys9from paste.deploy.util import fix_call, lookup_object10__all__ = ['loadapp', 'loadserver', 'loadfilter', 'appconfig']11############################################################12## Utility functions13############################################################14def import_string(s):15 return pkg_resources.EntryPoint.parse("x=" + s).load(False)16def _aslist(obj):17 """18 Turn object into a list; lists and tuples are left as-is, None19 becomes [], and everything else turns into a one-element list.20 """21 if obj is None:22 return []23 elif isinstance(obj, (list, tuple)):24 return obj25 else:26 return [obj]27def _flatten(lst):28 """29 Flatten a nested list.30 """31 if not isinstance(lst, (list, tuple)):32 return [lst]33 result = []34 for item in lst:35 result.extend(_flatten(item))36 return result37class NicerConfigParser(ConfigParser):38 def __init__(self, filename, *args, **kw):39 ConfigParser.__init__(self, *args, **kw)40 self.filename = filename41 if hasattr(self, '_interpolation'):42 self._interpolation = self.InterpolateWrapper(self._interpolation)43 read_file = getattr(ConfigParser, 'read_file', ConfigParser.readfp)44 def defaults(self):45 """Return the defaults, with their values interpolated (with the46 defaults dict itself)47 Mainly to support defaults using values such as %(here)s48 """49 defaults = ConfigParser.defaults(self).copy()50 for key, val in iteritems(defaults):51 defaults[key] = self.get('DEFAULT', key) or val52 return defaults53 def _interpolate(self, section, option, rawval, vars):54 # Python < 3.255 try:56 return ConfigParser._interpolate(57 self, section, option, rawval, vars)58 except Exception:59 e = 
sys.exc_info()[1]60 args = list(e.args)61 args[0] = 'Error in file %s: %s' % (self.filename, e)62 e.args = tuple(args)63 e.message = args[0]64 raise65 class InterpolateWrapper(object):66 # Python >= 3.267 def __init__(self, original):68 self._original = original69 def __getattr__(self, name):70 return getattr(self._original, name)71 def before_get(self, parser, section, option, value, defaults):72 try:73 return self._original.before_get(parser, section, option,74 value, defaults)75 except Exception:76 e = sys.exc_info()[1]77 args = list(e.args)78 args[0] = 'Error in file %s: %s' % (parser.filename, e)79 e.args = tuple(args)80 e.message = args[0]81 raise82############################################################83## Object types84############################################################85class _ObjectType(object):86 name = None87 egg_protocols = None88 config_prefixes = None89 def __init__(self):90 # Normalize these variables:91 self.egg_protocols = [_aslist(p) for p in _aslist(self.egg_protocols)]92 self.config_prefixes = [_aslist(p) for p in _aslist(self.config_prefixes)]93 def __repr__(self):94 return '<%s protocols=%r prefixes=%r>' % (95 self.name, self.egg_protocols, self.config_prefixes)96 def invoke(self, context):97 assert context.protocol in _flatten(self.egg_protocols)98 return fix_call(context.object,99 context.global_conf, **context.local_conf)100class _App(_ObjectType):101 name = 'application'102 egg_protocols = ['paste.app_factory', 'paste.composite_factory',103 'paste.composit_factory']104 config_prefixes = [['app', 'application'], ['composite', 'composit'],105 'pipeline', 'filter-app']106 def invoke(self, context):107 if context.protocol in ('paste.composit_factory',108 'paste.composite_factory'):109 return fix_call(context.object,110 context.loader, context.global_conf,111 **context.local_conf)112 elif context.protocol == 'paste.app_factory':113 return fix_call(context.object, context.global_conf, **context.local_conf)114 else:115 assert 0, 
"Protocol %r unknown" % context.protocol116APP = _App()117class _Filter(_ObjectType):118 name = 'filter'119 egg_protocols = [['paste.filter_factory', 'paste.filter_app_factory']]120 config_prefixes = ['filter']121 def invoke(self, context):122 if context.protocol == 'paste.filter_factory':123 return fix_call(context.object,124 context.global_conf, **context.local_conf)125 elif context.protocol == 'paste.filter_app_factory':126 def filter_wrapper(wsgi_app):127 # This should be an object, so it has a nicer __repr__128 return fix_call(context.object,129 wsgi_app, context.global_conf,130 **context.local_conf)131 return filter_wrapper132 else:133 assert 0, "Protocol %r unknown" % context.protocol134FILTER = _Filter()135class _Server(_ObjectType):136 name = 'server'137 egg_protocols = [['paste.server_factory', 'paste.server_runner']]138 config_prefixes = ['server']139 def invoke(self, context):140 if context.protocol == 'paste.server_factory':141 return fix_call(context.object,142 context.global_conf, **context.local_conf)143 elif context.protocol == 'paste.server_runner':144 def server_wrapper(wsgi_app):145 # This should be an object, so it has a nicer __repr__146 return fix_call(context.object,147 wsgi_app, context.global_conf,148 **context.local_conf)149 return server_wrapper150 else:151 assert 0, "Protocol %r unknown" % context.protocol152SERVER = _Server()153# Virtual type: (@@: There's clearly something crufty here;154# this probably could be more elegant)155class _PipeLine(_ObjectType):156 name = 'pipeline'157 def invoke(self, context):158 app = context.app_context.create()159 filters = [c.create() for c in context.filter_contexts]160 filters.reverse()161 for filter in filters:162 app = filter(app)163 return app164PIPELINE = _PipeLine()165class _FilterApp(_ObjectType):166 name = 'filter_app'167 def invoke(self, context):168 next_app = context.next_context.create()169 filter = context.filter_context.create()170 return filter(next_app)171FILTER_APP = 
_FilterApp()172class _FilterWith(_App):173 name = 'filtered_with'174 def invoke(self, context):175 filter = context.filter_context.create()176 filtered = context.next_context.create()177 if context.next_context.object_type is APP:178 return filter(filtered)179 else:180 # filtering a filter181 def composed(app):182 return filter(filtered(app))183 return composed184FILTER_WITH = _FilterWith()185############################################################186## Loaders187############################################################188def loadapp(uri, name=None, **kw):189 return loadobj(APP, uri, name=name, **kw)190def loadfilter(uri, name=None, **kw):191 return loadobj(FILTER, uri, name=name, **kw)192def loadserver(uri, name=None, **kw):193 return loadobj(SERVER, uri, name=name, **kw)194def appconfig(uri, name=None, relative_to=None, global_conf=None):195 context = loadcontext(APP, uri, name=name,196 relative_to=relative_to,197 global_conf=global_conf)198 return context.config()199_loaders = {}200def loadobj(object_type, uri, name=None, relative_to=None,201 global_conf=None):202 context = loadcontext(203 object_type, uri, name=name, relative_to=relative_to,204 global_conf=global_conf)205 return context.create()206def loadcontext(object_type, uri, name=None, relative_to=None,207 global_conf=None):208 if '#' in uri:209 if name is None:210 uri, name = uri.split('#', 1)211 else:212 # @@: Ignore fragment or error?213 uri = uri.split('#', 1)[0]214 if name is None:215 name = 'main'216 if ':' not in uri:217 raise LookupError("URI has no scheme: %r" % uri)218 scheme, path = uri.split(':', 1)219 scheme = scheme.lower()220 if scheme not in _loaders:221 raise LookupError(222 "URI scheme not known: %r (from %s)"223 % (scheme, ', '.join(_loaders.keys())))224 return _loaders[scheme](225 object_type,226 uri, path, name=name, relative_to=relative_to,227 global_conf=global_conf)228def _loadconfig(object_type, uri, path, name, relative_to,229 global_conf):230 isabs = 
os.path.isabs(path)231 # De-Windowsify the paths:232 path = path.replace('\\', '/')233 if not isabs:234 if not relative_to:235 raise ValueError(236 "Cannot resolve relative uri %r; no relative_to keyword "237 "argument given" % uri)238 relative_to = relative_to.replace('\\', '/')239 if relative_to.endswith('/'):240 path = relative_to + path241 else:242 path = relative_to + '/' + path243 if path.startswith('///'):244 path = path[2:]245 path = unquote(path)246 loader = ConfigLoader(path)247 if global_conf:248 loader.update_defaults(global_conf, overwrite=False)249 return loader.get_context(object_type, name, global_conf)250_loaders['config'] = _loadconfig251def _loadegg(object_type, uri, spec, name, relative_to,252 global_conf):253 loader = EggLoader(spec)254 return loader.get_context(object_type, name, global_conf)255_loaders['egg'] = _loadegg256def _loadfunc(object_type, uri, spec, name, relative_to,257 global_conf):258 loader = FuncLoader(spec)259 return loader.get_context(object_type, name, global_conf)260_loaders['call'] = _loadfunc261############################################################262## Loaders263############################################################264class _Loader(object):265 def get_app(self, name=None, global_conf=None):266 return self.app_context(267 name=name, global_conf=global_conf).create()268 def get_filter(self, name=None, global_conf=None):269 return self.filter_context(270 name=name, global_conf=global_conf).create()271 def get_server(self, name=None, global_conf=None):272 return self.server_context(273 name=name, global_conf=global_conf).create()274 def app_context(self, name=None, global_conf=None):275 return self.get_context(276 APP, name=name, global_conf=global_conf)277 def filter_context(self, name=None, global_conf=None):278 return self.get_context(279 FILTER, name=name, global_conf=global_conf)280 def server_context(self, name=None, global_conf=None):281 return self.get_context(282 SERVER, name=name, 
global_conf=global_conf)283 _absolute_re = re.compile(r'^[a-zA-Z]+:')284 def absolute_name(self, name):285 """286 Returns true if the name includes a scheme287 """288 if name is None:289 return False290 return self._absolute_re.search(name)291class ConfigLoader(_Loader):292 def __init__(self, filename):293 self.filename = filename = filename.strip()294 defaults = {295 'here': os.path.dirname(os.path.abspath(filename)),296 '__file__': os.path.abspath(filename)297 }298 self.parser = NicerConfigParser(filename, defaults=defaults)299 self.parser.optionxform = str # Don't lower-case keys300 with open(filename) as f:301 self.parser.read_file(f)302 def update_defaults(self, new_defaults, overwrite=True):303 for key, value in iteritems(new_defaults):304 if not overwrite and key in self.parser._defaults:305 continue306 self.parser._defaults[key] = value307 def get_context(self, object_type, name=None, global_conf=None):308 if self.absolute_name(name):309 return loadcontext(object_type, name,310 relative_to=os.path.dirname(self.filename),311 global_conf=global_conf)312 section = self.find_config_section(313 object_type, name=name)314 if global_conf is None:315 global_conf = {}316 else:317 global_conf = global_conf.copy()318 defaults = self.parser.defaults()319 global_conf.update(defaults)320 local_conf = {}321 global_additions = {}322 get_from_globals = {}323 for option in self.parser.options(section):324 if option.startswith('set '):325 name = option[4:].strip()326 global_additions[name] = global_conf[name] = (327 self.parser.get(section, option))328 elif option.startswith('get '):329 name = option[4:].strip()330 get_from_globals[name] = self.parser.get(section, option)331 else:332 if option in defaults:333 # @@: It's a global option (?), so skip it334 continue335 local_conf[option] = self.parser.get(section, option)336 for local_var, glob_var in get_from_globals.items():337 local_conf[local_var] = global_conf[glob_var]338 if object_type in (APP, FILTER) and 'filter-with' 
in local_conf:339 filter_with = local_conf.pop('filter-with')340 else:341 filter_with = None342 if 'require' in local_conf:343 for spec in local_conf['require'].split():344 pkg_resources.require(spec)345 del local_conf['require']346 if section.startswith('filter-app:'):347 context = self._filter_app_context(348 object_type, section, name=name,349 global_conf=global_conf, local_conf=local_conf,350 global_additions=global_additions)351 elif section.startswith('pipeline:'):352 context = self._pipeline_app_context(353 object_type, section, name=name,354 global_conf=global_conf, local_conf=local_conf,355 global_additions=global_additions)356 elif 'use' in local_conf:357 context = self._context_from_use(358 object_type, local_conf, global_conf, global_additions,359 section)360 else:361 context = self._context_from_explicit(362 object_type, local_conf, global_conf, global_additions,363 section)364 if filter_with is not None:365 filter_with_context = LoaderContext(366 obj=None,367 object_type=FILTER_WITH,368 protocol=None,369 global_conf=global_conf, local_conf=local_conf,370 loader=self)371 filter_with_context.filter_context = self.filter_context(372 name=filter_with, global_conf=global_conf)373 filter_with_context.next_context = context374 return filter_with_context375 return context376 def _context_from_use(self, object_type, local_conf, global_conf,377 global_additions, section):378 use = local_conf.pop('use')379 context = self.get_context(380 object_type, name=use, global_conf=global_conf)381 context.global_conf.update(global_additions)382 context.local_conf.update(local_conf)383 if '__file__' in global_conf:384 # use sections shouldn't overwrite the original __file__385 context.global_conf['__file__'] = global_conf['__file__']386 # @@: Should loader be overwritten?387 context.loader = self388 if context.protocol is None:389 # Determine protocol from section type390 section_protocol = section.split(':', 1)[0]391 if section_protocol in ('application', 'app'):392 
context.protocol = 'paste.app_factory'393 elif section_protocol in ('composit', 'composite'):394 context.protocol = 'paste.composit_factory'395 else:396 # This will work with 'server' and 'filter', otherwise it397 # could fail but there is an error message already for398 # bad protocols399 context.protocol = 'paste.%s_factory' % section_protocol400 return context401 def _context_from_explicit(self, object_type, local_conf, global_conf,402 global_addition, section):403 possible = []404 for protocol_options in object_type.egg_protocols:405 for protocol in protocol_options:406 if protocol in local_conf:407 possible.append((protocol, local_conf[protocol]))408 break409 if len(possible) > 1:410 raise LookupError(411 "Multiple protocols given in section %r: %s"412 % (section, possible))413 if not possible:414 raise LookupError(415 "No loader given in section %r" % section)416 found_protocol, found_expr = possible[0]417 del local_conf[found_protocol]418 value = import_string(found_expr)419 context = LoaderContext(420 value, object_type, found_protocol,421 global_conf, local_conf, self)422 return context423 def _filter_app_context(self, object_type, section, name,424 global_conf, local_conf, global_additions):425 if 'next' not in local_conf:426 raise LookupError(427 "The [%s] section in %s is missing a 'next' setting"428 % (section, self.filename))429 next_name = local_conf.pop('next')430 context = LoaderContext(None, FILTER_APP, None, global_conf,431 local_conf, self)432 context.next_context = self.get_context(433 APP, next_name, global_conf)434 if 'use' in local_conf:435 context.filter_context = self._context_from_use(436 FILTER, local_conf, global_conf, global_additions,437 section)438 else:439 context.filter_context = self._context_from_explicit(440 FILTER, local_conf, global_conf, global_additions,441 section)442 return context443 def _pipeline_app_context(self, object_type, section, name,444 global_conf, local_conf, global_additions):445 if 'pipeline' not in 
local_conf:446 raise LookupError(447 "The [%s] section in %s is missing a 'pipeline' setting"448 % (section, self.filename))449 pipeline = local_conf.pop('pipeline').split()450 if local_conf:451 raise LookupError(452 "The [%s] pipeline section in %s has extra "453 "(disallowed) settings: %s"454 % (', '.join(local_conf.keys())))455 context = LoaderContext(None, PIPELINE, None, global_conf,456 local_conf, self)457 context.app_context = self.get_context(458 APP, pipeline[-1], global_conf)459 context.filter_contexts = [460 self.get_context(FILTER, name, global_conf)461 for name in pipeline[:-1]]462 return context463 def find_config_section(self, object_type, name=None):464 """465 Return the section name with the given name prefix (following the466 same pattern as ``protocol_desc`` in ``config``. It must have the467 given name, or for ``'main'`` an empty name is allowed. The468 prefix must be followed by a ``:``.469 Case is *not* ignored.470 """471 possible = []472 for name_options in object_type.config_prefixes:473 for name_prefix in name_options:474 found = self._find_sections(475 self.parser.sections(), name_prefix, name)476 if found:477 possible.extend(found)478 break479 if not possible:480 raise LookupError(481 "No section %r (prefixed by %s) found in config %s"482 % (name,483 ' or '.join(map(repr, _flatten(object_type.config_prefixes))),484 self.filename))485 if len(possible) > 1:486 raise LookupError(487 "Ambiguous section names %r for section %r (prefixed by %s) "488 "found in config %s"489 % (possible, name,490 ' or '.join(map(repr, _flatten(object_type.config_prefixes))),491 self.filename))492 return possible[0]493 def _find_sections(self, sections, name_prefix, name):494 found = []495 if name is None:496 if name_prefix in sections:497 found.append(name_prefix)498 name = 'main'499 for section in sections:500 if section.startswith(name_prefix + ':'):501 if section[len(name_prefix) + 1:].strip() == name:502 found.append(section)503 return found504class 
EggLoader(_Loader):505 def __init__(self, spec):506 self.spec = spec507 def get_context(self, object_type, name=None, global_conf=None):508 if self.absolute_name(name):509 return loadcontext(object_type, name,510 global_conf=global_conf)511 entry_point, protocol, ep_name = self.find_egg_entry_point(512 object_type, name=name)513 return LoaderContext(514 entry_point,515 object_type,516 protocol,517 global_conf or {}, {},518 self,519 distribution=pkg_resources.get_distribution(self.spec),520 entry_point_name=ep_name)521 def find_egg_entry_point(self, object_type, name=None):522 """523 Returns the (entry_point, protocol) for the with the given524 ``name``.525 """526 if name is None:527 name = 'main'528 possible = []529 for protocol_options in object_type.egg_protocols:530 for protocol in protocol_options:531 pkg_resources.require(self.spec)532 entry = pkg_resources.get_entry_info(533 self.spec,534 protocol,535 name)536 if entry is not None:537 possible.append((entry.load(), protocol, entry.name))538 break539 if not possible:540 # Better exception541 dist = pkg_resources.get_distribution(self.spec)542 raise LookupError(543 "Entry point %r not found in egg %r (dir: %s; protocols: %s; "544 "entry_points: %s)"545 % (name, self.spec,546 dist.location,547 ', '.join(_flatten(object_type.egg_protocols)),548 ', '.join(_flatten([549 dictkeys(pkg_resources.get_entry_info(self.spec, prot, name) or {})550 for prot in protocol_options] or '(no entry points)'))))551 if len(possible) > 1:552 raise LookupError(553 "Ambiguous entry points for %r in egg %r (protocols: %s)"554 % (name, self.spec, ', '.join(_flatten(protocol_options))))555 return possible[0]556class FuncLoader(_Loader):557 """ Loader that supports specifying functions inside modules, without558 using eggs at all. 
Configuration should be in the format:559 use = call:my.module.path:function_name560 561 Dot notation is supported in both the module and function name, e.g.:562 use = call:my.module.path:object.method563 """564 def __init__(self, spec):565 self.spec = spec566 if not ':' in spec:567 raise LookupError("Configuration not in format module:function")568 def get_context(self, object_type, name=None, global_conf=None):569 obj = lookup_object(self.spec)570 return LoaderContext(571 obj,572 object_type,573 None, # determine protocol from section type574 global_conf or {},575 {},576 self,577 )578class LoaderContext(object):579 def __init__(self, obj, object_type, protocol,580 global_conf, local_conf, loader,581 distribution=None, entry_point_name=None):582 self.object = obj583 self.object_type = object_type584 self.protocol = protocol585 #assert protocol in _flatten(object_type.egg_protocols), (586 # "Bad protocol %r; should be one of %s"587 # % (protocol, ', '.join(map(repr, _flatten(object_type.egg_protocols)))))588 self.global_conf = global_conf589 self.local_conf = local_conf590 self.loader = loader591 self.distribution = distribution592 self.entry_point_name = entry_point_name593 def create(self):594 return self.object_type.invoke(self)595 def config(self):596 conf = AttrDict(self.global_conf)597 conf.update(self.local_conf)598 conf.local_conf = self.local_conf599 conf.global_conf = self.global_conf600 conf.context = self601 return conf602class AttrDict(dict):603 """604 A dictionary that can be assigned to.605 """...
SwaggerToSdkNewCLI.py
Source:SwaggerToSdkNewCLI.py
"""Swagger to SDK"""
import os
import shutil
import logging
import json
from pathlib import Path
import tempfile

from git import Repo, GitCommandError

from .SwaggerToSdkCore import (
    read_config_from_github,
    DEFAULT_COMMIT_MESSAGE,
    get_input_paths,
    extract_conf_from_readmes,
    get_readme_files_from_git_object,
    build_file_content,
    solve_relative_path,
    this_conf_will_generate_for_this_pr
)
from .autorest_tools import (
    execute_simple_command,
    generate_code,
    merge_options,
)
from azure_devtools.ci_tools.git_tools import (
    checkout_and_create_branch,
    do_commit,
)
from azure_devtools.ci_tools.github_tools import (
    configure_user,
    manage_git_folder,
)


_LOGGER = logging.getLogger(__name__)


def move_wrapper_files_or_dirs(src_root, dst_root, global_conf, local_conf):
    """Save wrapper files somewhere so they can be put back after generation.

    Every path under ``src_root/<output_dir>`` matching a
    ``wrapper_filesOrDirs`` glob is moved to the same relative location
    under ``dst_root``.
    """
    src_relative_path = local_conf.get('output_dir', '')
    src_abs_path = Path(src_root, src_relative_path)
    dst_abs_path = Path(dst_root, src_relative_path)

    wrapper_files_or_dirs = merge_options(global_conf, local_conf, "wrapper_filesOrDirs") or []

    for wrapper_file_or_dir in wrapper_files_or_dirs:
        for file_path in src_abs_path.glob(wrapper_file_or_dir):
            relative_file_path = file_path.relative_to(src_abs_path)
            file_path_dest = Path(dst_abs_path, relative_file_path)
            # NOTE(review): the listing has lost its indentation; the move
            # is assumed to apply to files and directories alike, with
            # only the parent creation limited to files -- confirm.
            if file_path.is_file():
                file_path_dest.parent.mkdir(parents=True, exist_ok=True)
            _LOGGER.info("Moving %s to %s", str(file_path), str(file_path_dest))
            # This does not work in Windows if generated and dest are not in the same drive
            # file_path.replace(file_path_dest)
            # str(): shutil.move mishandles Path sources on Python < 3.9
            # when the destination is an existing directory.
            shutil.move(str(file_path), str(file_path_dest))


def delete_extra_files(sdk_root, global_conf, local_conf):
    """Delete every path under the output dir matching ``delete_filesOrDirs``."""
    src_relative_path = local_conf.get('output_dir', '')
    src_abs_path = Path(sdk_root, src_relative_path)

    delete_files_or_dirs = merge_options(global_conf, local_conf, "delete_filesOrDirs") or []

    for delete_file_or_dir in delete_files_or_dirs:
        for file_path in src_abs_path.glob(delete_file_or_dir):
            if file_path.is_file():
                file_path.unlink()
            else:
                shutil.rmtree(str(file_path))


def move_autorest_files(client_generated_path, sdk_root, global_conf, local_conf):
    """Update data from generated to final folder.

    This is done only if output_dir is set, otherwise it's considered
    generated in place and does not require moving.

    Raises ValueError when ``generated_relative_base_directory`` matches
    no directory, or more than one.
    """
    dest = local_conf.get('output_dir', None)
    if not dest:
        return
    destination_folder = get_local_path_dir(sdk_root, dest)

    generated_relative_base_directory = local_conf.get('generated_relative_base_directory') or \
        global_conf.get('generated_relative_base_directory')

    if generated_relative_base_directory:
        client_possible_path = [elt for elt in client_generated_path.glob(generated_relative_base_directory) if elt.is_dir()]
        try:
            client_generated_path = client_possible_path.pop()
        except IndexError:
            err_msg = "Incorrect generated_relative_base_directory folder: {}\n".format(generated_relative_base_directory)
            err_msg += "Base folders were: : {}\n".format([f.relative_to(client_generated_path) for f in client_generated_path.iterdir()])
            _LOGGER.critical(err_msg)
            raise ValueError(err_msg)
        if client_possible_path:
            # Anything left after the pop means the glob matched several dirs.
            err_msg = "generated_relative_base_directory parameter is ambiguous: {} {}".format(
                client_generated_path,
                client_possible_path
            )
            _LOGGER.critical(err_msg)
            raise ValueError(err_msg)

    shutil.rmtree(str(destination_folder))
    # This does not work in Windows if generated and dest are not in the same drive
    # client_generated_path.replace(destination_folder)
    shutil.move(str(client_generated_path), str(destination_folder))


def write_build_file(sdk_root, local_conf):
    """Write ``build.json`` into ``build_dir`` when that option is set."""
    build_dir = local_conf.get('build_dir')
    if build_dir:
        build_folder = get_local_path_dir(sdk_root, build_dir)
        build_file = Path(build_folder, "build.json")
        with open(str(build_file), 'w') as build_fd:
            json.dump(build_file_content(), build_fd, indent=2)


def execute_after_script(sdk_root, global_conf, local_conf):
    """Run each configured ``after_scripts`` command from ``sdk_root``."""
    after_scripts = merge_options(global_conf, local_conf, "after_scripts", keep_list_order=True) or []
    local_envs = dict(os.environ)
    local_envs.update(global_conf.get("envs", {}))
    for script in after_scripts:
        _LOGGER.info("Execute after script: %s", script)
        # NOTE(review): scripts come from the configuration and run through
        # a shell; the configuration must be trusted.
        execute_simple_command(script, cwd=sdk_root, shell=True, env=local_envs)


def get_local_path_dir(root, relative_path):
    """Return ``root/relative_path``; raise ValueError if it is not a directory."""
    build_folder = Path(root, relative_path)
    if not build_folder.is_dir():
        err_msg = "Folder does not exist or is not accessible: {}".format(
            build_folder)
        _LOGGER.critical(err_msg)
        raise ValueError(err_msg)
    return build_folder


def build_project(temp_dir, project, absolute_markdown_path, sdk_folder, global_conf, local_conf, autorest_bin=None):
    """Generate one project and install the result into ``sdk_folder``."""
    absolute_generated_path = Path(temp_dir, project)
    absolute_save_path = Path(temp_dir, "save")
    # Park the hand-written wrapper files, generate, then put them back.
    move_wrapper_files_or_dirs(sdk_folder, absolute_save_path, global_conf, local_conf)
    generate_code(absolute_markdown_path,
                  global_conf,
                  local_conf,
                  absolute_generated_path if "output_dir" in local_conf else None,
                  autorest_bin)
    move_autorest_files(absolute_generated_path, sdk_folder, global_conf, local_conf)
    move_wrapper_files_or_dirs(absolute_save_path, sdk_folder, global_conf, local_conf)
    delete_extra_files(sdk_folder, global_conf, local_conf)
    write_build_file(sdk_folder, local_conf)
    execute_after_script(sdk_folder, global_conf, local_conf)


def build_libraries(config, skip_callback, restapi_git_folder, sdk_repo, temp_dir, autorest_bin=None):
    """Main method of the file.

    Builds every project of ``config["projects"]`` for which
    ``skip_callback(project, local_conf)`` returns a false value.
    """
    sdk_folder = sdk_repo.working_tree_dir

    global_conf = config["meta"]
    global_conf["autorest_options"] = solve_relative_path(global_conf.get("autorest_options", {}), sdk_folder)
    global_conf["envs"] = solve_relative_path(global_conf.get("envs", {}), sdk_folder)
    global_conf["advanced_options"] = solve_relative_path(global_conf.get("advanced_options", {}), sdk_folder)

    for project, local_conf in config.get("projects", {}).items():
        if skip_callback(project, local_conf):
            _LOGGER.info("Skip project %s", project)
            continue
        local_conf["autorest_options"] = solve_relative_path(local_conf.get("autorest_options", {}), sdk_folder)

        markdown_relative_path, optional_relative_paths = get_input_paths(global_conf, local_conf)
        _LOGGER.info("Markdown input: %s", markdown_relative_path)
        _LOGGER.info("Optional inputs: %s", optional_relative_paths)

        absolute_markdown_path = None
        if markdown_relative_path:
            absolute_markdown_path = Path(restapi_git_folder, markdown_relative_path).resolve()
        if optional_relative_paths:
            local_conf.setdefault('autorest_options', {})['input-file'] = [
                Path(restapi_git_folder, input_path).resolve()
                for input_path
                in optional_relative_paths
            ]

        build_project(
            temp_dir,
            project,
            absolute_markdown_path,
            sdk_folder,
            global_conf,
            local_conf,
            autorest_bin
        )


def generate_sdk_from_git_object(git_object, branch_name, restapi_git_id, sdk_git_id, base_branch_names, *, fallback_base_branch_name="master", sdk_tag=None):
    """Generate SDK from a commit or a PR object.

    git_object is the initial commit/PR from the RestAPI repo. If git_object is a PR, prefer to checkout Github PR "merge_commit_sha"
    restapi_git_id explains where to clone the repo.
    sdk_git_id explains where to push the commit.
    sdk_tag explains what is the tag used in the Readme for the swagger-to-sdk section. If not provided, use sdk_git_id.
    branch_name is the expected branch name in the SDK repo.
    - If this branch exists, use it.
    - If not, use the base branch to create that branch (base branch is where I intend to do my PR)
    - If base_branch_names is not provided, use fallback_base_branch_name as base
    - If this base branch is provided and does not exists, create this base branch first using fallback_base_branch_name (this one is required to exist)

    WARNING:
    This method might push to "branch_name" and "base_branch_name". No push will be made to "fallback_base_branch_name"
    """
    gh_token = os.environ["GH_TOKEN"]
    message_template = DEFAULT_COMMIT_MESSAGE
    autorest_bin = None
    if sdk_tag is None:
        sdk_tag = sdk_git_id
    # Accept None or any iterable, as the docstring allows omitting it.
    base_branch_names = list(base_branch_names or [])

    try:  # Checkout the sha if commit obj
        branched_rest_api_id = restapi_git_id+'@'+git_object.sha
        pr_number = None
    except (AttributeError, TypeError):  # This is a PR, don't clone the fork but "base" repo and PR magic commit
        if git_object.merge_commit_sha:
            branched_rest_api_id = git_object.base.repo.full_name+'@'+git_object.merge_commit_sha
        else:
            branched_rest_api_id = git_object.base.repo.full_name
        pr_number = git_object.number

    # Always clone SDK from fallback branch that is required to exist
    branched_sdk_git_id = sdk_git_id+'@'+fallback_base_branch_name

    # I don't know if the destination branch exists, try until it works
    config = None
    branch_list = base_branch_names + [branch_name] + [fallback_base_branch_name]
    for branch in branch_list:
        try:
            config = read_config_from_github(sdk_git_id, branch, gh_token)
        except Exception as err:
            # Best effort: a missing branch is expected here, but keep a
            # trace so real failures (auth, network) are diagnosable.
            _LOGGER.info("Unable to read configuration from branch %s: %s", branch, err)
        else:
            break
    if config is None:
        raise ValueError("Unable to locate configuration in {}".format(branch_list))
    global_conf = config["meta"]

    # If PR is only about a language that this conf can't handle, skip fast
    if not this_conf_will_generate_for_this_pr(git_object, global_conf):
        _LOGGER.info("Skipping this job based on conf not impacted by Git object")
        return

    with tempfile.TemporaryDirectory() as temp_dir:

        clone_dir = Path(temp_dir) / Path(global_conf.get("advanced_options", {}).get("clone_dir", "sdk"))
        _LOGGER.info("Clone dir will be: %s", clone_dir)

        with manage_git_folder(gh_token, Path(temp_dir) / Path("rest"), branched_rest_api_id, pr_number=pr_number) as restapi_git_folder, \
                manage_git_folder(gh_token, clone_dir, branched_sdk_git_id) as sdk_folder:

            readme_files_infered = get_readme_files_from_git_object(git_object, restapi_git_folder)
            _LOGGER.info("Readmes files infered from PR: %s ", readme_files_infered)
            if not readme_files_infered:
                _LOGGER.info("No Readme in PR, quit")
                return

            # SDK part
            sdk_repo = Repo(str(sdk_folder))

            for base_branch in base_branch_names:
                _LOGGER.info('Checkout and create %s', base_branch)
                checkout_and_create_branch(sdk_repo, base_branch)

            _LOGGER.info('Try to checkout destination branch %s', branch_name)
            try:
                sdk_repo.git.checkout(branch_name)
                _LOGGER.info('The branch exists.')
            except GitCommandError:
                _LOGGER.info('Destination branch does not exists')
                # Will be created by do_commit

            configure_user(gh_token, sdk_repo)

            # Look for configuration in Readme
            _LOGGER.info('Extract conf from Readmes for target: %s', sdk_git_id)
            extract_conf_from_readmes(readme_files_infered, restapi_git_folder, sdk_tag, config)
            _LOGGER.info('End of extraction')

            def skip_callback(project, local_conf):
                # We know "project" is based on Path in "readme_files_infered"
                if Path(project) in readme_files_infered:
                    return False
                # Might be a regular project
                markdown_relative_path, optional_relative_paths = get_input_paths(global_conf, local_conf)
                if not (
                        markdown_relative_path in readme_files_infered or
                        any(input_file in readme_files_infered for input_file in optional_relative_paths)):
                    _LOGGER.info("In project %s no files involved in this commit", project)
                    return True
                return False

            build_libraries(config, skip_callback, restapi_git_folder,
                            sdk_repo, temp_dir, autorest_bin)

            try:
                commit_for_sha = git_object.commit   # Commit
            except AttributeError:
                commit_for_sha = list(git_object.get_commits())[-1].commit  # PR
            message = message_template + "\n\n" + commit_for_sha.message
            commit_sha = do_commit(sdk_repo, message, branch_name, commit_for_sha.sha)
            if commit_sha:
                for base_branch in base_branch_names:
                    sdk_repo.git.push('origin', base_branch, set_upstream=True)
                sdk_repo.git.push('origin', branch_name, set_upstream=True)
                # NOTE(review): the listing is truncated at this point;
                # anything that follows in the real file is not shown.
test.py
Source:test.py
# Copyright (C) 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import tempfile
import unittest

from devstack_local_conf import LocalConf
from collections import OrderedDict


class TestDevstackLocalConf(unittest.TestCase):
    """Check the ``enable_plugin`` lines that LocalConf writes out.

    Every test builds the same localrc / local_conf / services input and
    varies only the plugin mapping and the base directory handed to
    LocalConf.
    """

    def setUp(self):
        # Scratch directory holding fake plugin checkouts and the output file.
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmpdir)

    def _make_plugin_checkout(self, name, settings_lines):
        """Create ``<tmpdir>/<name>-plugin`` with a devstack settings file."""
        checkout = os.path.join(self.tmpdir, name + '-plugin')
        os.makedirs(os.path.join(checkout, 'devstack'))
        os.makedirs(os.path.join(checkout, '.git'))
        with open(os.path.join(checkout, 'devstack', 'settings'), 'w') as f:
            for text in settings_lines:
                f.write(text)

    def _build_params(self, plugins, base_dir):
        """Return the argument dict shared by all tests."""
        return dict(
            localrc={'test_localrc': '1'},
            local_conf={'install':
                        {'nova.conf':
                         {'main':
                          {'test_conf': '2'}}}},
            base_services=[],
            services={'cinder': True},
            plugins=plugins,
            base_dir=base_dir,
            path=os.path.join(self.tmpdir, 'test.local.conf'),
        )

    @staticmethod
    def _write_local_conf(params):
        """Build a LocalConf from *params* and write it to params['path']."""
        conf = LocalConf(params['localrc'],
                         params['local_conf'],
                         params['base_services'],
                         params['services'],
                         params['plugins'],
                         params['base_dir'])
        conf.write(params['path'])

    @staticmethod
    def _enabled_plugins(path):
        """Return the plugin names from ``enable_plugin`` lines, in file order."""
        with open(path) as f:
            return [row.split()[1] for row in f
                    if row.startswith('enable_plugin')]

    def test_plugins(self):
        "Test that plugins without dependencies work"
        # We use ordereddict here to make sure the plugins are in the
        # *wrong* order for testing.
        wanted = OrderedDict([
            ('bar', 'git://git.openstack.org/openstack/bar-plugin'),
            ('foo', 'git://git.openstack.org/openstack/foo-plugin'),
            ('baz', 'git://git.openstack.org/openstack/baz-plugin'),
        ])
        params = self._build_params(wanted, './test')

        self._write_local_conf(params)

        self.assertEqual(['bar', 'baz', 'foo'],
                         self._enabled_plugins(params['path']))

    def test_plugin_deps(self):
        "Test that plugins with dependencies work"
        self._make_plugin_checkout('foo', ['define_plugin foo\n'])
        self._make_plugin_checkout('bar', ['define_plugin bar\n',
                                           'plugin_requires bar foo\n'])
        # We use ordereddict here to make sure the plugins are in the
        # *wrong* order for testing.
        wanted = OrderedDict([
            ('bar', 'git://git.openstack.org/openstack/bar-plugin'),
            ('foo', 'git://git.openstack.org/openstack/foo-plugin'),
        ])
        params = self._build_params(wanted, self.tmpdir)

        self._write_local_conf(params)

        self.assertEqual(['foo', 'bar'],
                         self._enabled_plugins(params['path']))

    def test_plugin_circular_deps(self):
        "Test that plugins with circular dependencies fail"
        self._make_plugin_checkout('foo', ['define_plugin foo\n',
                                           'plugin_requires foo bar\n'])
        self._make_plugin_checkout('bar', ['define_plugin bar\n',
                                           'plugin_requires bar foo\n'])
        # We use ordereddict here to make sure the plugins are in the
        # *wrong* order for testing.
        wanted = OrderedDict([
            ('bar', 'git://git.openstack.org/openstack/bar-plugin'),
            ('foo', 'git://git.openstack.org/openstack/foo-plugin'),
        ])
        params = self._build_params(wanted, self.tmpdir)

        # Both construction and writing happen under assertRaises, so the
        # failure may come from either step.
        with self.assertRaises(Exception):
            self._write_local_conf(params)


if __name__ == '__main__':
    ...  # the rest of the script is cut off in this listing
test_storage.py
Source:test_storage.py
import os

from paste.deploy import appconfig
import paste.fixture

from ckan.config.middleware import make_app
import ckan.model as model
from ckan.tests import conf_dir, url_for, CreateTestData
from ckan.controllers.admin import get_sysadmins
from ckan.controllers.storage import create_pairtree_marker


class TestStorageAPIController:
    """Storage API tests against a pairtree store, using the test user."""

    @classmethod
    def setup_class(cls):
        """Build a test app whose ``ofs.*`` settings point at pairtree."""
        config = appconfig('config:test.ini', relative_to=conf_dir)
        # Iterate over a snapshot of the keys: deleting entries while walking
        # the live key view raises RuntimeError on Python 3.
        for key in list(config.local_conf.keys()):
            if key.startswith('ofs'):
                del config.local_conf[key]
        config.local_conf['ofs.impl'] = 'pairtree'
        config.local_conf['ckan.storage.bucket'] = 'ckantest'
        config.local_conf['ofs.storage_dir'] = '/tmp/ckan-test-ckanext-storage'
        create_pairtree_marker(config.local_conf['ofs.storage_dir'])
        wsgiapp = make_app(config.global_conf, **config.local_conf)
        cls.app = paste.fixture.TestApp(wsgiapp)
        CreateTestData.create_test_user()

    @classmethod
    def teardown_class(cls):
        CreateTestData.delete()

    def test_index(self):
        """The storage API index returns a JSON document with three entries."""
        url = url_for('storage_api')
        res = self.app.get(url)
        assert len(res.json) == 3

    def test_authz(self):
        """Only logged-in users may fetch the upload auth form."""
        url = url_for('storage_api_auth_form', label='abc')
        # Non logged in users can not upload
        res = self.app.get(url, status=[302, 401])

        # Logged in users can upload
        res = self.app.get(url, status=[200],
                           extra_environ={'REMOTE_USER': 'tester'})

        # TODO: ? test for non-authz case
        # url = url_for('storage_api_auth_form', label='abc')
        # res = self.app.get(url, status=[302,401])


class TestStorageAPIControllerLocal:
    """Storage API tests against pairtree, authenticating with an API key."""

    @classmethod
    def setup_class(cls):
        """Build a test app on pairtree and record the tester's API key."""
        config = appconfig('config:test.ini', relative_to=conf_dir)
        # Iterate over a snapshot of the keys: deleting entries while walking
        # the live key view raises RuntimeError on Python 3.
        for key in list(config.local_conf.keys()):
            if key.startswith('ofs'):
                del config.local_conf[key]
        config.local_conf['ckan.storage.bucket'] = 'ckantest'
        config.local_conf['ofs.impl'] = 'pairtree'
        config.local_conf['ofs.storage_dir'] = '/tmp/ckan-test-ckanext-storage'
        create_pairtree_marker(config.local_conf['ofs.storage_dir'])
        wsgiapp = make_app(config.global_conf, **config.local_conf)
        cls.app = paste.fixture.TestApp(wsgiapp)
        CreateTestData.create()
        model.Session.remove()
        user = model.User.by_name('tester')
        cls.extra_environ = {'Authorization': str(user.apikey)}

    @classmethod
    def teardown_class(cls):
        CreateTestData.delete()

    def test_auth_form(self):
        """The auth form echoes the requested label in its last field."""
        url = url_for('storage_api_auth_form', label='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['action'] == u'/storage/upload_handle', res.json
        assert res.json['fields'][-1]['value'] == 'abc', res

        url = url_for('storage_api_auth_form', label='abc/xxx')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['fields'][-1]['value'] == 'abc/xxx'

    def test_metadata(self):
        """Metadata for a label that was never stored yields a 404."""
        url = url_for('storage_api_get_metadata', label='abc')
        res = self.app.get(url, status=404)

        # TODO: test get metadata on real setup ...
        label = 'abc'
        url = url_for('storage_api_set_metadata',
                      extra_environ=self.extra_environ,
                      label=label,
                      data=dict(
                          label=label
                      )
                      )
        # res = self.app.get(url, status=404)


# Disabling because requires access to google storage to run (and this is not
# generally available to devs ...)
class _TestStorageAPIControllerGoogle:
    """Storage API tests against Google storage (disabled via the ``_`` prefix)."""

    @classmethod
    def setup_class(cls):
        """Build a test app on the google backend; needs credentials in config."""
        config = appconfig('config:test.ini', relative_to=conf_dir)
        config.local_conf['ckan.storage.bucket'] = 'ckantest'
        config.local_conf['ofs.impl'] = 'google'
        if 'ofs.gs_secret_access_key' not in config.local_conf:
            raise Exception('You will need to configure access to google storage to run this test')
        # You will need entries like these in your configuration:
        # config.local_conf['ofs.gs_access_key_id'] = 'GOOGCABCDASDASD'
        # config.local_conf['ofs.gs_secret_access_key'] = '134zsdfjkw4234addad'
        # need to ensure not configured for local as breaks google setup
        # (and cannot delete all ofs keys as need the gs access codes)
        if 'ofs.storage_dir' in config.local_conf:
            del config.local_conf['ofs.storage_dir']
        wsgiapp = make_app(config.global_conf, **config.local_conf)
        cls.app = paste.fixture.TestApp(wsgiapp)
        # setup test data including testsysadmin user
        CreateTestData.create()
        model.Session.remove()
        user = model.User.by_name('tester')
        cls.extra_environ = {'Authorization': str(user.apikey)}

    @classmethod
    def teardown_class(cls):
        CreateTestData.delete()

    def test_auth_form(self):
        """The auth form echoes the label and expands the success redirect."""
        url = url_for('storage_api_auth_form', label='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['fields'][-1]['value'] == 'abc', res

        url = url_for('storage_api_auth_form', label='abc/xxx')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['fields'][-1]['value'] == 'abc/xxx'

        url = url_for('storage_api_auth_form', label='abc',
                      success_action_redirect='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        fields = dict([(x['name'], x['value']) for x in res.json['fields']])
        assert fields['success_action_redirect'] == u'http://localhost/storage/upload/success_empty?label=abc'

    # TODO: re-enable
    # Disabling as there seems to be a mismatch between OFS and more recent
    # versions of boto (e.g. >= 2.1.1)
    # Specifically fill_in_auth method on Connection objects has gone away
    def _test_auth_request(self):
        url = url_for('storage_api_auth_request', label='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['method'] == 'POST'
        # (the listing is cut off after this line)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!