Best Python code snippet using autotest_python
usdlParser.py
Source:usdlParser.py
...63 offerings_number = offerings_number + 164 if offerings_number != 1:65 msg = _('No service offering has been defined')66 raise Exception(msg)67 def _get_field(self, namespace, element, predicate, id_=False):68 result = []69 for e in self._graph.objects(element, namespace[predicate]):70 if not id_:71 result.append(unicode(e))72 else:73 #If id = True means that the uri will be used so it is necesary to return the class74 result.append(e)75 if len(result) == 0:76 result.append('')77 return result78 def _parse_basic_info(self, service_uri):79 count = 080 result = {}81 for t in self._graph.predicate_objects(service_uri):82 count = count + 183 if count < 3:84 result['part_ref'] = True85 return86 result['part_ref'] = False87 vendor = self._get_field(USDL, service_uri, 'hasProvider', id_=True)[0]88 result['vendor'] = self._get_field(FOAF, vendor, 'name')[0]89 # provider vCard90 vcard = self._get_field(VCARD, vendor, 'adr', id_=True)[0]91 if vcard != '':92 result['vcard'] = {93 'BEGIN': [{94 'properties': {},95 'value':'VCARD',96 }],97 'FN': [{98 'properties': {},99 'value': result['vendor']100 }],101 'ADR': [{102 'properties': {},103 'value': self._get_field(VCARD, vcard, 'street-address')[0] + ';' + self._get_field(VCARD, vcard, 'postal-code')[0] + ';' + self._get_field(VCARD, vcard, 'locality')[0] + ';' + self._get_field(VCARD, vcard, 'country-name')[0]104 }],105 'TEL': [{106 'properties': {},107 'value': self._get_field(VCARD, vcard, 'tel')[0]108 }],109 'EMAIL': [{110 'properties': {},111 'value': self._get_field(VCARD, vcard, 'email')[0]112 }],113 'END': [{114 'properties': {},115 'value': "VCARD"116 }]117 }118 result['name'] = self._get_field(DCTERMS, service_uri, 'title')[0]119 version = self._get_field(USDL, service_uri, 'versionInfo')[0]120 service_parts = self._get_field(USDL, service_uri, 'hasPartMandatory', id_=True)121 if len(service_parts) > 0:122 result['parts'] = []123 for part in service_parts:124 part_info = {}125 part_info['name'] = 
self._get_field(DCTERMS, part, 'title')[0]126 part_info['uri'] = unicode(part)127 result['parts'].append(part_info)128 result['short_description'] = self._get_field(DCTERMS, service_uri, 'abstract')[0]129 result['long_description'] = self._get_field(DCTERMS, service_uri, 'description')[0]130 result['created'] = self._get_field(DCTERMS, service_uri, 'created')[0]131 result['modified'] = self._get_field(DCTERMS, service_uri, 'modified')[0]132 result['uriImage'] = self._get_field(FOAF, service_uri, 'depiction')[0]133 result['version'] = version134 result['page'] = self._get_field(FOAF, service_uri, 'page')[0]135 return result136 def _parse_legal_info(self, service_uri):137 result = []138 legal_conditions = self._get_field(USDL, service_uri, 'hasLegalCondition', id_=True)139 # If legal does not exist the method does nothing140 if len(legal_conditions) == 1 and legal_conditions[0] == '':141 return []142 for legal in legal_conditions:143 legal_condition = {144 'type': self._get_field(RDF, legal, 'type')[0],145 'label': self._get_field(DCTERMS, legal, 'title')[0],146 'description': self._get_field(DCTERMS, legal, 'description')[0],147 'clauses': [],148 }149 clauses = self._get_field(LEGAL, legal, 'hasClause', id_=True)150 for c in clauses:151 clause = {}152 clause['name'] = self._get_field(LEGAL, c, 'name')[0]153 clause['text'] = self._get_field(LEGAL, c, 'text')[0]154 legal_condition['clauses'].append(clause)155 result.append(legal_condition)156 return result157 def _parse_sla_info(self, service_uri):158 result = []159 service_level_profile = self._get_field(USDL, service_uri, 'hasServiceLevelProfile', id_=True)[0]160 #If sla does not exist the method does nothing161 if service_level_profile != '':162 service_levels = self._get_field(SLA, service_level_profile, 'hasServiceLevel', id_=True)163 for sla in service_levels:164 service_level = {165 'type': self._get_field(RDF, sla, 'type')[0],166 'name': self._get_field(DCTERMS, sla, 'title')[0],167 'description': 
self._get_field(DCTERMS, sla, 'description')[0],168 'obligatedParty': self._get_field(SLA, sla, 'obligatedParty')[0],169 'slaExpresions': [],170 }171 sla_expresions = self._get_field(SLA, sla, 'serviceLevelExpression', id_=True)172 for exp in sla_expresions:173 expresion = {174 'name': self._get_field(DCTERMS, exp, 'title')[0],175 'description': self._get_field(DCTERMS, exp, 'description')[0],176 'variables': [],177 }178 variables = self._get_field(SLA, exp, 'hasVariable', id_=True)179 for var in variables:180 # The sla variables may defines service availibility or location, defined181 # by a point in utm coodinates plus a radius, in this case this property is182 # set in the sla expresion.183 radius = False184 default_value = self._get_field(SLA, var, 'hasDefault', id_=True)[0]185 if default_value == '':186 default_value = self._get_field(SLA, var, 'hasServiceRadius', id_=True)[0]187 if default_value != '':188 radius = True189 if 'location' not in expresion:190 expresion['location'] = {}191 type_ = self._get_field(RDF, default_value, 'type', id_=True)[0]192 # The sla variable may contains a location, i.e service availability193 if type_ == GR['QualitativeValue']:194 location = self._get_field(GEO, default_value, 'location', id_=True)[0]195 if location != '':196 if 'location' not in expresion:197 expresion['location'] = {}198 expresion['location']['coordinates'] = {199 'lat': self._get_field(GEO, location, 'lat')[0],200 'long': self._get_field(GEO, location, 'long')[0],201 }202 else:203 variable_info = {204 'label': self._get_field(RDFS, var, 'label')[0],205 'type': self._get_field(RDF, default_value, 'type')[0],206 'value': self._get_field(GR, default_value, 'hasValue')[0],207 'unit': self._get_field(GR, default_value, 'hasUnitOfMeasurement')[0],208 }209 expresion['variables'].append(variable_info)210 if radius:211 expresion['location']['radius'] = variable_info212 service_level['slaExpresions'].append(expresion)213 result.append(service_level)214 return result215 
def _parse_interaction_protocols(self, service_uri):216 result = []217 # Get the interaction protocols that define the service functionality218 interaction_protocols = self._get_field(USDL, service_uri, 'hasInteractionProtocol', id_=True)219 if len(interaction_protocols) == 1 and interaction_protocols[0] == '':220 return []221 for itp in interaction_protocols:222 protocol_info = {223 'title': self._get_field(DCTERMS, itp, 'title')[0],224 'description': self._get_field(DCTERMS, itp, 'description')[0],225 'technical_interface': self._get_field(USDL, itp, 'hasTechnicalInterface')[0],226 'interactions': []227 }228 interactions = self._get_field(USDL, itp, 'hasInteraction', id_=True)229 for inte in interactions:230 interaction_info = {231 'title': self._get_field(DCTERMS, inte, 'title')[0],232 'description': self._get_field(DCTERMS, inte, 'description')[0],233 'interface_operation': self._get_field(USDL, inte, 'hasInterfaceOperation')[0],234 'inputs': [],235 'outputs': []236 }237 for input_ in self._get_field(USDL, inte, 'hasInput', id_=True):238 input_info = {239 'label': self._get_field(RDFS, input_, 'label')[0],240 'description': self._get_field(DCTERMS, input_, 'description')[0],241 'interface_element': self._get_field(USDL, input_, 'hasInterfaceElement')[0]242 }243 interaction_info['inputs'].append(input_info)244 for output in self._get_field(USDL, inte, 'hasOutput', id_=True):245 output_info = {246 'label': self._get_field(RDFS, output, 'label')[0],247 'description': self._get_field(DCTERMS, output, 'description')[0],248 'interface_element': self._get_field(USDL, output, 'hasInterfaceElement')[0]249 }250 interaction_info['outputs'].append(output_info)251 protocol_info['interactions'].append(interaction_info)252 result.append(protocol_info)253 return result254 def _parse_bind_expression(self, expression):255 result = {}256 operations = {257 unicode(SP['Sum']): '+',258 unicode(SP['Minus']): '-',259 unicode(SP['Mul']): '*',260 unicode(SP['Div']): '/'261 }262 # Check 
the operation263 exp_type = self._get_field(RDF, expression, 'type', id_=True)264 if exp_type[0] == '':265 raise Exception('Invalid price function: An expression must contain an operation per level')266 try:267 op = operations[unicode(exp_type[0])]268 except:269 raise Exception('Invalid price function: Invalid operation')270 # Get arguments value271 arg1 = self._get_field(SP, expression, 'arg1', id_=True)[0]272 arg2 = self._get_field(SP, expression, 'arg2', id_=True)[0]273 if self._get_field(RDF, arg1, 'type', id_=True)[0] == '':274 # Get variable name275 result['arg1'] = self._get_field(SP, arg1, 'varName')[0]276 else:277 # arg1 is an expression278 result['arg1'] = self._parse_bind_expression(arg1)279 if self._get_field(RDF, arg2, 'type', id_=True)[0] == '':280 # Get variable name281 result['arg2'] = self._get_field(SP, arg2, 'varName')[0]282 else:283 # arg2 is an expression284 result['arg2'] = self._parse_bind_expression(arg2)285 result['operation'] = op286 return result287 def _parse_function(self, price_function):288 result = {289 'label': self._get_field(RDFS, price_function, 'label')[0],290 'variables': {},291 'function': {}292 }293 variables = []294 # Pre-process price variables295 for variable in self._get_field(PRICE, price_function, 'hasVariable', id_=True):296 # Check variable type to know if it is a constant or an usage297 # variable298 var_label = self._get_field(RDFS, variable, 'label')[0]299 var_type = self._get_field(RDF, variable, 'type', id_=True)[0]300 if var_type == PRICE['Constant']:301 val_part = self._get_field(PRICE, variable, 'hasValue', id_=True)[0]302 const_value = self._get_field(GR, val_part, 'hasValueFloat')303 # Check that the value is valid304 if len(const_value) == 1 and const_value[0] == '':305 const_value = self._get_field(GR, val_part, 'hasValueInteger')306 if len(const_value) != 1 or const_value[0] == '':307 raise Exception('Invalid price function: Only a value is allowed for constants')308 variables.append({309 'type': 
'constant',310 'id': variable,311 'label': var_label,312 'value': const_value[0]313 })314 elif var_type == PRICE['Usage']:315 variables.append({316 'type': 'usage',317 'id': variable,318 'label': var_label319 })320 else:321 raise Exception('Invalid price function: Invalid variable type')322 # Get body expression323 function_body = self._get_field(SPIN, price_function, 'body', id_=True)[0]324 body_type = self._get_field(RDF, function_body, 'type', id_=True)[0]325 if body_type != SP['Select']:326 raise Exception('Invalid price function: Invalid SPARQL method')327 # Check where clause328 bind_expression = None329 matching_exp = {}330 # Iterate over expressions list331 exp_list = self._get_field(SP, function_body, 'where', id_=True)[0]332 # First element333 first = self._get_field(RDF, exp_list, 'first', id_=True)[0]334 # Rest of the list335 rest = self._get_field(RDF, exp_list, 'rest', id_=True)[0]336 nil = False337 while not nil:338 # Get expression node339 exp = first340 # Check bind expression341 exp_type = self._get_field(RDF, exp, 'type', id_=True)342 if len(exp_type) > 0 and exp_type[0] == SP['Bind']:343 if bind_expression == None:344 bind_expression = exp345 else:346 raise Exception('Invalid price function: Only a bind expression is allowed')347 else:348 # Get expression tuple349 subject = self._get_field(SP, exp, 'subject', id_=True)[0]350 predicate = self._get_field(SP, exp, 'predicate', id_=True)[0]351 object_ = self._get_field(SP, exp, 'object', id_=True)[0]352 # Check if the expression defines an aux name353 if predicate == PRICE['hasValue']:354 variable_info = None355 found = False356 i = 0357 while not found and i < len(variables):358 if variables[i]['id'] == subject:359 found = True360 variable_info = variables[i]361 variable_info['type'] = variables[i]['type']362 i += 1363 if not found:364 raise Exception('Invalid price function: Variable not declared')365 # Get the aux name366 aux_name = self._get_field(SP, object_, 'varName')[0]367 if aux_name in 
matching_exp:368 # Check that is a variable name and not a369 # duplicity in the expression370 if 'variable_info' in matching_exp[aux_name]:371 raise Exception('Invalid price function: Duplicated expression')372 # The variable name used in the bind expression has373 # already been processed374 final_name = matching_exp[aux_name]['final_name']375 # The variable and its name are bound so it is possible to store376 # the variable info in the result structure377 result['variables'][final_name] = {378 'label': variable_info['label'],379 'type': variable_info['type']380 }381 if variable_info['type'] == 'constant':382 result['variables'][final_name]['value'] = variable_info['value']383 # Store variable info in matching expression in order to avoid duplicity384 matching_exp[aux_name]['variable_info'] = variable_info385 else:386 matching_exp[aux_name] = {387 'variable_info': variable_info388 }389 # Check if the expression defines a final name390 elif predicate == GR['hasValueFloat'] or predicate == GR['hasValueInteger']:391 aux_name = self._get_field(SP, subject, 'varName')[0]392 final_name = self._get_field(SP, object_, 'varName')[0]393 if aux_name in matching_exp:394 # Check that it contains variable info and395 # not a duplicated final name396 if 'final_name' in matching_exp[aux_name]:397 raise Exception('Invalid price function: Duplicated expression')398 # The variable has already been processed so it is possible399 # to store the variable info in the result structure400 result['variables'][final_name] = {401 'label': matching_exp[aux_name]['variable_info']['label'],402 'type': matching_exp[aux_name]['variable_info']['type']403 }404 if matching_exp[aux_name]['variable_info']['type'] == 'constant':405 result['variables'][final_name]['value'] = matching_exp[aux_name]['variable_info']['value']406 # Store the final name in matching expression to avoid duplicity407 matching_exp[aux_name]['final_name'] = final_name408 else:409 matching_exp[aux_name] = {410 'final_name': 
final_name411 }412 else:413 raise Exception('Invalid price function: Invalid predicate')414 # Check list pointer415 if rest == RDF['nil']:416 # There is not more elements417 nil = True418 else:419 # Update list pointer420 first = self._get_field(RDF, rest, 'first', id_=True)[0]421 rest = self._get_field(RDF, rest, 'rest', id_=True)[0]422 # Parse bind expression423 expression = self._get_field(SP, bind_expression, 'expression', id_=True)[0]424 # Parse the concrete function used to calculate the price425 result['function'] = self._parse_bind_expression(expression)426 # Build price function427 return result428 def _parse_pricing_info(self):429 result = {}430 # Parse offering info431 result['title'] = self._get_field(DCTERMS, self._offering_uri, 'title')[0]432 result['description'] = self._get_field(DCTERMS, self._offering_uri, 'description')[0]433 result['valid_from'] = self._get_field(USDL, self._offering_uri, 'validFrom')[0]434 result['valid_through'] = self._get_field(USDL, self._offering_uri, 'validThrough')[0]435 # Parse price plans info436 price_plans = self._get_field(USDL, self._offering_uri, 'hasPricePlan', id_=True)437 result['price_plans'] = []438 # Parse price plans439 for price in price_plans:440 price_plan = {}441 price_plan['title'] = self._get_field(DCTERMS, price, 'title')[0]442 price_plan['description'] = self._get_field(DCTERMS, price, 'description')[0]443 # Get the label of the price plan444 label = self._get_field(RDFS, price, 'label')445 if len(label) == 1:446 price_plan['label'] = label[0]447 elif len(label) > 1:448 raise Exception('Only a label is supported for the price plan')449 # Get price components450 price_components = self._get_field(PRICE, price, 'hasPriceComponent', id_=True)451 if len(price_components) > 1 or price_components[0] != '':452 # Initialize components and deductions453 price_plan['price_components'] = []454 price_plan['deductions'] = []455 for pc in price_components:456 # Get initial information457 price_component = {458 
'title': self._get_field(DCTERMS, pc, 'title')[0],459 'description': self._get_field(DCTERMS, pc, 'description')[0]460 }461 # Check if it includes a price specification462 price_specification = self._get_field(PRICE, pc, 'hasPrice', id_=True)[0]463 function_found = False464 if price_specification != '':465 price_container = price_specification466 else:467 # Check if a price function exists468 price_function = self._get_field(PRICE, pc, 'hasPriceFunction', id_=True)[0]469 text_function = self._get_field(PRICE, pc, 'hasTextFunction')[0]470 if price_function != '' and text_function != '':471 function_found = True472 else:473 price_container = pc474 if function_found:475 # Save price function serialization476 price_component['text_function'] = text_function477 price_component['price_function'] = self._parse_function(price_function)478 else:479 # If not price function defined, get the price480 price_component['currency'] = self._get_field(GR, price_container, 'hasCurrency')[0]481 value = self._get_field(GR, price_container, 'hasCurrencyValue')[0]482 if not value:483 price_component['value'] = self._get_field(GR, price_container, 'hasValueFloat')[0]484 else:485 price_component['value'] = value486 price_component['unit'] = self._get_field(GR, price_container, 'hasUnitOfMeasurement')[0]487 # Check if it is a deduction getting component type488 component_type = self._get_field(RDF, pc, 'type', id_=True)[0]489 if component_type == PRICE['PriceComponent']:490 price_plan['price_components'].append(price_component)491 elif component_type == PRICE['Deduction']:492 price_plan['deductions'].append(price_component)493 taxes = self._get_field(PRICE, price, 'hasTax', id_=True)494 if len(taxes) > 1 or taxes[0] != '':495 price_plan['taxes'] = []496 for pc in taxes:497 tax = {498 'title': self._get_field(DCTERMS, pc, 'title')[0],499 'description': self._get_field(DCTERMS, pc, 'description')[0],500 'currency': self._get_field(GR, pc, 'hasCurrency')[0],501 }502 value = self._get_field(GR, 
pc, 'hasCurrencyValue')503 if not value:504 tax['value'] = value[0]505 else:506 tax['value'] = self._get_field(GR, pc, 'hasValueFloat')[0]507 tax['unit'] = self._get_field(GR, pc, 'hasUnitOfMeasurement')[0]508 price_plan['taxes'].append(tax)509 result['price_plans'].append(price_plan)510 return result511 def parse(self):512 result = {}513 result['pricing'] = self._parse_pricing_info()514 result['services_included'] = []515 for service_uri in self._get_field(USDL, self._offering_uri, 'includes', id_=True):516 if service_uri != '':517 basic_info = self._parse_basic_info(service_uri)518 if not basic_info['part_ref']:519 basic_info['legal'] = self._parse_legal_info(service_uri)520 basic_info['sla'] = self._parse_sla_info(service_uri)521 basic_info['interactions'] = self._parse_interaction_protocols(service_uri)522 del(basic_info['part_ref'])523 result['services_included'].append(basic_info)524 if not len(result['services_included']) > 0:525 raise Exception('No services included')526 return result527def validate_usdl(usdl, mimetype, offering_data):528 valid = True529 reason = ''...
exceptions.py
Source:exceptions.py
...57 pass58class Diagnostics(object):59 def __init__(self, exc):60 self._exc = exc61 def _get_field(self, field):62 from uxdbcffi._impl.adapters import bytes_to_ascii63 if self._exc and self._exc._pgres:64 b = libpq.PQresultErrorField(self._exc._pgres, field)65 if b:66 b = ffi.string(b)67 if six.PY3: # py2 tests insist on str here68 b = bytes_to_ascii(b)69 return b70 @property71 def severity(self):72 return self._get_field(libpq.LIBPQ_DIAG_SEVERITY)73 @property74 def sqlstate(self):75 return self._get_field(libpq.LIBPQ_DIAG_SQLSTATE)76 @property77 def message_primary(self):78 return self._get_field(libpq.LIBPQ_DIAG_MESSAGE_PRIMARY)79 @property80 def message_detail(self):81 return self._get_field(libpq.LIBPQ_DIAG_MESSAGE_DETAIL)82 @property83 def message_hint(self):84 return self._get_field(libpq.LIBPQ_DIAG_MESSAGE_HINT)85 @property86 def statement_position(self):87 return self._get_field(libpq.LIBPQ_DIAG_STATEMENT_POSITION)88 @property89 def internal_position(self):90 return self._get_field(libpq.LIBPQ_DIAG_INTERNAL_POSITION)91 @property92 def internal_query(self):93 return self._get_field(libpq.LIBPQ_DIAG_INTERNAL_QUERY)94 @property95 def context(self):96 return self._get_field(libpq.LIBPQ_DIAG_CONTEXT)97 @property98 def schema_name(self):99 return self._get_field(libpq.LIBPQ_DIAG_SCHEMA_NAME)100 @property101 def table_name(self):102 return self._get_field(libpq.LIBPQ_DIAG_TABLE_NAME)103 @property104 def column_name(self):105 return self._get_field(libpq.LIBPQ_DIAG_COLUMN_NAME)106 @property107 def datatype_name(self):108 return self._get_field(libpq.LIBPQ_DIAG_DATATYPE_NAME)109 @property110 def constraint_name(self):111 return self._get_field(libpq.LIBPQ_DIAG_CONSTRAINT_NAME)112 @property113 def source_file(self):114 return self._get_field(libpq.LIBPQ_DIAG_SOURCE_FILE)115 @property116 def source_line(self):117 return self._get_field(libpq.LIBPQ_DIAG_SOURCE_LINE)118 @property119 def source_function(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!