Best Python code snippet using selene_python
_api_model_builder.py
Source:_api_model_builder.py
# ----------------------------------------------------------------
# Copyright 2016 Cisco Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------------

"""
 api_model.py

 The meta-model of the generated API.
 Translation process converts the YANG model to classes defined in this module.
"""

from pyang import statements
from ._types_extractor import TypesExtractor
from ydkgen.common import YdkGenException
from ydkgen.api_model import AnyXml, Enum, Class, Bits, Package, Property, Deviation


class ApiModelBuilder(object):
    """Builds api_model packages from pyang modules using the expanded
    (tree view) strategy: one Class per data node."""

    def __init__(self, iskeyword, language):
        """
        :param iskeyword: passed through to every api_model element created.
        :param language: target language name; only compared against 'cpp'
                         when walking the children of an rpc.
        """
        self.types_extractor = TypesExtractor()
        self.iskeyword = iskeyword
        self.language = language

    def generate(self, modules):
        """
            Generates and returns the packages for the list of modules supplied.

            This is the function that converts the list of pyang modules to
            Packages in the api_model

            :param list of `pyang.statements.Statement`
        """
        # Modules carrying an 'is_deviation_module' attribute get their own
        # (initially empty) packages; they are filled while walking the targets.
        d_modules = [module for module in modules if hasattr(module, 'is_deviation_module')]
        modules = [module for module in modules if not hasattr(module, 'is_deviation_module')]
        only_modules = [module for module in modules if module.keyword == 'module']

        packages = []
        deviation_packages = []

        for module in d_modules:
            package = Package(self.iskeyword)
            module.i_package = package
            package.stmt = module
            deviation_packages.append(package)

        for module in only_modules:
            package = Package(self.iskeyword)
            module.i_package = package
            package.stmt = module
            self._create_expanded_api_model(module, package, deviation_packages)
            packages.append(package)

        packages.extend(deviation_packages)

        # resolve cross references
        for package in packages:
            self._resolve_expanded_cross_references(package)
        return packages

    def _add_enums_and_bits(self, s, pe):
        """Create Enum/Bits elements for the types found under statement `s`
        and attach them to the parent element `pe`.

        Unions are descended into recursively so that enum/bits members of a
        union are created as well.
        """
        enum_type_stmt = self.types_extractor.get_enum_type_stmt(s)
        bits_type_stmt = self.types_extractor.get_bits_type_stmt(s)
        union_type_stmt = self.types_extractor.get_union_type_stmt(s)

        if enum_type_stmt is not None:
            enum_class = Enum(self.iskeyword)
            enum_class.stmt = enum_type_stmt
            # Recorded on both the type stmt and its parent; the cross
            # reference resolvers look the element up through the parent.
            enum_type_stmt.parent.i_enum = enum_class
            enum_type_stmt.i_enum = enum_class
            pe.owned_elements.append(enum_class)
            enum_class.owner = pe

        if bits_type_stmt is not None:
            bits_class = Bits(self.iskeyword)
            bits_class.stmt = bits_type_stmt
            bits_type_stmt.parent.i_bits = bits_class
            bits_type_stmt.i_bits = bits_class
            pe.owned_elements.append(bits_class)
            bits_class.owner = pe

        if union_type_stmt is not None:
            # need to process the type stmts under the union
            for contained_type in union_type_stmt.i_type_spec.types:
                self._add_enums_and_bits(contained_type, pe)

    def _resolve_expanded_cross_references(self, element):
        """ Resolves cross references in the api_model Elements.

           Prerequisite before calling this function is that
           the 'pyang.statements.Statement' tree for the list of modules
           must have their associated api_model Elements set in the i_class
           i_package, i_enum variables. These variables are used to resolve
           the cross references.

          :param `NamedElement` element :- The element whose references need to be resolved.
          :raise `common.YdkGenException` If cross resolution failed.
        """
        if isinstance(element, Property):
            enum_type = self.types_extractor.get_enum_type_stmt(element.stmt)
            if enum_type is not None and not isinstance(element.property_type, Enum):
                if not hasattr(enum_type.parent, 'i_enum'):
                    # case where the type is a leafref pointing to a leaf with an
                    # embedded enum definition
                    if (hasattr(enum_type.parent, 'i_property') and
                            isinstance(enum_type.parent.i_property.property_type, Enum)):
                        element.property_type = enum_type.parent.i_property.property_type
                    else:
                        raise YdkGenException(
                            'Cross resolution of enum failed for ' + element.fqn())
                else:
                    element.property_type = enum_type.parent.i_enum
            else:
                # check for identity_ref's
                identity_ref_type = self.types_extractor.get_identity_ref_type_stmt(element.stmt)
                if identity_ref_type is not None:
                    if not hasattr(identity_ref_type.i_type_spec.base.i_identity, 'i_class'):
                        raise YdkGenException(
                            'Cross resolution of identity class failed for ' + element.fqn())
                    element.property_type = identity_ref_type.i_type_spec.base.i_identity.i_class
                else:
                    # check for bits
                    bits_ref_type = self.types_extractor.get_bits_type_stmt(element.stmt)
                    if bits_ref_type is not None and not isinstance(element.property_type, Bits):
                        if not hasattr(bits_ref_type.parent, 'i_bits'):
                            raise YdkGenException(
                                'Cross resolution of bits failed for ' + element.fqn())
                        element.property_type = bits_ref_type.parent.i_bits

        if hasattr(element, 'owned_elements'):
            for owned_element in element.owned_elements:
                self._resolve_expanded_cross_references(owned_element)

    def _add_to_deviation_package(self, target, parent_element, deviation_packages):
        """Create a Deviation element for every deviation type recorded on
        `target.i_deviation` and add it to the package of the module that
        declared the deviation.

        :param target: statement that carries an `i_deviation` mapping of
                       deviation type -> iterable of (module, stmt) pairs.
        :param parent_element: element set as owner of each Deviation.
        :param deviation_packages: packages created for deviation modules.
        """
        i_deviation = target.i_deviation
        for d_type in i_deviation:
            d_obj = Deviation(self.iskeyword)
            d_obj.d_type = d_type
            d_obj.d_target = target
            d_obj.owner = parent_element
            d_obj.d_stmts = set()
            for (d_module, d_stmt) in i_deviation[d_type]:
                d_module_name = d_module.arg
                # NOTE(review): raises IndexError when the deviating module has
                # no package in deviation_packages.
                target_package = [p for p in deviation_packages if p.stmt.arg == d_module_name][0]
                d_obj.d_stmts.add(d_stmt)
                if d_stmt.keyword == 'type':
                    # Work on a copy of the target whose 'type' substatement is
                    # replaced by the deviated one, leaving the original intact.
                    d_obj.d_target = target.copy()
                    d_target = d_obj.d_target
                    idx = d_target.substmts.index(d_target.search_one('type'))
                    d_target.substmts[idx] = d_stmt
                    self._add_leaf_leaflist_prop(d_target, d_obj)
                # NOTE(review): indentation was lost in the source excerpt; this
                # check is kept inside the loop so target_package is always bound.
                if d_obj not in target_package.owned_elements:
                    target_package.owned_elements.append(d_obj)

    def _add_leaf_leaflist_prop(self, stmt, parent_element):
        """Create the Property for a leaf / leaf-list / anyxml statement and,
        when its type is defined inline, the Enum / Bits elements it needs.

        :param stmt: the statement to convert.
        :param parent_element: element that will own the new elements.
        """
        prop = Property(self.iskeyword)
        stmt.i_property = prop
        prop.stmt = stmt

        # Disambiguate against siblings already owned by the parent.
        for element in parent_element.owned_elements:
            if element.name == prop.name:
                prop.name = prop.name + '_'

        parent_element.owned_elements.append(prop)
        prop.owner = parent_element
        # for inlined enum types where leaf { type enumeration {
        enum_type = self.types_extractor.get_enum_type_stmt(stmt)
        bits_type = self.types_extractor.get_bits_type_stmt(stmt)
        union_type = self.types_extractor.get_union_type_stmt(stmt)
        # if the type statement is totally self contained
        # then we need to extract this type

        if stmt.keyword == 'anyxml':
            anyxml = AnyXml()
            anyxml.stmt = stmt
            # parent_element.owned_elements.append(anyxml)
            # anyxml.owner = parent_element
            prop.property_type = anyxml
        elif enum_type is not None and enum_type == stmt.search_one('type'):
            # we have to create the enum
            enum_class = Enum(self.iskeyword)
            enum_class.stmt = enum_type
            parent_element.owned_elements.append(enum_class)
            enum_class.owner = parent_element
            prop.property_type = enum_class
            enum_type.i_enum = enum_class
            enum_type.parent.i_enum = enum_class
        elif bits_type is not None and bits_type == stmt.search_one('type'):
            # we have to create the specific subclass of FixedBitsDict
            bits_class = Bits(self.iskeyword)
            bits_class.stmt = bits_type
            parent_element.owned_elements.append(bits_class)
            bits_class.owner = parent_element
            prop.property_type = bits_class
        elif union_type is not None and union_type == stmt.search_one('type'):
            def _add_union_type(union_type_stmt, parent_element):
                # Create Enum/Bits elements for inline members of the union,
                # recursing into nested unions.
                for contained_type in union_type_stmt.i_type_spec.types:
                    contained_enum_type = self.types_extractor.get_enum_type_stmt(contained_type)
                    contained_bits_type = self.types_extractor.get_bits_type_stmt(contained_type)
                    contained_union_type = self.types_extractor.get_union_type_stmt(contained_type)

                    if contained_enum_type is not None and contained_enum_type == contained_type:
                        enum_class = Enum(self.iskeyword)
                        enum_class.stmt = contained_enum_type
                        parent_element.owned_elements.append(enum_class)
                        enum_class.owner = parent_element
                        contained_enum_type.i_enum = enum_class
                        contained_enum_type.parent.i_enum = enum_class

                    if contained_bits_type is not None and contained_bits_type == contained_type:
                        bits_class = Bits(self.iskeyword)
                        bits_class.stmt = contained_bits_type
                        parent_element.owned_elements.append(bits_class)
                        bits_class.owner = parent_element
                        contained_bits_type.i_bits = bits_class

                    if contained_union_type is not None and contained_union_type == contained_type:
                        _add_union_type(contained_union_type, parent_element)

            # is this embedded ?
            if union_type == stmt.search_one('type'):
                # we need to check for the types under the union to see if
                # any of them need to be handled differently
                _add_union_type(union_type, parent_element)

    def _create_expanded_api_model(self, stmt, parent_element, deviation_packages):
        """
            Converts the stmt to an Element in the api_model according
            to the expanded code generation algorithm.

            The expanded code generation algorithm uses the tree view of the YANG models
            to generate the API. For each data node in the YANG model a corresponding Class
            element will be created.

            In the first pass Elements that encapsulate the references are created
            this is done for all the stmts we are interested
            after this is done, resolve cross references is called on all the elements
            to resolve all the cross references (examples include
            super classes extends field in a class)

            :param `pyang.statements.Statement` stmt The statement to convert
            :param  `Element` The parent element.
            :param list of 'Package' The deviation packages.
        """

        # process typedefs first so that they are resolved
        # when we have to use them
        element = parent_element

        # identities
        if hasattr(stmt, 'i_identities'):
            for identity_stmt in stmt.i_identities.values():
                identity_class = Class(self.iskeyword)
                identity_class.stmt = identity_stmt
                identity_class.owner = parent_element
                parent_element.owned_elements.append(identity_class)
                identity_stmt.i_class = identity_class

        if hasattr(stmt, 'i_typedefs'):
            for typedef_stmt_name in stmt.i_typedefs:
                typedef_stmt = stmt.i_typedefs[typedef_stmt_name]
                self._add_enums_and_bits(typedef_stmt, parent_element)

        if stmt.keyword == 'module':
            pass

        elif stmt.keyword in ('container', 'list', 'rpc', 'input', 'output'):
            if stmt.keyword in ('input', 'output') and len(stmt.substmts) == 0:
                # empty rpc input/output: no class is generated
                pass
            else:
                clazz = Class(self.iskeyword)
                stmt.i_class = clazz
                clazz.stmt = stmt

                # Rename on clash with an existing sibling and remember the
                # renamed argument on the statement.
                for e in parent_element.owned_elements:
                    if e.name == clazz.name:
                        clazz.name = clazz.name + '_'
                        if hasattr(stmt, 'unclashed_arg'):
                            stmt.unclashed_arg += '_'
                        else:
                            stmt.unclashed_arg = '%s_' % stmt.arg

                parent_element.owned_elements.append(clazz)
                clazz.owner = parent_element

                if name_matches_ancestor(clazz.name, parent_element):
                    clazz.name = clazz.name + '_'

                element = clazz

                if not isinstance(parent_element, Package):
                    # create a property along with the class
                    prop = Property(self.iskeyword)
                    stmt.i_property = prop
                    prop.stmt = stmt
                    prop.property_type = clazz
                    parent_element.owned_elements.append(prop)
                    prop.owner = parent_element

        elif stmt.keyword == 'leaf' or stmt.keyword == 'leaf-list' or stmt.keyword == 'anyxml':
            self._add_leaf_leaflist_prop(stmt, parent_element)

        if hasattr(stmt, 'i_deviation'):
            self._add_to_deviation_package(stmt, parent_element, deviation_packages)

        # walk the children
        _keywords = statements.data_definition_keywords + ['case', 'rpc', 'input', 'output']

        if hasattr(stmt, 'i_children'):
            self._sanitize_namespace(stmt)

            child_stmts = []
            # keys go first so that they precede the other children
            if hasattr(stmt, 'i_key') and stmt.i_key is not None:
                child_stmts.extend([s for s in stmt.i_key])

            if stmt.keyword == 'rpc' and self.language == 'cpp':
                child_stmts = self._walk_children(stmt, _keywords)
            else:
                _children = [child for child in stmt.i_children
                             if (child not in child_stmts and child.keyword in _keywords)]
                child_stmts.extend(_children)

            for child_stmt in child_stmts:
                self._create_expanded_api_model(child_stmt, element, deviation_packages)

    # assumes stmt has attribute 'i_children' and language is 'cpp'
    def _walk_children(self, stmt, keywords):
        """Return the key statements of `stmt` followed by its remaining
        children whose keyword is in `keywords`, without duplicates."""
        children = []

        if hasattr(stmt, 'i_key') and stmt.i_key is not None:
            children.extend([s for s in stmt.i_key if s not in children])

        for child in stmt.i_children:
            if child not in children and child.keyword in keywords:
                children.append(child)

        return children

    def _sanitize_namespace(self, stmt):
        """
            Detects if there is a name clash for a statement
            under a given node.

            Note in the expanded tree algorithm
            augments show up under the parent node and their namespaces
            are not taken into account when the corresponding api_model.Element
            is generated. By sanitizing the namespace we avoid name collisions
            that can occur because of this situations

            :param `pyang.statements.Statement` The stmt to sanitize.
        """
        def _get_num_clashes(i_children):
            # Collect the data nodes that end up as siblings in the generated
            # element (looking through choice/case) and return the list of
            # duplicated args together with all collected statements.
            stmts = []
            for stmt in i_children:
                if stmt.keyword in ['container', 'list', 'leaf', 'leaf-list']:
                    stmts.append(stmt)
                elif stmt.keyword == 'choice':
                    for choice_child in stmt.i_children:
                        if choice_child.keyword == 'case':
                            # only the first child of a case is considered
                            if len(choice_child.i_children) > 0:
                                choice_child = choice_child.i_children[0]
                        if choice_child.keyword in ['container', 'list', 'leaf', 'leaf-list']:
                            stmts.append(choice_child)

            all_stmts = [stmt for stmt in stmts]
            clashes = []
            while len(stmts) > 0:
                ss = stmts.pop()
                sargs = [s.arg for s in stmts]
                if ss.arg in sargs and ss.arg not in clashes:
                    clashes.append(ss.arg)
            return clashes, all_stmts

        def _kill_clash(clash, i_children):
            # Prefix clashing augmented nodes with the arg of their top statement.
            for stmt in i_children:
                if clash == stmt.arg:
                    if hasattr(stmt, 'i_augment'):
                        stmt.unclashed_arg = '%s_%s' % (stmt.top.arg, stmt.arg)
                    elif stmt.parent.keyword == 'case' and hasattr(stmt.parent.parent, 'i_augment'):
                        stmt.unclashed_arg = '%s_%s' % (stmt.parent.parent.top.arg, stmt.arg)

        clashes, stmts = _get_num_clashes(stmt.i_children)

        if len(clashes) > 0:
            for clash in clashes:
                _kill_clash(clash, stmts)


