Best Python code snippet using slash
xcap_manager.py
Source:xcap_manager.py
1#!/usr/bin/python2"""3Script for testing XCAPManager. Initializes middleware using a configuration4file named "test-config" in the current directory. Uses the default account to5initialize an XCAPManager. Type "help" for a list of implemented commands. The6commands have the same arguments as the ones in XCAPManager. Arguments are7specified using JSON (don't use spaces or quote each argument):8add_contact 'contact={"name": "My buddy", "uri": "sip:buddy@example.org", "group": "Buddies"}'9or10add_contact contact={"name":"My buddy","uri":"sip:buddy@example.org","group":"Buddies"}11To exit, type "exit".12"""13import cjson14import os15import shlex16import string17from cmd import Cmd18from itertools import count, takewhile19from StringIO import StringIO20from application.notification import NotificationCenter, IObserver21from application.python.decorator import decorator, preserve_signature22from application.python.util import Null, Singleton23from threading import Event24from zope.interface import implements25from sipsimple.account import AccountManager26from sipsimple.application import SIPApplication27from sipsimple.configuration.backend.file import FileBackend28from sipsimple.util import Timestamp, run_in_green_thread29from sipsimple.xcap import All, CatchAllCondition, Class, Contact, DeviceID, DialoginfoPolicy, DomainCondition, DomainException, OccurenceID, OfflineStatus, PresencePolicy, ServiceURI, ServiceURIScheme, UserException, XCAPManager30class XCAPApplication(object):31 __metaclass__ = Singleton32 implements(IObserver)33 def __init__(self):34 self.application = SIPApplication()35 self.xcap_manager = None36 self.quit_event = Event()37 notification_center = NotificationCenter()38 notification_center.add_observer(self, sender=self.application)39 def start(self):40 self.application.start(FileBackend(os.path.realpath('test-config')))41 @run_in_green_thread42 def stop(self):43 self.xcap_manager.stop()44 self.application.stop()45 def handle_notification(self, 
notification):46 handler = getattr(self, '_NH_%s' % notification.name, Null)47 handler(notification)48 @run_in_green_thread49 def _NH_SIPApplicationDidStart(self, notification):50 account_manager = AccountManager()51 self.xcap_manager = XCAPManager(account_manager.default_account)52 notification_center = NotificationCenter()53 notification_center.add_observer(self, sender=self.xcap_manager)54 self.xcap_manager.load(os.path.realpath('xcap-cache'))55 self.xcap_manager.start()56 def _NH_SIPApplicationDidEnd(self, notification):57 self.quit_event.set()58 def _NH_XCAPManagerDidChangeState(self, notification):59 print 'XCAP Manager state %s -> %s' % (notification.data.prev_state, notification.data.state)60 def _NH_XCAPManagerWillStart(self, notification):61 print 'XCAP Manager will start'62 def _NH_XCAPManagerDidStart(self, notification):63 print 'XCAP Manager did start'64 def _NH_XCAPManagerDidDiscoverServerCapabilities(self, notification):65 print ' contact list supported: %s' % notification.data.contactlist_supported66 print ' presence policies supported: %s' % notification.data.presence_policies_supported67 print ' dialoginfo policies supported: %s' % notification.data.dialoginfo_policies_supported68 print ' status icon supported: %s' % notification.data.status_icon_supported69 print ' offline status supported: %s' % notification.data.offline_status_supported70 def _NH_XCAPManagerWillEnd(self, notification):71 print 'XCAP Manager will end'72 def _NH_XCAPManagerDidEnd(self, notification):73 print 'XCAP Manager did end'74 def _NH_XCAPManagerDidReloadData(self, notification):75 print 'XCAP Manager reloaded data:'76 groups = dict.fromkeys(notification.data.groups)77 for group in groups:78 groups[group] = []79 for contact in notification.data.contacts:80 if contact.group is not None:81 groups[contact.group].append(contact)82 print 'Buddies:'83 for group, contacts in groups.iteritems():84 print ' %s:' % group85 for contact in contacts:86 if contact.name:87 print ' %s <%s>' 
% (contact.name, contact.uri)88 else:89 print ' %s' % contact.uri90 print ' subscribe-to-presence = %s' % contact.subscribe_to_presence91 print ' subscribe-to-dialoginfo = %s' % contact.subscribe_to_dialoginfo92 print ' presence-policies = %s' % (', '.join(p.id for p in contact.presence_policies) if contact.presence_policies else None)93 print ' dialoginfo-policies = %s' % (', '.join(p.id for p in contact.dialoginfo_policies) if contact.dialoginfo_policies else None)94 for attr, value in contact.attributes.iteritems():95 print ' x: %s = %s' % (attr, value)96 print97 print 'Presence policies:'98 for policy in notification.data.presence_policies:99 print ' %s -> %s' % (policy.id, policy.action)100 if policy.sphere:101 print ' sphere = %s' % policy.sphere102 if policy.validity:103 print ' valid between:'104 for from_timestamp, until_timestamp in policy.validity:105 print ' %s - %s' % (from_timestamp, until_timestamp)106 if policy.multi_identity_conditions:107 print ' multi identity conditions:'108 for multi_condition in policy.multi_identity_conditions:109 if isinstance(multi_condition, CatchAllCondition) and multi_condition.exceptions:110 print ' anyone except'111 for exception in multi_condition.exceptions:112 if isinstance(exception, DomainException):113 print ' users from domain %s' % exception.domain114 elif isinstance(exception, UserException):115 print ' user %s' % exception.uri116 elif isinstance(multi_condition, CatchAllCondition):117 print ' anyone'118 elif isinstance(multi_condition, DomainCondition) and multi_condition.exceptions:119 print ' anyone from domain %s except' % multi_condition.domain120 for exception in multi_condition.exceptions:121 if isinstance(exception, UserException):122 print ' user %s' % exception.uri123 elif isinstance(multi_condition, DomainCondition):124 print ' anyone from domain %s' % multi_condition.domain125 if policy.provide_devices is All:126 print ' provide-devices = All'127 elif policy.provide_devices:128 print ' 
provide-devices:'129 for prv in policy.provide_devices:130 if isinstance(prv, Class):131 print ' class = %s' % prv132 elif isinstance(prv, OccurenceID):133 print ' occurence-id = %s' % prv134 elif isinstance(prv, DeviceID):135 print ' device-id = %s' % prv136 else:137 print ' unknown = %s(%r)' % (prv, type(prv).__name__)138 if policy.provide_persons is All:139 print ' provide-persons = All'140 elif policy.provide_persons:141 print ' provide-persons:'142 for prv in policy.provide_persons:143 if isinstance(prv, Class):144 print ' class = %s' % prv145 elif isinstance(prv, OccurenceID):146 print ' occurence-id = %s' % prv147 else:148 print ' unknown = %s(%r)' % (prv, type(prv).__name__)149 if policy.provide_services is All:150 print ' provide-services = All'151 elif policy.provide_services:152 print ' provide-services:'153 for prv in policy.provide_services:154 if isinstance(prv, Class):155 print ' class = %s' % prv156 elif isinstance(prv, OccurenceID):157 print ' occurence-id = %s' % prv158 elif isinstance(prv, ServiceURI):159 print ' service-uri = %s' % prv160 elif isinstance(prv, ServiceURIScheme):161 print ' service-uri-scheme = %s' % prv162 else:163 print ' unknown = %s(%r)' % (prv, type(prv).__name__)164 print ' provide-activities = %s' % policy.provide_activities165 print ' provide-class = %s' % policy.provide_class166 print ' provide-device-id = %s' % policy.provide_device_id167 print ' provide-mood = %s' % policy.provide_mood168 print ' provide-place-is = %s' % policy.provide_place_is169 print ' provide-place-type = %s' % policy.provide_place_type170 print ' provide-privacy = %s' % policy.provide_privacy171 print ' provide-relationship = %s' % policy.provide_relationship172 print ' provide-status-icon = %s' % policy.provide_status_icon173 print ' provide-sphere = %s' % policy.provide_sphere174 print ' provide-time-offset = %s' % policy.provide_time_offset175 print ' provide-user-input = %s' % policy.provide_user_input176 print ' provide-unknown-attributes = 
%s' % policy.provide_unknown_attributes177 print ' provide-all-attributes = %s' % policy.provide_all_attributes178 print179 print 'Dialog-info policies:'180 for policy in notification.data.dialoginfo_policies:181 print ' %s -> %s' % (policy.id, policy.action)182 if policy.sphere:183 print ' sphere = %s' % policy.sphere184 if policy.validity:185 print ' valid between:'186 for from_timestamp, until_timestamp in policy.validity:187 print ' %s - %s' % (from_timestamp, until_timestamp)188 print189 print 'RLS services:'190 for service in notification.data.services:191 print ' %s -> %s' % (service.uri, ', '.join(service.packages))192 for entry in service.entries:193 print ' %s' % entry194 print195 print 'Offline status:'196 if notification.data.offline_status:197 print ' Note: %s' % notification.data.offline_status.note198 print ' Activity: %s' % notification.data.offline_status.activity199 else:200 print ' Missing'201application = XCAPApplication()202application.start()203class Stop(object):204 pass205@decorator206def command_handler(func):207 @preserve_signature(func)208 def wrapper(*args, **kw):209 try:210 return func(*args, **kw)211 except Exception, e:212 print 'Error: %s' % e213 return wrapper214class Interface(Cmd):215 prompt = 'xcap> '216 def emptyline(self):217 return218 def parse_arguments(self, line):219 s = shlex.shlex(StringIO(line))220 s.wordchars = ''.join(c for c in string.printable if c!="'" and c not in string.whitespace)221 s.quotes = "'"222 args = list(takewhile(lambda x: x, (s.get_token() for _ in count())))223 arguments = {}224 for arg in args:225 if len(arg) >= 2 and arg[0] == arg[-1] and arg[0] in ('"', "'"):226 arg = arg[1:-1]227 try:228 name, value = arg.split('=', 1)229 arguments[name] = cjson.decode(value)230 except (ValueError, cjson.DecodeError):231 continue232 return arguments233 @command_handler234 def do_start_transaction(self, line):235 application.xcap_manager.start_transaction()236 @command_handler237 def do_commit_transaction(self, 
line):238 application.xcap_manager.commit_transaction()239 @command_handler240 def do_add_group(self, line):241 arguments = self.parse_arguments(line)242 group = arguments.get('group')243 if not isinstance(group, basestring):244 raise ValueError('expected string group')245 application.xcap_manager.add_group(group)246 @command_handler247 def do_rename_group(self, line):248 arguments = self.parse_arguments(line)249 old_name = arguments.get('old_name')250 new_name = arguments.get('new_name')251 if not isinstance(old_name, basestring):252 raise ValueError('expected string old_name')253 if not isinstance(new_name, basestring):254 raise ValueError('expected string new_name')255 application.xcap_manager.rename_group(old_name, new_name)256 @command_handler257 def do_remove_group(self, line):258 arguments = self.parse_arguments(line)259 group = arguments.get('group')260 if not isinstance(group, basestring):261 raise ValueError('expected string group')262 application.xcap_manager.remove_group(group)263 @command_handler264 def do_add_contact(self, line):265 arguments = self.parse_arguments(line)266 contact_attrs = arguments.get('contact')267 if not isinstance(contact_attrs, dict):268 raise ValueError('expected object contact')269 contact = self.get_contact(contact_attrs)270 application.xcap_manager.add_contact(contact)271 @command_handler272 def do_update_contact(self, line):273 arguments = self.parse_arguments(line)274 contact_attrs = arguments.pop('contact', None)275 if not isinstance(contact_attrs, dict):276 raise ValueError('expected object contact')277 contact = self.get_contact(contact_attrs)278 if 'presence_policies' in arguments:279 presence_policies = arguments.pop('presence_policies')280 if presence_policies is not None and not isinstance(presence_policies, list):281 raise ValueError('expected list attribute presence_policies')282 if any(not isinstance(policy, dict) for policy in presence_policies):283 raise ValueError('expected object items in 
presence_policies')284 arguments['presence_policies'] = [self.get_presence_policy(policy) for policy in presence_policies]285 if 'dialoginfo_policies' in arguments:286 dialoginfo_policies = arguments.pop('dialoginfo_policies')287 if dialoginfo_policies is not None and not isinstance(dialoginfo_policies, list):288 raise ValueError('expected list attribute dialoginfo_policies')289 if any(not isinstance(policy, dict) for policy in dialoginfo_policies):290 raise ValueError('expected object items in dialoginfo_policies')291 arguments['dialoginfo_policies'] = [self.get_dialoginfo_policy(policy) for policy in dialoginfo_policies]292 application.xcap_manager.update_contact(contact, **arguments)293 @command_handler294 def do_remove_contact(self, line):295 arguments = self.parse_arguments(line)296 contact_attrs = arguments.pop('contact', None)297 if not isinstance(contact_attrs, dict):298 raise ValueError('expected object contact')299 contact = self.get_contact(contact_attrs)300 application.xcap_manager.remove_contact(contact)301 @command_handler302 def do_add_presence_policy(self, line):303 arguments = self.parse_arguments(line)304 policy = arguments.pop('policy', None)305 if not isinstance(policy, dict):306 raise ValueError('expected object policy')307 application.xcap_manager.add_presence_policy(self.get_presence_policy(policy))308 @command_handler309 def do_update_presence_policy(self, line):310 arguments = self.parse_arguments(line)311 policy = arguments.pop('policy', None)312 if not isinstance(policy, dict):313 raise ValueError('expected object policy')314 policy = self.get_presence_policy(policy)315 if 'validity' in arguments:316 validity = arguments.pop('validity')317 if validity is not None:318 if not isinstance(validity, list):319 raise ValueError('expected list validity or nill')320 validity = [(Timestamp(from_timestamp), Timestamp(until_timestamp)) for from_timestamp, until_timestamp in validity]321 arguments['validity'] = validity322 multi_identity_conditions = 
arguments.pop('multi_identity_conditions', None)323 if multi_identity_conditions is not None and not isinstance(multi_identity_conditions, list):324 raise ValueError('expected list multi_identity_conditions or nill')325 if multi_identity_conditions is not None:326 arguments['multi_identity_conditions'] = []327 for multi_condition_attributes in multi_identity_conditions:328 if 'domain' in multi_condition_attributes:329 multi_condition = DomainCondition(multi_condition_attributes.pop('domain'))330 else:331 multi_condition = CatchAllCondition()332 arguments['multi_identity_conditions'].append(multi_condition)333 if multi_condition_attributes.get('exceptions', None):334 for exception_attributes in multi_condition_attributes['exceptions']:335 if 'domain' in exception_attributes:336 multi_condition.exceptions.append(DomainException(exception_attributes.pop('domain')))337 elif 'uri' in exception_attributes:338 multi_condition.exceptions.append(UserException(exception_attributes.pop('uri')))339 application.xcap_manager.update_presence_policy(policy, **arguments)340 @command_handler341 def do_remove_presence_policy(self, line):342 arguments = self.parse_arguments(line)343 policy = arguments.pop('policy', None)344 if not isinstance(policy, dict):345 raise ValueError('expected object policy')346 policy = self.get_presence_policy(policy)347 application.xcap_manager.remove_presence_policy(policy)348 @command_handler349 def do_add_dialoginfo_policy(self, line):350 arguments = self.parse_arguments(line)351 policy = arguments.pop('policy', None)352 if not isinstance(policy, dict):353 raise ValueError('expected object policy')354 application.xcap_manager.add_dialoginfo_policy(self.get_dialoginfo_policy(policy))355 @command_handler356 def do_update_dialoginfo_policy(self, line):357 arguments = self.parse_arguments(line)358 policy = arguments.pop('policy', None)359 if not isinstance(policy, dict):360 raise ValueError('expected object policy')361 policy = 
self.get_dialoginfo_policy(policy)362 if 'validity' in arguments:363 validity = arguments.pop('validity')364 if validity is not None:365 if not isinstance(validity, list):366 raise ValueError('expected list validity or nill')367 validity = [(Timestamp(from_timestamp), Timestamp(until_timestamp)) for from_timestamp, until_timestamp in validity]368 arguments['validity'] = validity369 multi_identity_conditions = arguments.pop('multi_identity_conditions', None)370 if multi_identity_conditions is not None and not isinstance(multi_identity_conditions, list):371 raise ValueError('expected list multi_identity_conditions or nill')372 if multi_identity_conditions is not None:373 arguments['multi_identity_conditions'] = []374 for multi_condition_attributes in multi_identity_conditions:375 if 'domain' in multi_condition_attributes:376 multi_condition = DomainCondition(multi_condition_attributes.pop('domain'))377 else:378 multi_condition = CatchAllCondition()379 arguments['multi_identity_conditions'].append(multi_condition)380 if multi_condition_attributes.get('exceptions', None):381 for exception_attributes in multi_condition_attributes['exceptions']:382 if 'domain' in exception_attributes:383 multi_condition.exceptions.append(DomainException(exception_attributes.pop('domain')))384 elif 'uri' in exception_attributes:385 multi_condition.exceptions.append(UserException(exception_attributes.pop('uri')))386 application.xcap_manager.update_dialoginfo_policy(policy, **arguments)387 @command_handler388 def do_remove_dialoginfo_policy(self, line):389 arguments = self.parse_arguments(line)390 policy = arguments.pop('policy', None)391 if not isinstance(policy, dict):392 raise ValueError('expected object policy')393 policy = self.get_dialoginfo_policy(policy)394 application.xcap_manager.remove_dialoginfo_policy(policy)395 @command_handler396 def do_set_offline_status(self, line):397 arguments = self.parse_arguments(line)398 if not arguments:399 
application.xcap_manager.set_offline_status(None)400 else:401 note = arguments.pop('note', None)402 activity = arguments.pop('activity', None)403 application.xcap_manager.set_offline_status(OfflineStatus(activity, note))404 @command_handler405 def do_exit(self, line):406 application.stop()407 application.quit_event.wait()408 return Stop409 def get_contact(self, obj):410 if 'uri' not in obj:411 raise ValueError('expected string attribute uri of contact object')412 contact = Contact(obj.pop('name', None), obj.pop('uri'), obj.pop('group', None))413 presence_policies = obj.pop('presence_policies', None)414 if presence_policies is not None:415 if not isinstance(presence_policies, list):416 raise ValueError('expected list attribute presence_policies of contact object or nill')417 if any(not isinstance(policy, dict) for policy in presence_policies):418 raise ValueError('expected object items in attribute presence_policies of contact object')419 contact.presence_policies = [self.get_presence_policy(policy) for policy in presence_policies]420 dialoginfo_policies = obj.pop('dialoginfo_policies', None)421 if dialoginfo_policies is not None:422 if not isinstance(dialoginfo_policies, list):423 raise ValueError('expected list attribute dialoginfo_policies of contact object or nill')424 if any(not isinstance(policy, dict) for policy in dialoginfo_policies):425 raise ValueError('expected object items in attribute dialoginfo_policies of contact object')426 contact.dialoginfo_policies = [self.get_dialoginfo_policy(policy) for policy in dialoginfo_policies]427 for attr, value in obj.iteritems():428 if attr in ('subscribe_to_presence', 'subscribe_to_dialoginfo'):429 value = True if value == 'True' else False430 setattr(contact, attr, value)431 return contact432 def get_policy(self, cls, obj):433 policy = cls(obj.pop('id', None), obj.pop('action', None))434 policy.name = obj.pop('name', None)435 validity = obj.pop('validity', None)436 if validity is not None:437 if not 
isinstance(validity, list):438 raise ValueError('expected list attribute validity of policy object or nill')439 policy.validity = [(Timestamp(from_timestamp), Timestamp(until_timestamp)) for from_timestamp, until_timestamp in validity]440 sphere = obj.pop('sphere', None)441 if sphere is not None and not isinstance(sphere, basestring):442 raise ValueError('expected string attribute sphere of policy object or nill')443 policy.sphere = sphere444 multi_identity_conditions = obj.pop('multi_identity_conditions', None)445 if multi_identity_conditions is not None and not isinstance(multi_identity_conditions, list):446 raise ValueError('expected list attribute multi_identity_conditions of policy object or nill')447 if multi_identity_conditions is not None:448 policy.multi_identity_conditions = []449 for multi_condition_attributes in multi_identity_conditions:450 if 'domain' in multi_condition_attributes:451 multi_condition = DomainCondition(multi_condition_attributes.pop('domain'))452 else:453 multi_condition = CatchAllCondition()454 policy.multi_identity_conditions.append(multi_condition)455 if multi_condition_attributes.get('exceptions', None):456 for exception_attributes in multi_condition_attributes['exceptions']:457 if 'domain' in exception_attributes:458 print 'adding exception for domain'459 multi_condition.exceptions.append(DomainException(exception_attributes.pop('domain')))460 elif 'uri' in exception_attributes:461 print 'adding exception for uri'462 multi_condition.exceptions.append(UserException(exception_attributes.pop('uri')))463 return policy464 def get_presence_policy(self, obj):465 policy = self.get_policy(PresencePolicy, obj)466 return policy467 def get_dialoginfo_policy(self, obj):468 policy = self.get_policy(DialoginfoPolicy, obj)469 return policy470 def postcmd(self, stop, line):471 if stop is Stop:472 return True473interface = Interface()...
encoders.py
Source:encoders.py
1#-------------------------------------------------------------------------------2#3# Project: EOxServer <http://eoxserver.org>4# Authors: Fabian Schindler <fabian.schindler@eox.at>5#6#-------------------------------------------------------------------------------7# Copyright (C) 2013 EOX IT Services GmbH8#9# Permission is hereby granted, free of charge, to any person obtaining a copy10# of this software and associated documentation files (the "Software"), to deal11# in the Software without restriction, including without limitation the rights12# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell13# copies of the Software, and to permit persons to whom the Software is14# furnished to do so, subject to the following conditions:15#16# The above copyright notice and this permission notice shall be included in all17# copies of this Software or works derived from this Software.18#19# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR20# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,21# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE22# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER23# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,24# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN25# THE SOFTWARE.26#-------------------------------------------------------------------------------27from lxml.builder import ElementMaker28from eoxserver.core.util.xmltools import XMLEncoder, NameSpace, NameSpaceMap29ns_xlink = NameSpace("http://www.w3.org/1999/xlink", "xlink")30ns_ows = NameSpace("http://www.opengis.net/ows/1.1", "ows", "http://schemas.opengis.net/ows/1.1.0/owsExceptionReport.xsd")31ns_xml = NameSpace("http://www.w3.org/XML/1998/namespace", "xml")32nsmap = NameSpaceMap(ns_ows)33OWS = ElementMaker(namespace=ns_ows.uri, nsmap=nsmap)34class OWS11ExceptionXMLEncoder(XMLEncoder):35 def encode_exception(self, message, version, code, locator=None):36 exception_attributes = {37 "exceptionCode": code38 }39 if locator:40 exception_attributes["locator"] = locator41 exception_text = (OWS("ExceptionText", message),) if message else ()42 return OWS("ExceptionReport", 43 OWS("Exception", *exception_text, **exception_attributes44 ), version=version, **{ns_xml("lang"): "en"}45 )46 def get_schema_locations(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!