Best Python code snippet using refurb_python
make_rda_mappings.py
Source:make_rda_mappings.py
...37 g2.namespace_manager.bind('rdacarrier', RDACarrierType)38 g2.namespace_manager.bind('rdaissuance', RDAIssuanceType)39 g2.namespace_manager.bind('kbrda', KBRDA)40 g2.namespace_manager.bind('kbv', KBV)41 _make_mappings(g2, RDAContentType, LCContentType, KBV.ContentType)42 _make_mappings(g2, RDAMediaType, LCMediaType, KBV.MediaType)43 carrier_g = _make_mappings(g2, RDACarrierType, LCCarrierType, KBV.CarrierType)44 hack_media_id_by_chopped_top_carrier = {}45 for top in carrier_g.objects(RDACarrierType, SKOS.hasTopConcept):46 for top_id in g2.subjects(OWL.sameAs, top):47 break48 assert unicode(top_id).endswith('Carriers(Deprecated)'), top_id49 g2.remove((top_id, None, None))50 # Redundant now; see comment just below.51 media_id = URIRef(unicode(top_id).replace('Carriers(Deprecated)', ''))52 if unicode(media_id).endswith('ProjectedImage') and (media_id, None, None) not in g2:53 media_id = URIRef(unicode(media_id).replace('ProjectedImage', 'Projected'))54 #g2.add((top_id, OWL.sameAs, media_id))55 # NOTE: Before deprecation in RDA, it was possible to follow broader links56 # from specific carriers to base carrier terms which were equatable to one57 # of the media terms. These broader links are now missing. The loop below58 # thus doesn't execute anymore (no broader link is present in the RDA59 # source data.)60 # (e.g. kbrda:ComputerChipCartridge rdfs:subClassOf kbrda:Computer .)61 if (media_id, None, None) in g2:62 for narrower in carrier_g.subjects(SKOS.broader, top):63 for specific_carrier in g2.subjects(OWL.sameAs, narrower):64 g2.add((specific_carrier, RDFS.subClassOf, media_id))65 # HACK instead:66 hack_media_id_by_chopped_top_carrier[unicode(top)[:-1]] = media_id67 # HACK to get the carrier media subclass relations...68 # 1. 
Match toplevel(Deprecated) and find carrier URIs starting with their iri[:-1]69 for carrier in g2.subjects(RDF.type, KBV.CarrierType):70 if carrier == KBRDA.MicroscopeSlide:71 continue72 for sameas in g2.objects(carrier, OWL.sameAs):73 media_id = hack_media_id_by_chopped_top_carrier.get(unicode(sameas[:-1]))74 if media_id:75 g2.add((carrier, RDFS.subClassOf, media_id))76 break77 # 2. Complement by a crude string matching78 carriers = set(g2.subjects(RDF.type, KBV.CarrierType))79 medias = set(unicode(media) for media in g2.subjects(RDF.type, KBV.MediaType))80 for media in medias:81 for carrier in carriers:82 if unicode(carrier).startswith(media):83 g2.add((carrier, RDFS.subClassOf, URIRef(media)))84 # 3. Add remaining manually85 g2.add((KBRDA.MicroscopeSlide, RDFS.subClassOf, KBRDA.Microscopic))86 g2.add((KBRDA.FilmRoll, RDFS.subClassOf, KBRDA.Projected))87 _make_mappings(g2, RDAIssuanceType, LCIssuanceType, KBV.IssuanceType)88 return g289def _make_mappings(g2, rda_type_scheme, lc_type_scheme, rtype):90 data_url = str(rda_type_scheme) + '.ttl'91 g = Graph().parse(data_url, format='turtle')92 g.parse(lc_type_scheme)93 for s in g.subjects(SKOS.inScheme, rda_type_scheme):94 for p, preflabel_en in g.preferredLabel(s, 'en'):95 symbol = preflabel_en.title().replace(' ', '').replace('-', '')96 uri = KBRDA[symbol]97 g2.add((uri, RDF.type, OWL.Class))98 g2.add((uri, RDF.type, rtype))99 # TODO: either type or subClassOf...100 # g2.add((uri, RDFS.subClassOf, rtype))101 g2.add((uri, OWL.sameAs, s))102 for p2 in (SKOS.definition, SKOS.prefLabel, SKOS.scopeNote):103 for l in g.objects(s, p2):...
tasks.py
Source:tasks.py
...42 existing_mappings = DeviceMapping.objects.filter(43 from_device__in=from_devices, to_device=to_device)44 for mapping in existing_mappings:45 from_devices.discard(mapping.from_device)46def _make_mappings(device_instance, serve_to=None, client_to=None):47 if not serve_to and not client_to:48 return 49 def _make_single_mapping(s, from_device, to_device):50 msg = "map %d %d" % (from_device.id, to_device.id)51 logger.info(msg)52 s.send(msg)53 s = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)54 s.connect(IPC_COMMAND_PATH)55 if serve_to:56 for other_device in serve_to:57 _make_single_mapping(s, device_instance, other_device)58 if client_to:59 for other_device in client_to:60 _make_single_mapping(s, other_device, device_instance)61 62 s.shutdown(socket.SHUT_RDWR)63 s.close()64def _get_discoverers(device_instance):65 disc_by, disc_by_all = _expand_principal_list(66 device_instance.device.discoverable_by.all())67 def _discovery_allowed(other_instance):68 return disc_by_all or other_instance.device in disc_by69 services = ServiceInstance.objects.filter(device_instance=device_instance70 ).values("service")71 potential_clients = ConnectedDevice.objects.filter(72 interested_services__in=services)73 potential_clients = set(x for x in potential_clients if _discovery_allowed(x))74 _filter_not_mapped_already_to(potential_clients, device_instance)75 return potential_clients76@task(name="connect_device")77def connect_device_evt(device_instance_id):78 """Create mappings when a device first connects."""79 device_instance = ConnectedDevice.objects.get(id=device_instance_id)80 serve_to, _ = _expand_principal_list(device_instance.device.serve_to.all())81 serve_to = _get_device_instances(serve_to)82 client_to, _ = _expand_principal_list(device_instance.device.client_to.all())83 client_to = _get_device_instances(client_to)84 _make_mappings(device_instance, serve_to=serve_to, client_to=client_to)85@task(name="update_device")86def update_device_evt(device_instance_id):87 """Create 
mappings when a device has been updated."""88 device_instance = ConnectedDevice.objects.get(id=device_instance_id)89 serve_to, _= _expand_principal_list(device_instance.device.serve_to.all())90 serve_to = _get_device_instances(serve_to)91 serve_to |= _get_discoverers(device_instance)92 _filter_not_mapped_already_from(device_instance, serve_to)93 client_to, _ = _expand_principal_list(device_instance.device.client_to.all())94 client_to = _get_device_instances(client_to)95 _filter_not_mapped_already_to(client_to, device_instance)96 _make_mappings(device_instance, serve_to=serve_to, client_to=client_to)97@task(name="register_device_interest")98def register_interest_service_evt(device_instance_id, service_uuid):99 """Find devices in the network with the service."""100 device_instance = ConnectedDevice.objects.get(id=device_instance_id)101 def _discovery_allowed(other_instance):102 for principal in other_instance.device.discoverable_by.all():103 if principal.name == "*":104 return True105 elif principal.name == device_instance.device.name: 106 return True107 elif isinstance(principal, PrincipalGroup):108 for member in principal.members.all():109 if member.name == device_instance.device.name:110 return True111 return False112 potential_servers = set(x.device_instance for x in \113 ServiceInstance.objects.filter(service__uuid=service_uuid))114 potential_servers = set(x for x in potential_servers if _discovery_allowed(x))115 _filter_not_mapped_already_to(potential_servers, device_instance)116 _make_mappings(device_instance, client_to=potential_servers)117@periodic_task(118 run_every=timedelta(seconds=60), 119 name="timeout_gateways", 120 ignore_result=True121)122def timeout_gateways():123 """Remove lingering gateway sessions"""124 logger.info("Timing out gateway instances.")125 threshold = timezone.now() - timedelta(seconds=GATEWAY_CONNECTION_TIMEOUT)126 ConnectedGateway.objects.filter(is_connected=False, 127 last_updated__lt=threshold).delete()128@periodic_task(129 
run_every=timedelta(seconds=60), 130 name="timeout_access_control_state", ...
mapper.py
Source:mapper.py
...100 self.instDivs = []101 self.identityMapping = False102 self.nullMapping = False103 self.parse (input)104 self.mappings = self._make_mappings()105 106 def _make_mappings (self):107 mappings = []108 if not self.nullMapping:109 for val in self.values:110 mappings.append (Mapping (val, self.instNames, self.instDivs))111 return mappings112 113 114 def parse (self, input):115 pat = "map to:"116 splits = input.split (pat)117 self._parseValues (splits[0])118 if len (splits) == 1:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!