class GroupingClassApiModelBuilder(ApiModelBuilder):
    """Builds api_model packages using the "grouping as class" strategy."""

    def generate(self, modules):
        """
            Generates and returns the packages for the list of modules supplied.

            This is the function that converts the list of pyang modules to
            Packages in the api_model for the grouping as classes strategy.

            :param list of `pyang.statements.Statement`
        """
        only_modules = [m for m in modules if m.keyword == 'module']
        packages = []

        for m in only_modules:
            p = Package(self.iskeyword)
            m.i_package = p
            p.stmt = m
            self._create_grouping_class_api_model(m, p)
            packages.append(p)

        # resolve cross references
        for p in packages:
            self._resolve_grouping_class_cross_references(p)
        return packages

    def _resolve_grouping_class_cross_references(self, element):
        """
            Resolve cross references in the grouping as class code generation
            strategy.

            :param `api_model.Element` The model element whose cross references have to be
                    resolved.

            :raise `common.YdkGenException' if cross resolution of references failed.
        """
        if isinstance(element, Class) and not element.is_identity():
            uses_stmts = element.stmt.search('uses')

            # set the super classes or the extend property
            groupings_used = []
            for uses in uses_stmts:
                groupings_used.append(uses.i_grouping)

            extends = []
            for grouping_stmt in groupings_used:
                # getattr: a grouping that was never converted has no i_class
                # attribute at all; report it as a resolution failure.
                if getattr(grouping_stmt, 'i_class', None) is None:
                    raise YdkGenException(
                        'Unresolved grouping class for ' + element.fqn())
                extends.append(grouping_stmt.i_class)
            element._extends = extends

        if isinstance(element, Property):
            enum_type = self.types_extractor.get_enum_type_stmt(element.stmt)
            if enum_type is not None and not isinstance(element.property_type, Enum):
                if not hasattr(enum_type.parent, 'i_enum'):
                    raise YdkGenException(
                        'Cross resolution of enum failed for ' + element.fqn())
                element.property_type = enum_type.parent.i_enum
            else:
                # check for identity_ref's
                identity_ref_type = self.types_extractor.get_identity_ref_type_stmt(element.stmt)
                if identity_ref_type is not None:
                    # same guard as the expanded resolver
                    if not hasattr(identity_ref_type.i_type_spec.base.i_identity, 'i_class'):
                        raise YdkGenException(
                            'Cross resolution of identity class failed for ' + element.fqn())
                    element.property_type = identity_ref_type.i_type_spec.base.i_identity.i_class
                else:
                    # check for bits
                    bits_ref_type = self.types_extractor.get_bits_type_stmt(element.stmt)
                    if bits_ref_type is not None and not isinstance(element.property_type, Bits):
                        if not hasattr(bits_ref_type.parent, 'i_bits'):
                            raise YdkGenException(
                                'Cross resolution of bits failed for ' + element.fqn())
                        # read the attribute that was just checked
                        element.property_type = bits_ref_type.parent.i_bits

        if hasattr(element, 'owned_elements'):
            for owned_element in element.owned_elements:
                self._resolve_grouping_class_cross_references(owned_element)

    def _create_grouping_class_api_model(self, stmt, parent_element):
        """
            Converts the stmt to an Element in the api_model according
            to the grouping as class algorithm.

            In the grouping as class code generations strategy a grouping in YANG
            is converted to a class. Every class that represents a container or a list
            or a grouping that has a uses statement in it , inherits from the grouping class that
            corresponds to the grouping in the uses statement.

            for example

            grouping abc {                        class Abc_Grouping(object):...
                ....
            }

            container A {                         class A(Abc_Grouping): ...
                uses abc;
            }

            In the first pass Elements that encapsulate the references are created
            this is done for all the stmts we are interested
            after this is done, resolve cross references is called on all the elements
            to resolve all the cross references (examples include
            super classes extends field in a class)

            :param `pyang.statements.Statement` stmt The statement to convert
            :param  `Element` The parent element.
        """
        # process typedefs first so that they are resolved
        # when we have to use them
        element = parent_element

        # identities
        if hasattr(stmt, 'i_identities'):
            for identity_stmt in stmt.i_identities.values():
                identity_class = Class(self.iskeyword)
                identity_class.stmt = identity_stmt
                identity_class.owner = parent_element
                parent_element.owned_elements.append(identity_class)
                identity_stmt.i_class = identity_class

        if hasattr(stmt, 'i_typedefs'):
            for typedef_stmt_name in stmt.i_typedefs:
                typedef_stmt = stmt.i_typedefs[typedef_stmt_name]
                self._add_enums_and_bits(typedef_stmt, parent_element)

        # walk the groupings first
        if hasattr(stmt, 'i_groupings'):
            for grouping_name in stmt.i_groupings:
                self._create_grouping_class_api_model(
                    stmt.i_groupings[grouping_name], element)

        if stmt.keyword == 'grouping':
            clazz = Class(self.iskeyword)
            stmt.i_class = clazz
            clazz.stmt = stmt
            parent_element.owned_elements.append(clazz)
            clazz.owner = parent_element
            element = clazz

        elif stmt.keyword == 'container' or stmt.keyword == 'list':
            clazz = Class(self.iskeyword)
            stmt.i_class = clazz
            clazz.stmt = stmt
            parent_element.owned_elements.append(clazz)
            clazz.owner = parent_element
            element = clazz

            if not isinstance(parent_element, Package):
                # create a property along with the class
                prop = Property(self.iskeyword)
                stmt.i_property = prop
                prop.stmt = stmt
                prop.property_type = clazz
                parent_element.owned_elements.append(prop)
                prop.owner = parent_element

        elif stmt.keyword == 'leaf' or stmt.keyword == 'leaf-list':
            prop = Property(self.iskeyword)
            stmt.i_property = prop
            prop.stmt = stmt
            parent_element.owned_elements.append(prop)
            prop.owner = parent_element
            # for inlined enum types where leaf { type enumeration {
            enum_type = self.types_extractor.get_enum_type_stmt(stmt)
            bits_type = self.types_extractor.get_bits_type_stmt(stmt)
            if enum_type is not None:
                if enum_type == stmt.search_one('type'):
                    # we have to create the enum
                    enum_class = Enum(self.iskeyword)
                    enum_class.stmt = enum_type
                    parent_element.owned_elements.append(enum_class)
                    enum_class.owner = parent_element
                    prop.property_type = enum_class
                    enum_type.parent.i_enum = enum_class
                    enum_type.i_enum = enum_class
            elif bits_type is not None:
                if bits_type == stmt.search_one('type'):
                    # we have to create the specific subclass of FixedBitsDict
                    bits_class = Bits(self.iskeyword)
                    bits_class.stmt = bits_type
                    parent_element.owned_elements.append(bits_class)
                    bits_class.owner = parent_element
                    prop.property_type = bits_class

        # walk the children
        if hasattr(stmt, 'i_children'):
            # Children contributed by a used grouping are skipped here: they
            # belong to the grouping class this element extends.
            grouping_stmt_names = []

            if stmt.keyword != 'module':
                uses_stmts = stmt.search('uses')
                groupings_used = []
                for uses_stmt in uses_stmts:
                    groupings_used.append(uses_stmt.i_grouping)

                for grouping in groupings_used:
                    grouping_stmt_names.extend(
                        [s.arg for s in grouping.i_children])

            chs = [ch for ch in stmt.i_children
                   if (ch.keyword in statements.data_definition_keywords and
                       ch.arg not in grouping_stmt_names)]
            for child_stmt in chs:
                self._create_grouping_class_api_model(child_stmt, element)


class SubModuleBuilder(object):
    """Creates one bare Package per submodule statement."""

    def generate(self, submodules, iskeyword):
        """Return a list with a Package for each statement in `submodules`;
        each statement gets its package recorded as `i_package`."""
        packages = []
        for sub in submodules:
            package = Package(iskeyword)
            sub.i_package = package
            package.stmt = sub
            packages.append(package)
        return packages


def name_matches_ancestor(name, parent_element):
    """Return True when `name` equals the name of `parent_element`.

    Packages and None never match.
    """
    if parent_element is None or isinstance(parent_element, Package):
        return False

    if name == parent_element.name:
        return True

    if not hasattr(parent_element, 'owner'):
        return False

    # NOTE(review): the source excerpt is cut off here ("..."); the remainder of
    # this function (presumably the walk up through parent_element.owner) must
    # be restored from the upstream file.
xsd_restriction.py
Source:xsd_restriction.py
# Distributed under the GPLv2 License; see accompanying file COPYING.

from dumco.utils.decorators import method_once
import dumco.schema.checks
import dumco.schema.model
import dumco.schema.uses

import base
import xsd_enumeration
import xsd_simple_type


def xsd_restriction(attrs, parent_element, factory, schema_path, all_schemata):
    """Handler for an xsd:restriction element.

    Creates the XsdRestriction, appends it to the children of
    `parent_element` and returns it together with the handlers for the
    child elements it accepts.
    """
    new_element = XsdRestriction(attrs, all_schemata[schema_path])
    parent_element.children.append(new_element)

    return (new_element, {
        'annotation': factory.noop_handler,
        'enumeration': xsd_enumeration.xsd_enumeration,
        'fractionDigits': new_element.xsd_fractionDigits,
        'length': new_element.xsd_length,
        'maxExclusive': new_element.xsd_maxExclusive,
        'maxInclusive': new_element.xsd_maxInclusive,
        'maxLength': new_element.xsd_maxLength,
        'minExclusive': new_element.xsd_minExclusive,
        'minInclusive': new_element.xsd_minInclusive,
        'minLength': new_element.xsd_minLength,
        'pattern': new_element.xsd_pattern,
        'simpleType': xsd_simple_type.xsd_simpleType,
        'totalDigits': new_element.xsd_totalDigits,
        'whiteSpace': new_element.xsd_whiteSpace,
    })


class XsdRestriction(base.XsdBase):
    """Parse-time representation of an xsd:restriction; its `schema_element`
    is a `dumco.schema.model.Restriction` that the facet handlers fill in."""

    def __init__(self, attrs, parent_schema):
        super(XsdRestriction, self).__init__(attrs)

        self.schema = parent_schema
        self.schema_element = dumco.schema.model.Restriction()

    @method_once
    def finalize(self, factory):
        """Resolve the base type, collect enumeration values and return the
        result of `connect_restriction_base`.

        The base is either the inline simpleType child (no 'base' attribute)
        or the type named by the 'base' attribute.
        """
        # NOTE: this local shadows the imported `base` module inside finalize.
        base = None
        if self.attr('base') is None:
            for t in self.children:
                assert ((isinstance(t, xsd_simple_type.XsdSimpleType) or
                         isinstance(t, xsd_enumeration.XsdEnumeration)) and
                        base is None), \
                    'Wrong content of Restriction'

                if isinstance(t, xsd_simple_type.XsdSimpleType):
                    base = t.finalize(factory)
                elif isinstance(t, xsd_enumeration.XsdEnumeration):
                    enum = dumco.schema.model.EnumerationValue(
                        t.value, t.schema_element.doc)
                    self.schema_element.enumerations.append(enum)
        else:
            base = factory.resolve_simple_type(self.attr('base'), self.schema)

            for x in self.children:
                assert isinstance(x, xsd_enumeration.XsdEnumeration), \
                    'Expected only Enumerations'

                enum = dumco.schema.model.EnumerationValue(
                    x.value, x.schema_element.doc)
                self.schema_element.enumerations.append(enum)

        assert base is not None, 'Restriction does not have base type'

        return self.connect_restriction_base(base)

    def connect_restriction_base(self, base):
        """Attach `base` to this restriction.

        A list-type base absorbs the length facets and is returned itself;
        otherwise the restriction element is returned with its base set to
        the result of `merge_base_restriction`.
        """
        if dumco.schema.checks.is_list_type(base):
            simplify_list_restiction(base, self.schema_element.length,
                                     self.schema_element.min_length,
                                     self.schema_element.max_length)
            return base

        self.schema_element.base = self.merge_base_restriction(base)

        return self.schema_element

    def merge_base_restriction(self, base):
        """Fold the facets of restriction-type bases into this restriction.

        Walks down the chain of bases while they are restriction types,
        inheriting every facet this restriction does not set itself, and
        returns the first base that is not a restriction type.
        """
        def merge(attr):
            # Own facets win; inherit only what is unset here.
            if not getattr(self.schema_element, attr):
                value = getattr(base.restriction, attr)
                # Fixed: the condition used to be `if not value`, which only
                # ever copied empty values and so never inherited anything.
                if value:
                    setattr(self.schema_element, attr, value)

        if dumco.schema.checks.is_restriction_type(base):
            merge('enumerations')
            merge('fraction_digits')
            merge('length')
            merge('max_exclusive')
            merge('max_inclusive')
            merge('max_length')
            merge('min_exclusive')
            merge('min_inclusive')
            merge('min_length')
            merge('patterns')
            merge('total_digits')
            merge('white_space')

            base = self.merge_base_restriction(base.restriction.base)

        return base

    @staticmethod
    def _value_handler(fieldname, attrs, parent_element, factory,
                       schema_path, all_schemata):
        """Store the 'value' attribute of a facet element in `fieldname` of
        the parent's schema element; only annotations are accepted below."""
        assert hasattr(parent_element.schema_element, fieldname)

        setattr(parent_element.schema_element, fieldname,
                factory.get_attribute(attrs, 'value'))

        return (parent_element, {
            'annotation': factory.noop_handler,
        })

    @staticmethod
    def xsd_fractionDigits(attrs, parent_element, factory,
                           schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'fraction_digits', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_length(attrs, parent_element, factory,
                   schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'length', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_maxExclusive(attrs, parent_element, factory,
                         schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'max_exclusive', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_maxInclusive(attrs, parent_element, factory,
                         schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'max_inclusive', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_maxLength(attrs, parent_element, factory,
                      schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'max_length', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_minExclusive(attrs, parent_element, factory,
                         schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'min_exclusive', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_minInclusive(attrs, parent_element, factory,
                         schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'min_inclusive', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_minLength(attrs, parent_element, factory,
                      schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'min_length', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_pattern(attrs, parent_element, factory,
                    schema_path, all_schemata):
        """Patterns accumulate: each xsd:pattern appends its value."""
        parent_element.schema_element.patterns.append(
            factory.get_attribute(attrs, 'value'))

        return (parent_element, {
            'annotation': factory.noop_handler,
        })

    @staticmethod
    def xsd_totalDigits(attrs, parent_element, factory,
                        schema_path, all_schemata):
        return XsdRestriction._value_handler(
            'total_digits', attrs, parent_element,
            factory, schema_path, all_schemata)

    @staticmethod
    def xsd_whiteSpace(attrs, parent_element, factory,
                       schema_path, all_schemata):
        """Map the whiteSpace token onto the Restriction.WS_* constants."""
        assert hasattr(parent_element.schema_element, 'white_space')

        value = factory.get_attribute(attrs, 'value')
        if value == 'preserve':
            parent_element.schema_element.white_space = \
                dumco.schema.model.Restriction.WS_PRESERVE
        elif value == 'replace':
            parent_element.schema_element.white_space = \
                dumco.schema.model.Restriction.WS_REPLACE
        elif value == 'collapse':
            parent_element.schema_element.white_space = \
                dumco.schema.model.Restriction.WS_COLLAPSE
        else:  # pragma: no cover
            assert False, 'Unknown token for whiteSpace'

        return (parent_element, {
            'annotation': factory.noop_handler,
        })


def simplify_list_restiction(base, length, min_length, max_length):
    """Turn length facets of a restriction over a list type into the
    occurrence bounds of the list's first item.

    `length` sets both bounds; `min_length` / `max_length` then override the
    respective bound when given.
    """
    assert dumco.schema.checks.is_list_type(base)

    itemtype = base.listitems[0]
    min_occurs = itemtype.min_occurs
    max_occurs = itemtype.max_occurs

    if length is not None:
        min_occurs = length
        max_occurs = length
    if min_length is not None:
        min_occurs = min_length
    if max_length is not None:
        max_occurs = max_length

    # NOTE(review): the source excerpt is cut off in the middle of the next
    # statement; restore its arguments from the upstream file:
    #     base.listitems[0] = dumco.schema.uses.ListTypeCardinality(...
section_importer.py
Source:section_importer.py
import logging

from cdautils.models import Entry
from cdautils.cda.narrative_reference_handler import NarrativeReferenceHandler
from cdautils.cda.utils import timestamp_to_integer

logger = logging.getLogger(__name__)


class SectionImporter(object):
    """ Parent Section Importer. Provides several utility functions for various data elements in the CCDA
    """

    def __init__(self, entry_finder):
        """
        :param entry_finder: object whose `entries(doc)` yields the entry
                             elements of this section.
        """
        self.entry_finder = entry_finder
        self.code_xpath = './cda:code'
        self.status_xpath = './cda:statusCode'
        self.priority_xpath = None
        self.description_xpath = "./cda:code/cda:originalText/cda:reference[@value] | ./cda:text/cda:reference[@value]"
        self.check_for_usable = True
        self.entry_class = Entry

    def create_enteries(self, doc, nrh=None):
        """Create an entry for every element found by the entry finder.

        :param doc: document handed to `entry_finder.entries`.
        :param nrh: narrative reference handler used to look up descriptions;
                    a fresh `NarrativeReferenceHandler` is created when omitted
                    (a default instance in the signature would be shared by
                    every call).
        :return: list of created entries (empty when nothing was found).
        """
        if nrh is None:
            nrh = NarrativeReferenceHandler()

        entry_list = []
        entry_elements = self.entry_finder.entries(doc)
        if entry_elements:
            for entry_element in entry_elements:
                # Fixed: entries used to be created and then dropped; the list
                # was never filled or returned.
                entry_list.append(self.create_entry(entry_element, nrh))
        return entry_list

    def create_entry(self, entry_element, nrh=None):
        """Build one entry from `entry_element` and return it.

        The entry is saved right after construction, before any field is
        extracted into it.
        """
        if nrh is None:
            nrh = NarrativeReferenceHandler()

        entry = self.entry_class()
        entry.save()
        self.extract_codes(entry_element, entry)
        self.extract_dates(entry_element, entry)
        self.extract_value(entry_element, entry)
        ctext = entry_element.xpath("./cda:text")
        if ctext:
            entry.free_text = ctext[0].text
        if self.status_xpath:
            self.extract_status(entry_element, entry)
        if self.description_xpath:
            self.extract_description(entry_element, entry, nrh)
        return entry

    def extract_status(self, parent_element, entry):
        """ Extract status codes """
        status_element = parent_element.xpath(self.status_xpath)
        if status_element:
            status_element = status_element[0]
            entry.status_code = status_element.get('code')
            logger.debug("%s", entry.status_code)

    def extract_description(self, parent_element, entry, nrh):
        """ Extract description """
        code_elements = parent_element.xpath(self.description_xpath)
        # When several references match, the last lookup wins.
        for code_element in code_elements:
            tag = code_element.get('value')
            logger.debug("Description:%s", tag)
            entry.description = nrh.lookup_tag(tag)
            logger.debug("%s", entry.description)

    def extract_codes(self, parent_element, entry):
        """Add the code of every matching element, and of each of its
        translations, to the entry."""
        code_elements = parent_element.xpath(self.code_xpath)
        for code_element in code_elements:
            self.add_code_if_present(code_element, entry)
            translations = code_element.xpath('cda:translation')
            for translation in translations:
                self.add_code_if_present(translation, entry)

    def extract_code(self, parent_element, code_xpath, code_system=None):
        """ Get the code and code system

        :return: dict with 'code' and 'codeSystem' (plus 'codeSystemOid' when
                 `code_system` is not given), or None when nothing matches.
        """
        code_element = parent_element.xpath(code_xpath)
        if not code_element:
            return None

        code_element = code_element[0]
        code_hash = {'code': code_element.get('code')}
        if code_system:
            code_hash['codeSystem'] = code_system
        else:
            code_hash['codeSystemOid'] = code_element.get('codeSystem')
            code_hash['codeSystem'] = code_element.get('codeSystemName')  # TODO: RETURN GET SYSTEM
        return code_hash

    def add_code_if_present(self, code_element, entry):
        """Add the (codeSystem, code) pair to the entry when both are set."""
        if code_element.get('codeSystem') and code_element.get('code'):
            entry.add_code(code_element.get('codeSystem'), code_element.get('code'))

    def extract_dates(self, parent_element, entry, element_name="effectiveTime"):
        """Copy the point in time and the low/high bounds of `element_name`
        onto the entry as integers.

        `entry.time` comes from the element's own value attribute and is
        overridden by a `center` child when present.
        """
        base_xpath = "cda:" + element_name
        # Fixed: the first lookup selected "/@value" (attribute results) and
        # then called .get('value') on them; the element is selected instead.
        # Order matters: 'center' is applied last so it overrides 'time'.
        lookups = (
            (base_xpath + "[@value]", 'time'),
            (base_xpath + "/cda:low", 'start_time'),
            (base_xpath + "/cda:high", 'end_time'),
            (base_xpath + "/cda:center", 'time'),
        )
        for xpath, attr_name in lookups:
            matches = parent_element.xpath(xpath)
            if matches:
                setattr(entry, attr_name,
                        timestamp_to_integer(matches[0].get('value')))

    def extract_value(self, parent_element, entry):
        """Set the entry's value and unit from the first `cda:value` child
        that carries a non-empty value attribute."""
        value_element = parent_element.xpath("cda:value")
        if value_element:
            if isinstance(value_element, list):
                value_element = value_element[0]
            value = value_element.get('value')
            unit = value_element.get('unit')
            logger.debug("%s %s", value, unit)
            if value:
                entry.set_value(value.strip(), unit)

    # TODO: implement negation extraction
    def extract_negation(self, parent_element, entry):
        pass

    def extract_scalar(self, parent_element, scalar_xpath):
        """Return {'unit', 'value'} of the first element matching
        `scalar_xpath`."""
        scalar_element = parent_element.xpath(scalar_xpath)
        if scalar_element:
            scalar_element = scalar_element[0]
            return {'unit': scalar_element.get('unit'),
                    'value': scalar_element.get('value')}
        # NOTE(review): the original `else:` branch is cut off in the source
        # excerpt ("else:..."); None is returned here until it is restored.
        return None
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!