Best JavaScript code snippet using wpt
testhelpers.py
Source:testhelpers.py
...52 self.assertEqual(expected, mock.mock_calls)53 self.assertEqual(mock.mock_calls, expected)54class CallTest(unittest.TestCase):55 def test_call_with_call(self):56 kall = _Call()57 self.assertEqual(kall, _Call())58 self.assertEqual(kall, _Call(('',)))59 self.assertEqual(kall, _Call(((),)))60 self.assertEqual(kall, _Call(({},)))61 self.assertEqual(kall, _Call(('', ())))62 self.assertEqual(kall, _Call(('', {})))63 self.assertEqual(kall, _Call(('', (), {})))64 self.assertEqual(kall, _Call(('foo',)))65 self.assertEqual(kall, _Call(('bar', ())))66 self.assertEqual(kall, _Call(('baz', {})))67 self.assertEqual(kall, _Call(('spam', (), {})))68 kall = _Call(((1, 2, 3),))69 self.assertEqual(kall, _Call(((1, 2, 3),)))70 self.assertEqual(kall, _Call(('', (1, 2, 3))))71 self.assertEqual(kall, _Call(((1, 2, 3), {})))72 self.assertEqual(kall, _Call(('', (1, 2, 3), {})))73 kall = _Call(((1, 2, 4),))74 self.assertNotEqual(kall, _Call(('', (1, 2, 3))))75 self.assertNotEqual(kall, _Call(('', (1, 2, 3), {})))76 kall = _Call(('foo', (1, 2, 4),))77 self.assertNotEqual(kall, _Call(('', (1, 2, 4))))78 self.assertNotEqual(kall, _Call(('', (1, 2, 4), {})))79 self.assertNotEqual(kall, _Call(('bar', (1, 2, 4))))80 self.assertNotEqual(kall, _Call(('bar', (1, 2, 4), {})))81 kall = _Call(({'a': 3},))82 self.assertEqual(kall, _Call(('', (), {'a': 3})))83 self.assertEqual(kall, _Call(('', {'a': 3})))84 self.assertEqual(kall, _Call(((), {'a': 3})))85 self.assertEqual(kall, _Call(({'a': 3},)))86 def test_empty__Call(self):87 args = _Call()88 self.assertEqual(args, ())89 self.assertEqual(args, ('foo',))90 self.assertEqual(args, ((),))91 self.assertEqual(args, ('foo', ()))92 self.assertEqual(args, ('foo',(), {}))93 self.assertEqual(args, ('foo', {}))94 self.assertEqual(args, ({},))95 def test_named_empty_call(self):96 args = _Call(('foo', (), {}))97 self.assertEqual(args, ('foo',))98 self.assertEqual(args, ('foo', ()))99 self.assertEqual(args, ('foo',(), {}))100 self.assertEqual(args, ('foo', {}))101 self.assertNotEqual(args, ((),))102 self.assertNotEqual(args, ())103 self.assertNotEqual(args, ({},))104 self.assertNotEqual(args, ('bar',))105 self.assertNotEqual(args, ('bar', ()))106 self.assertNotEqual(args, ('bar', {}))107 def test_call_with_args(self):108 args = _Call(((1, 2, 3), {}))109 self.assertEqual(args, ((1, 2, 3),))110 self.assertEqual(args, ('foo', (1, 2, 3)))111 self.assertEqual(args, ('foo', (1, 2, 3), {}))112 self.assertEqual(args, ((1, 2, 3), {}))113 def test_named_call_with_args(self):114 args = _Call(('foo', (1, 2, 3), {}))115 self.assertEqual(args, ('foo', (1, 2, 3)))116 self.assertEqual(args, ('foo', (1, 2, 3), {}))117 self.assertNotEqual(args, ((1, 2, 3),))118 self.assertNotEqual(args, ((1, 2, 3), {}))119 def test_call_with_kwargs(self):120 args = _Call(((), dict(a=3, b=4)))121 self.assertEqual(args, (dict(a=3, b=4),))122 self.assertEqual(args, ('foo', dict(a=3, b=4)))123 self.assertEqual(args, ('foo', (), dict(a=3, b=4)))124 self.assertEqual(args, ((), dict(a=3, b=4)))125 def test_named_call_with_kwargs(self):126 args = _Call(('foo', (), dict(a=3, b=4)))127 self.assertEqual(args, ('foo', dict(a=3, b=4)))128 self.assertEqual(args, ('foo', (), dict(a=3, b=4)))129 self.assertNotEqual(args, (dict(a=3, b=4),))130 self.assertNotEqual(args, ((), dict(a=3, b=4)))131 def test_call_with_args_call_empty_name(self):132 args = _Call(((1, 2, 3), {}))133 self.assertEqual(args, call(1, 2, 3))134 self.assertEqual(call(1, 2, 3), args)135 self.assertIn(call(1, 2, 3), [args])136 def test_call_ne(self):137 self.assertNotEqual(_Call(((1, 2, 3),)), call(1, 2))138 self.assertFalse(_Call(((1, 2, 3),)) != call(1, 2, 3))139 self.assertTrue(_Call(((1, 2), {})) != call(1, 2, 3))140 def test_call_non_tuples(self):141 kall = _Call(((1, 2, 3),))142 for value in 1, None, self, int:143 self.assertNotEqual(kall, value)144 self.assertFalse(kall == value)145 def test_repr(self):146 self.assertEqual(repr(_Call()), 'call()')147 self.assertEqual(repr(_Call(('foo',))), 'call.foo()')148 self.assertEqual(repr(_Call(((1, 2, 3), {'a': 'b'}))),149 "call(1, 2, 3, a='b')")150 self.assertEqual(repr(_Call(('bar', (1, 2, 3), {'a': 'b'}))),151 "call.bar(1, 2, 3, a='b')")152 self.assertEqual(repr(call), 'call')153 self.assertEqual(str(call), 'call')154 self.assertEqual(repr(call()), 'call()')155 self.assertEqual(repr(call(1)), 'call(1)')156 self.assertEqual(repr(call(zz='thing')), "call(zz='thing')")157 self.assertEqual(repr(call().foo), 'call().foo')158 self.assertEqual(repr(call(1).foo.bar(a=3).bing),159 'call().foo.bar().bing')160 self.assertEqual(161 repr(call().foo(1, 2, a=3)),162 "call().foo(1, 2, a=3)"163 )164 self.assertEqual(repr(call()()), "call()()")165 self.assertEqual(repr(call(1)(2)), "call()(2)")166 self.assertEqual(167 repr(call()().bar().baz.beep(1)),168 "call()().bar().baz.beep(1)"169 )170 def test_call(self):171 self.assertEqual(call(), ('', (), {}))172 self.assertEqual(call('foo', 'bar', one=3, two=4),173 ('', ('foo', 'bar'), {'one': 3, 'two': 4}))174 mock = Mock()175 mock(1, 2, 3)176 mock(a=3, b=6)177 self.assertEqual(mock.call_args_list,178 [call(1, 2, 3), call(a=3, b=6)])179 def test_attribute_call(self):180 self.assertEqual(call.foo(1), ('foo', (1,), {}))181 self.assertEqual(call.bar.baz(fish='eggs'),182 ('bar.baz', (), {'fish': 'eggs'}))183 mock = Mock()184 mock.foo(1, 2 ,3)185 mock.bar.baz(a=3, b=6)186 self.assertEqual(mock.method_calls,187 [call.foo(1, 2, 3), call.bar.baz(a=3, b=6)])188 def test_extended_call(self):189 result = call(1).foo(2).bar(3, a=4)190 self.assertEqual(result, ('().foo().bar', (3,), dict(a=4)))191 mock = MagicMock()192 mock(1, 2, a=3, b=4)193 self.assertEqual(mock.call_args, call(1, 2, a=3, b=4))194 self.assertNotEqual(mock.call_args, call(1, 2, 3))195 self.assertEqual(mock.call_args_list, [call(1, 2, a=3, b=4)])196 self.assertEqual(mock.mock_calls, [call(1, 2, a=3, b=4)])197 mock = MagicMock()198 mock.foo(1).bar()().baz.beep(a=6)199 last_call = call.foo(1).bar()().baz.beep(a=6)200 self.assertEqual(mock.mock_calls[-1], last_call)201 self.assertEqual(mock.mock_calls, last_call.call_list())202 def test_extended_not_equal(self):203 a = call(x=1).foo204 b = call(x=2).foo205 self.assertEqual(a, a)206 self.assertEqual(b, b)207 self.assertNotEqual(a, b)208 def test_nested_calls_not_equal(self):209 a = call(x=1).foo().bar210 b = call(x=2).foo().bar211 self.assertEqual(a, a)212 self.assertEqual(b, b)213 self.assertNotEqual(a, b)214 def test_call_list(self):215 mock = MagicMock()216 mock(1)217 self.assertEqual(call(1).call_list(), mock.mock_calls)218 mock = MagicMock()219 mock(1).method(2)220 self.assertEqual(call(1).method(2).call_list(),221 mock.mock_calls)222 mock = MagicMock()223 mock(1).method(2)(3)224 self.assertEqual(call(1).method(2)(3).call_list(),225 mock.mock_calls)226 mock = MagicMock()227 int(mock(1).method(2)(3).foo.bar.baz(4)(5))228 kall = call(1).method(2)(3).foo.bar.baz(4)(5).__int__()229 self.assertEqual(kall.call_list(), mock.mock_calls)230 def test_call_any(self):231 self.assertEqual(call, ANY)232 m = MagicMock()233 int(m)234 self.assertEqual(m.mock_calls, [ANY])235 self.assertEqual([ANY], m.mock_calls)236 def test_two_args_call(self):237 args = _Call(((1, 2), {'a': 3}), two=True)238 self.assertEqual(len(args), 2)239 self.assertEqual(args[0], (1, 2))240 self.assertEqual(args[1], {'a': 3})241 other_args = _Call(((1, 2), {'a': 3}))242 self.assertEqual(args, other_args)243 def test_call_with_name(self):244 self.assertEqual(_Call((), 'foo')[0], 'foo')245 self.assertEqual(_Call((('bar', 'barz'),),)[0], '')246 self.assertEqual(_Call((('bar', 'barz'), {'hello': 'world'}),)[0], '')247class SpecSignatureTest(unittest.TestCase):248 def _check_someclass_mock(self, mock):249 self.assertRaises(AttributeError, getattr, mock, 'foo')250 mock.one(1, 2)251 mock.one.assert_called_with(1, 2)252 self.assertRaises(AssertionError,253 mock.one.assert_called_with, 3, 4)254 self.assertRaises(TypeError, mock.one, 1)255 mock.two()256 mock.two.assert_called_with()257 self.assertRaises(AssertionError,258 mock.two.assert_called_with, 3)259 self.assertRaises(TypeError, mock.two, 1)260 mock.three()...
pjsua_app.py
Source:pjsua_app.py
1# $Id: pjsua_app.py 1438 2007-09-17 15:44:47Z bennylp $2#3# Sample and simple Python script to make and receive calls, and do4# presence and instant messaging/IM using PJSUA-API binding for Python.5#6# Copyright (C) 2003-2007 Benny Prijono <benny@prijono.org>7#8import py_pjsua9import sys10import thread11#12# Configurations13#14THIS_FILE = "pjsua_app.py"15C_QUIT = 016C_LOG_LEVEL = 417# STUN config.18# Set C_STUN_HOST to the address:port of the STUN server to enable STUN19#20C_STUN_HOST = ""21#C_STUN_HOST = "192.168.0.2"22#C_STUN_HOST = "stun.iptel.org:3478"23# SIP port24C_SIP_PORT = 506025# Globals26#27g_ua_cfg = None28g_acc_id = py_pjsua.PJSUA_INVALID_ID29g_current_call = py_pjsua.PJSUA_INVALID_ID30g_wav_files = []31g_wav_id = 032g_wav_port = 033g_rec_file = ""34g_rec_id = 035g_rec_port = 036# Utility: display PJ error and exit37#38def err_exit(title, rc):39 py_pjsua.perror(THIS_FILE, title, rc)40 py_pjsua.destroy()41 exit(1)42# Logging function (also callback, called by pjsua-lib)43#44def log_cb(level, str, len):45 if level <= C_LOG_LEVEL:46 print str,47def write_log(level, str):48 log_cb(level, str + "\n", 0)49# Utility to get call info50#51def call_name(call_id):52 ci = py_pjsua.call_get_info(call_id)53 return "[Call " + `call_id` + " " + ci.remote_info + "]"54# Callback when call state has changed.55#56def on_call_state(call_id, e): 57 global g_current_call58 ci = py_pjsua.call_get_info(call_id)59 write_log(3, call_name(call_id) + " state = " + `ci.state_text`)60 if ci.state == py_pjsua.PJSIP_INV_STATE_DISCONNECTED:61 g_current_call = py_pjsua.PJSUA_INVALID_ID62# Callback for incoming call63#64def on_incoming_call(acc_id, call_id, rdata):65 global g_current_call66 67 if g_current_call != py_pjsua.PJSUA_INVALID_ID:68 # There's call in progress - answer Busy69 py_pjsua.call_answer(call_id, 486, None, None)70 return71 72 g_current_call = call_id73 ci = py_pjsua.call_get_info(call_id)74 write_log(3, "*** Incoming call: " + call_name(call_id) + "***")75 write_log(3, "*** Press a to answer or h to hangup ***")76 77 78 79# Callback when media state has changed (e.g. established or terminated)80#81def on_call_media_state(call_id):82 ci = py_pjsua.call_get_info(call_id)83 if ci.media_status == py_pjsua.PJSUA_CALL_MEDIA_ACTIVE:84 py_pjsua.conf_connect(ci.conf_slot, 0)85 py_pjsua.conf_connect(0, ci.conf_slot)86 write_log(3, call_name(call_id) + ": media is active")87 else:88 write_log(3, call_name(call_id) + ": media is inactive")89# Callback when account registration state has changed90#91def on_reg_state(acc_id):92 acc_info = py_pjsua.acc_get_info(acc_id)93 if acc_info.has_registration != 0:94 cmd = "registration"95 else:96 cmd = "unregistration"97 if acc_info.status != 0 and acc_info.status != 200:98 write_log(3, "Account " + cmd + " failed: rc=" + `acc_info.status` + " " + acc_info.status_text)99 else:100 write_log(3, "Account " + cmd + " success")101# Callback when buddy's presence state has changed102#103def on_buddy_state(buddy_id):104 write_log(3, "On Buddy state called")105 buddy_info = py_pjsua.buddy_get_info(buddy_id)106 if buddy_info.status != 0 and buddy_info.status != 200:107 write_log(3, "Status of " + `buddy_info.uri` + " is " + `buddy_info.status_text`)108 else:109 write_log(3, "Status : " + `buddy_info.status`)110# Callback on incoming pager (MESSAGE)111# 112def on_pager(call_id, strfrom, strto, contact, mime_type, text):113 write_log(3, "MESSAGE from " + `strfrom` + " : " + `text`)114# Callback on the delivery status of outgoing pager (MESSAGE)115# 116def on_pager_status(call_id, strto, body, user_data, status, reason):117 write_log(3, "MESSAGE to " + `strto` + " status " + `status` + " reason " + `reason`)118# Received typing indication119#120def on_typing(call_id, strfrom, to, contact, is_typing):121 str_t = ""122 if is_typing:123 str_t = "is typing.."124 else:125 str_t = "has stopped typing"126 write_log(3, "IM indication: " + strfrom + " " + str_t)127# Received the status of previous call transfer request128#129def on_call_transfer_status(call_id,status_code,status_text,final,p_cont):130 strfinal = ""131 if final == 1:132 strfinal = "[final]"133 134 write_log(3, "Call " + `call_id` + ": transfer status= " + `status_code` + " " + status_text+ " " + strfinal)135 136 if status_code/100 == 2:137 write_log(3, "Call " + `call_id` + " : call transfered successfully, disconnecting call")138 status = py_pjsua.call_hangup(call_id, 410, None, None)139 p_cont = 0140# Callback on incoming call transfer request141# 142def on_call_transfer_request(call_id, dst, code):143 write_log(3, "Call transfer request from " + `call_id` + " to " + dst + " with code " + `code`)144#145# Initialize pjsua.146#147def app_init():148 global g_acc_id, g_ua_cfg149 # Create pjsua before anything else150 status = py_pjsua.create()151 if status != 0:152 err_exit("pjsua create() error", status)153 # Create and initialize logging config154 log_cfg = py_pjsua.logging_config_default()155 log_cfg.level = C_LOG_LEVEL156 log_cfg.cb = log_cb157 # Create and initialize pjsua config158 # Note: for this Python module, thread_cnt must be 0 since Python159 # doesn't like to be called from alien thread (pjsua's thread160 # in this case) 161 ua_cfg = py_pjsua.config_default()162 ua_cfg.thread_cnt = 0163 ua_cfg.user_agent = "PJSUA/Python 0.1"164 ua_cfg.cb.on_incoming_call = on_incoming_call165 ua_cfg.cb.on_call_media_state = on_call_media_state166 ua_cfg.cb.on_reg_state = on_reg_state167 ua_cfg.cb.on_call_state = on_call_state168 ua_cfg.cb.on_buddy_state = on_buddy_state169 ua_cfg.cb.on_pager = on_pager170 ua_cfg.cb.on_pager_status = on_pager_status171 ua_cfg.cb.on_typing = on_typing172 ua_cfg.cb.on_call_transfer_status = on_call_transfer_status173 ua_cfg.cb.on_call_transfer_request = on_call_transfer_request174 # Configure STUN setting175 if C_STUN_HOST != "":176 ua_cfg.stun_host = C_STUN_HOST;177 # Create and initialize media config178 med_cfg = py_pjsua.media_config_default()179 med_cfg.ec_tail_len = 0180 #181 # Initialize pjsua!!182 #183 status = py_pjsua.init(ua_cfg, log_cfg, med_cfg)184 if status != 0:185 err_exit("pjsua init() error", status)186 # Configure UDP transport config187 transport_cfg = py_pjsua.transport_config_default()188 transport_cfg.port = C_SIP_PORT189 # Create UDP transport190 status, transport_id = \191 py_pjsua.transport_create(py_pjsua.PJSIP_TRANSPORT_UDP, transport_cfg)192 if status != 0:193 err_exit("Error creating UDP transport", status)194 195 # Create initial default account196 status, acc_id = py_pjsua.acc_add_local(transport_id, 1)197 if status != 0:198 err_exit("Error creating account", status)199 g_acc_id = acc_id200 g_ua_cfg = ua_cfg201# Add SIP account interractively202#203def add_account():204 global g_acc_id205 acc_domain = ""206 acc_username = ""207 acc_passwd =""208 confirm = ""209 210 # Input account configs211 print "Your SIP domain (e.g. myprovider.com): ",212 acc_domain = sys.stdin.readline()213 if acc_domain == "\n": 214 return215 acc_domain = acc_domain.replace("\n", "")216 print "Your username (e.g. alice): ",217 acc_username = sys.stdin.readline()218 if acc_username == "\n":219 return220 acc_username = acc_username.replace("\n", "")221 print "Your password (e.g. secret): ",222 acc_passwd = sys.stdin.readline()223 if acc_passwd == "\n":224 return225 acc_passwd = acc_passwd.replace("\n", "")226 # Configure account configuration227 acc_cfg = py_pjsua.acc_config_default()228 acc_cfg.id = "sip:" + acc_username + "@" + acc_domain229 acc_cfg.reg_uri = "sip:" + acc_domain230 cred_info = py_pjsua.Pjsip_Cred_Info()231 cred_info.realm = "*"232 cred_info.scheme = "digest"233 cred_info.username = acc_username234 cred_info.data_type = 0235 cred_info.data = acc_passwd236 acc_cfg.cred_info.append(1)237 acc_cfg.cred_info[0] = cred_info238 # Add new SIP account239 status, acc_id = py_pjsua.acc_add(acc_cfg, 1)240 if status != 0:241 py_pjsua.perror(THIS_FILE, "Error adding SIP account", status)242 else:243 g_acc_id = acc_id244 write_log(3, "Account " + acc_cfg.id + " added")245def add_player():246 global g_wav_files247 global g_wav_id248 global g_wav_port249 250 file_name = ""251 status = -1252 wav_id = 0253 254 print "Enter the path of the file player(e.g. /tmp/audio.wav): ",255 file_name = sys.stdin.readline()256 if file_name == "\n": 257 return258 file_name = file_name.replace("\n", "")259 status, wav_id = py_pjsua.player_create(file_name, 0)260 if status != 0:261 py_pjsua.perror(THIS_FILE, "Error adding file player ", status)262 else:263 g_wav_files.append(file_name)264 if g_wav_id == 0:265 g_wav_id = wav_id266 g_wav_port = py_pjsua.player_get_conf_port(wav_id)267 write_log(3, "File player " + file_name + " added")268 269def add_recorder():270 global g_rec_file271 global g_rec_id272 global g_rec_port273 274 file_name = ""275 status = -1276 rec_id = 0277 278 print "Enter the path of the file recorder(e.g. /tmp/audio.wav): ",279 file_name = sys.stdin.readline()280 if file_name == "\n": 281 return282 file_name = file_name.replace("\n", "")283 status, rec_id = py_pjsua.recorder_create(file_name, 0, None, 0, 0)284 if status != 0:285 py_pjsua.perror(THIS_FILE, "Error adding file recorder ", status)286 else:287 g_rec_file = file_name288 g_rec_id = rec_id289 g_rec_port = py_pjsua.recorder_get_conf_port(rec_id)290 write_log(3, "File recorder " + file_name + " added")291def conf_list():292 ports = None293 print "Conference ports : "294 ports = py_pjsua.enum_conf_ports()295 for port in ports:296 info = None297 info = py_pjsua.conf_get_port_info(port)298 txlist = ""299 for listener in info.listeners:300 txlist = txlist + "#" + `listener` + " "301 302 print "Port #" + `info.slot_id` + "[" + `(info.clock_rate/1000)` + "KHz/" + `(info.samples_per_frame * 1000 / info.clock_rate)` + "ms] " + info.name + " transmitting to: " + txlist303 304def connect_port():305 src_port = 0306 dst_port = 0307 308 print "Connect src port # (empty to cancel): "309 src_port = sys.stdin.readline()310 if src_port == "\n": 311 return312 src_port = src_port.replace("\n", "")313 src_port = int(src_port)314 print "To dst port # (empty to cancel): "315 dst_port = sys.stdin.readline()316 if dst_port == "\n": 317 return318 dst_port = dst_port.replace("\n", "")319 dst_port = int(dst_port)320 status = py_pjsua.conf_connect(src_port, dst_port)321 if status != 0:322 py_pjsua.perror(THIS_FILE, "Error connecting port ", status)323 else: 324 write_log(3, "Port connected from " + `src_port` + " to " + `dst_port`)325 326def disconnect_port():327 src_port = 0328 dst_port = 0329 330 print "Disconnect src port # (empty to cancel): "331 src_port = sys.stdin.readline()332 if src_port == "\n": 333 return334 src_port = src_port.replace("\n", "")335 src_port = int(src_port)336 print "From dst port # (empty to cancel): "337 dst_port = sys.stdin.readline()338 if dst_port == "\n": 339 return340 dst_port = dst_port.replace("\n", "")341 dst_port = int(dst_port)342 status = py_pjsua.conf_disconnect(src_port, dst_port)343 if status != 0:344 py_pjsua.perror(THIS_FILE, "Error disconnecting port ", status)345 else: 346 write_log(3, "Port disconnected " + `src_port` + " from " + `dst_port`)347def dump_call_quality():348 global g_current_call349 350 buf = ""351 if g_current_call != -1:352 buf = py_pjsua.call_dump(g_current_call, 1, 1024, " ")353 write_log(3, "\n" + buf)354 else:355 write_log(3, "No current call")356def xfer_call():357 global g_current_call358 359 if g_current_call == -1:360 361 write_log(3, "No current call")362 else:363 call = g_current_call 364 ci = py_pjsua.call_get_info(g_current_call)365 print "Transfering current call ["+ `g_current_call` + "] " + ci.remote_info366 print "Enter sip url : "367 url = sys.stdin.readline()368 if url == "\n": 369 return370 url = url.replace("\n", "")371 if call != g_current_call:372 print "Call has been disconnected"373 return374 msg_data = py_pjsua.msg_data_init()375 status = py_pjsua.call_xfer(g_current_call, url, msg_data);376 if status != 0:377 py_pjsua.perror(THIS_FILE, "Error transfering call ", status)378 else: 379 write_log(3, "Call transfered to " + url)380 381def xfer_call_replaces():382 if g_current_call == -1:383 write_log(3, "No current call")384 else:385 call = g_current_call386 387 ids = py_pjsua.enum_calls()388 if len(ids) <= 1:389 print "There are no other calls"390 return 391 ci = py_pjsua.call_get_info(g_current_call)392 print "Transfer call [" + `g_current_call` + "] " + ci.remote_info + " to one of the following:"393 for i in range(0, len(ids)): 394 if ids[i] == call:395 continue396 call_info = py_pjsua.call_get_info(ids[i])397 print `ids[i]` + " " + call_info.remote_info + " [" + call_info.state_text + "]" 398 print "Enter call number to be replaced : "399 buf = sys.stdin.readline()400 buf = buf.replace("\n","")401 if buf == "":402 return403 dst_call = int(buf)404 405 if call != g_current_call:406 print "Call has been disconnected"407 return 408 409 if dst_call == call:410 print "Destination call number must not be the same as the call being transfered"411 return412 413 if dst_call >= py_pjsua.PJSUA_MAX_CALLS:414 print "Invalid destination call number"415 return416 417 if py_pjsua.call_is_active(dst_call) == 0:418 print "Invalid destination call number"419 return 420 py_pjsua.call_xfer_replaces(call, dst_call, 0, None)421 422#423# Worker thread function.424# Python doesn't like it when it's called from an alien thread425# (pjsua's worker thread, in this case), so for Python we must426# disable worker thread in pjsua and poll pjsua from Python instead.427#428def worker_thread_main(arg):429 global C_QUIT430 thread_desc = 0;431 status = py_pjsua.thread_register("python worker", thread_desc)432 if status != 0:433 py_pjsua.perror(THIS_FILE, "Error registering thread", status)434 else:435 while C_QUIT == 0:436 py_pjsua.handle_events(50)437 print "Worker thread quitting.."438 C_QUIT = 2439# Start pjsua440#441def app_start():442 # Done with initialization, start pjsua!!443 #444 status = py_pjsua.start()445 if status != 0:446 err_exit("Error starting pjsua!", status)447 # Start worker thread448 thr = thread.start_new(worker_thread_main, (0,))449 450 print "PJSUA Started!!"451# Print account and buddy list452def print_acc_buddy_list():453 global g_acc_id454 455 acc_ids = py_pjsua.enum_accs()456 print "Account list:"457 for acc_id in acc_ids:458 acc_info = py_pjsua.acc_get_info(acc_id)459 if acc_info.has_registration == 0:460 acc_status = acc_info.status_text461 else:462 acc_status = `acc_info.status` + "/" + acc_info.status_text + " (expires=" + `acc_info.expires` + ")"463 if acc_id == g_acc_id:464 print " *",465 else:466 print " ",467 print "[" + `acc_id` + "] " + acc_info.acc_uri + ": " + acc_status468 print " Presence status: ",469 if acc_info.online_status != 0:470 print "Online"471 else:472 print "Invisible"473 if py_pjsua.get_buddy_count() > 0:474 print ""475 print "Buddy list:"476 buddy_ids = py_pjsua.enum_buddies()477 for buddy_id in buddy_ids:478 bi = py_pjsua.buddy_get_info(buddy_id)479 print " [" + `buddy_id` + "] " + bi.status_text + " " + bi.uri480 481 482# Print application menu483#484def print_menu():485 print ""486 print ">>>"487 print_acc_buddy_list()488 print """489+============================================================================+490| Call Commands : | Buddy, IM & Presence: | Account: |491| | | |492| m Make call | +b Add buddy | +a Add account |493| a Answer current call | -b Delete buddy | -a Delete accnt |494| h Hangup current call | | |495| H Hold call | i Send instant message | rr register |496| v re-inVite (release Hold) | s Subscribe presence | ru Unregister |497| # Send DTMF string | u Unsubscribe presence | |498| dq Dump curr. call quality | t ToGgle Online status | |499| +--------------------------+------------------+500| x Xfer call | Media Commands: | Status: |501| X Xfer with Replaces | | |502| | cl List ports | d Dump status |503| | cc Connect port | dd Dump detail |504| | cd Disconnect port | |505| | +p Add file player | |506|------------------------------+ +r Add file recorder | |507| q Quit application | | |508+============================================================================+"""509 print "You have " + `py_pjsua.call_get_count()` + " active call(s)"510 print ">>>", 511# Menu512#513def app_menu():514 global g_acc_id515 global g_current_call516 quit = 0517 while quit == 0:518 print_menu()519 choice = sys.stdin.readline()520 if choice[0] == "q":521 quit = 1522 elif choice[0] == "i":523 # Sending IM 524 print "Send IM to SIP URL: ",525 url = sys.stdin.readline()526 if url == "\n":527 continue528 # Send typing indication529 py_pjsua.im_typing(g_acc_id, url, 1, None) 530 print "The content: ",531 message = sys.stdin.readline()532 if message == "\n":533 py_pjsua.im_typing(g_acc_id, url, 0, None) 534 continue535 # Send the IM!536 py_pjsua.im_send(g_acc_id, url, None, message, None, 0)537 elif choice[0] == "m":538 # Make call 539 print "Using account ", g_acc_id540 print "Make call to SIP URL: ",541 url = sys.stdin.readline()542 url = url.replace("\n", "")543 if url == "":544 continue545 # Initiate the call!546 status, call_id = py_pjsua.call_make_call(g_acc_id, url, 0, 0, None)547 548 if status != 0:549 py_pjsua.perror(THIS_FILE, "Error making call", status)550 else:551 g_current_call = call_id552 elif choice[0] == "+" and choice[1] == "b":553 # Add new buddy554 bc = py_pjsua.Buddy_Config()555 print "Buddy URL: ",556 bc.uri = sys.stdin.readline()557 if bc.uri == "\n":558 continue559 560 bc.uri = bc.uri.replace("\n", "")561 bc.subscribe = 1562 status, buddy_id = py_pjsua.buddy_add(bc)563 if status != 0:564 py_pjsua.perror(THIS_FILE, "Error adding buddy", status)565 elif choice[0] == "-" and choice[1] == "b":566 print "Enter buddy ID to delete : "567 buf = sys.stdin.readline()568 buf = buf.replace("\n","")569 if buf == "":570 continue571 i = int(buf)572 if py_pjsua.buddy_is_valid(i) == 0:573 print "Invalid buddy id " + `i`574 else:575 py_pjsua.buddy_del(i)576 print "Buddy " + `i` + " deleted" 577 elif choice[0] == "+" and choice[1] == "a":578 # Add account579 add_account()580 elif choice[0] == "-" and choice[1] == "a":581 print "Enter account ID to delete : "582 buf = sys.stdin.readline()583 buf = buf.replace("\n","")584 if buf == "":585 continue586 i = int(buf)587 if py_pjsua.acc_is_valid(i) == 0:588 print "Invalid account id " + `i`589 else:590 py_pjsua.acc_del(i)591 print "Account " + `i` + " deleted"592 593 elif choice[0] == "+" and choice[1] == "p":594 add_player()595 elif choice[0] == "+" and choice[1] == "r":596 add_recorder()597 elif choice[0] == "c" and choice[1] == "l":598 conf_list()599 elif choice[0] == "c" and choice[1] == "c":600 connect_port()601 elif choice[0] == "c" and choice[1] == "d":602 disconnect_port()603 elif choice[0] == "d" and choice[1] == "q":604 dump_call_quality()605 elif choice[0] == "x":606 xfer_call()607 elif choice[0] == "X":608 xfer_call_replaces()609 elif choice[0] == "h":610 if g_current_call != py_pjsua.PJSUA_INVALID_ID:611 py_pjsua.call_hangup(g_current_call, 603, None, None)612 else:613 print "No current call"614 elif choice[0] == "H":615 if g_current_call != py_pjsua.PJSUA_INVALID_ID:616 py_pjsua.call_set_hold(g_current_call, None)617 618 else:619 print "No current call"620 elif choice[0] == "v":621 if g_current_call != py_pjsua.PJSUA_INVALID_ID:622 623 py_pjsua.call_reinvite(g_current_call, 1, None);624 else:625 print "No current call"626 elif choice[0] == "#":627 if g_current_call == py_pjsua.PJSUA_INVALID_ID: 628 print "No current call"629 elif py_pjsua.call_has_media(g_current_call) == 0:630 print "Media is not established yet!"631 else:632 call = g_current_call633 print "DTMF strings to send (0-9*#A-B)"634 buf = sys.stdin.readline()635 buf = buf.replace("\n", "")636 if buf == "":637 continue638 if call != g_current_call:639 print "Call has been disconnected"640 continue641 status = py_pjsua.call_dial_dtmf(g_current_call, buf)642 if status != 0:643 py_pjsua.perror(THIS_FILE, "Unable to send DTMF", status);644 else:645 print "DTMF digits enqueued for transmission"646 elif choice[0] == "s":647 print "Subscribe presence of (buddy id) : "648 buf = sys.stdin.readline()649 buf = buf.replace("\n","")650 if buf == "":651 continue652 i = int(buf)653 py_pjsua.buddy_subscribe_pres(i, 1)654 elif choice[0] == "u":655 print "Unsubscribe presence of (buddy id) : "656 buf = sys.stdin.readline()657 buf = buf.replace("\n","")658 if buf == "":659 continue660 i = int(buf)661 py_pjsua.buddy_subscribe_pres(i, 0)662 elif choice[0] == "t":663 acc_info = py_pjsua.acc_get_info(g_acc_id)664 if acc_info.online_status == 0:665 acc_info.online_status = 1666 else:667 acc_info.online_status = 0668 py_pjsua.acc_set_online_status(g_acc_id, acc_info.online_status)669 st = ""670 if acc_info.online_status == 0:671 st = "offline"672 else:673 st = "online"674 print "Setting " + acc_info.acc_uri + " online status to " + st675 elif choice[0] == "r":676 if choice[1] == "r": 677 py_pjsua.acc_set_registration(g_acc_id, 1)678 elif choice[1] == "u":679 py_pjsua.acc_set_registration(g_acc_id, 0)680 elif choice[0] == "d":681 py_pjsua.dump(choice[1] == "d")682 elif choice[0] == "a":683 if g_current_call != py_pjsua.PJSUA_INVALID_ID: 684 685 py_pjsua.call_answer(g_current_call, 200, None, None)686 else:687 print "No current call"688#689# main690#691app_init()692app_start()693app_menu()694#695# Done, quitting..696#697print "PJSUA shutting down.."698C_QUIT = 1699# Give the worker thread chance to quit itself700while C_QUIT != 2:701 py_pjsua.handle_events(50)702print "PJSUA destroying.."...
test_class.py
Source:test_class.py
...56# "getattr",57# "setattr",58# "delattr",59callLst = []60def trackCall(f):61 def track(*args, **kwargs):62 callLst.append((f.__name__, args))63 return f(*args, **kwargs)64 return track65class AllTests:66 trackCall = trackCall67 @trackCall68 def __coerce__(self, *args):69 return (self,) + args70 @trackCall71 def __hash__(self, *args):72 return hash(id(self))73 @trackCall74 def __str__(self, *args):...
car_telecom_utils.py
Source:car_telecom_utils.py
...38 """39 log.info("Dialing up droid {} call uri {}".format(40 ad.serial, uri))41 # First check that we are not in call.42 if ad.droid.telecomIsInCall():43 log.info("We're still in call {}".format(ad.serial))44 return False45 # Start tracking updates.46 ad.droid.telecomStartListeningForCallAdded()47 #If a phone number is passed in48 if "tel:" not in uri:49 uri = "tel:" + uri50 ad.droid.telecomCallTelUri(uri)51 event = None52 try:53 event = ad.ed.pop_event(54 tel_defines.EventTelecomCallAdded,55 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)56 except queue.Empty:57 log.info(58 "Did not get {} event!".format(tel_defines.EventTelecomCallAdded))59 # Return failure.60 return False61 finally:62 ad.droid.telecomStopListeningForCallAdded()63 call_id = event['data']['CallId']64 log.info("Call ID: {} dev {}".format(call_id, ad.serial))65 if not call_id:66 log.info("CallId is empty!")67 return False68 if not wait_for_dialing(log, ad):69 return False70 return call_id71def wait_for_call_state(log, ad, call_id, state):72 """Wait for the given call id to transition to the given call state.73 Args:74 log: log object75 ad: android device object76 call_id: ID of the call that we're waiting for the call state to77 transition into.78 state: desired final state.79 Returns:80 True if success, False if fail.81 """82 # Lets track the call now.83 # NOTE: Disable this everywhere we return.84 ad.droid.telecomCallStartListeningForEvent(85 call_id, tel_defines.EVENT_CALL_STATE_CHANGED)86 # We may have missed the update so do a quick check.87 if ad.droid.telecomCallGetCallState(call_id) == state:88 log.info("Call ID {} already in {} dev {}!".format(89 call_id, state, ad.serial))90 ad.droid.telecomCallStopListeningForEvent(call_id,91 tel_defines.EventTelecomCallStateChanged)92 return True93 # If not then we need to poll for the event.94 # We return if we have found a match or we timeout for further updates of95 # the call96 end_time = time.time() + 1097 while True and time.time() < end_time:98 try:99 event = ad.ed.pop_event(100 tel_defines.EventTelecomCallStateChanged,101 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)102 call_state = event['data']['Event']103 if call_state == state:104 ad.droid.telecomCallStopListeningForEvent(call_id,105 tel_defines.EventTelecomCallStateChanged)106 return True107 else:108 log.info("Droid {} in call {} state {}".format(109 ad.serial, call_id, call_state))110 continue111 except queue.Empty:112 log.info("Did not get into state {} dev {}".format(113 state, ad.serial))114 ad.droid.telecomCallStopListeningForEvent(call_id,115 tel_defines.EventTelecomCallStateChanged)116 return False117 return False118def hangup_call(log, ad, call_id):119 """Hangup a number120 Args:121 log: log object122 ad: android device object123 call_id: Call to hangup.124 Returns:125 True if success, False if fail.126 """127 log.info("Hanging up droid {} call {}".format(128 ad.serial, call_id))129 # First check that we are in call, otherwise fail.130 if not ad.droid.telecomIsInCall():131 log.info("We are not in-call {}".format(ad.serial))132 return False133 # Make sure we are registered with the events.134 ad.droid.telecomStartListeningForCallRemoved()135 # Disconnect call.136 ad.droid.telecomCallDisconnect(call_id)137 # Wait for removed event.138 event = None139 try:140 event = ad.ed.pop_event(141 tel_defines.EventTelecomCallRemoved,142 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)143 except queue.Empty:144 log.info("Did not get TelecomCallRemoved event")145 return False146 finally:147 ad.droid.telecomStopListeningForCallRemoved()148 log.info("Removed call {}".format(event))149 if event['data']['CallId'] != call_id:150 return False151 return True152def hangup_conf(log, ad, conf_id):153 """Hangup a conference call154 Args:155 log: log object156 ad: android device object157 conf_id: Conf call to hangup.158 Returns:159 True if success, False if fail.160 """161 log.info("hangup_conf: Hanging up droid {} call {}".format(162 ad.serial, conf_id))163 # First check that we are in call, otherwise fail.164 if not ad.droid.telecomIsInCall():165 log.info("We are not in-call {}".format(ad.serial))166 return False167 # Get the list of children for this conference.168 all_calls = get_call_id_children(log, ad, conf_id)169 # All calls that needs disconnecting (Parent + Children)170 all_calls.add(conf_id)171 # Make sure we are registered with the events.172 ad.droid.telecomStartListeningForCallRemoved()173 # Disconnect call.174 ad.droid.telecomCallDisconnect(conf_id)175 # Wait for removed event.176 while len(all_calls) > 0:177 event = None178 try:179 event = ad.ed.pop_event(180 tel_defines.EventTelecomCallRemoved,181 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)182 except queue.Empty:183 log.info("Did not get TelecomCallRemoved event")184 ad.droid.telecomStopListeningForCallRemoved()185 return False186 removed_call_id = event['data']['CallId']187 all_calls.remove(removed_call_id)188 log.info("Removed call {} left calls {}".format(removed_call_id, all_calls))189 ad.droid.telecomStopListeningForCallRemoved()190 return True191def accept_call(log, ad, call_id):192 """Accept a number193 Args:194 log: log object195 ad: android device object196 call_id: Call to accept.197 Returns:198 True if success, False if fail.199 """200 log.info("Accepting call at droid {} call {}".format(201 ad.serial, call_id))202 # First check we are in call, otherwise fail.203 if not ad.droid.telecomIsInCall():204 log.info("We are not in-call {}".format(ad.serial))205 return False206 # Accept the call and wait for the call to be accepted on AG.207 ad.droid.telecomCallAnswer(call_id, tel_defines.VT_STATE_AUDIO_ONLY)208 if not wait_for_call_state(log, ad, call_id, tel_defines.CALL_STATE_ACTIVE):209 log.error("Call {} on droid {} not active".format(210 call_id, ad.serial))211 return False212 return True213def wait_for_not_in_call(log, ad):214 """Wait for the droid to be OUT OF CALLING state.215 Args:216 log: log object217 ad: android device object218 Returns:219 True if success, False if fail.220 """221 ad.droid.telecomStartListeningForCallRemoved()222 calls = ad.droid.telecomCallGetCallIds()223 if len(calls) > 1:224 log.info("More than one call {} {}".format(calls, ad.serial))225 return False226 if len(calls) > 0:227 log.info("Got calls {} for {}".format(228 calls, ad.serial))229 try:230 event = ad.ed.pop_event(231 tel_defines.EventTelecomCallRemoved,232 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)233 except queue.Empty:234 log.info("wait_for_not_in_call Did not get {} droid {}".format(235 tel_defines.EventTelecomCallRemoved,236 ad.serial))237 return False238 finally:239 ad.droid.telecomStopListeningForCallRemoved()240 # Either we removed the only call or we never had a call previously, either241 # ways simply check if we are in in call now.242 return (not ad.droid.telecomIsInCall())243def wait_for_dialing(log, ad):244 """Wait for the droid to be in dialing state.245 Args:246 log: log object247 ad: android device object248 Returns:249 True if success, False if fail.250 """251 # Start listening for events before anything else happens.252 ad.droid.telecomStartListeningForCallAdded()253 # First check if we re in call, then simply return.254 if ad.droid.telecomIsInCall():255 ad.droid.telecomStopListeningForCallAdded()256 return True257 call_id = None258 # Now check if we already have calls matching the state.259 calls_in_state = get_calls_in_states(log, ad,260 [tel_defines.CALL_STATE_CONNECTING,261 tel_defines.CALL_STATE_DIALING])262 # If not then we need to poll for the calls themselves.263 if len(calls_in_state) == 0:264 event = None265 try:266 event = ad.ed.pop_event(267 tel_defines.EventTelecomCallAdded,268 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)269 except queue.Empty:270 log.info("Did not get {}".format(271 tel_defines.EventTelecomCallAdded))272 return False273 finally:274 ad.droid.telecomStopListeningForCallAdded()275 call_id = event['data']['CallId']276 else:277 call_id = calls_in_state[0]278 # We may still not be in-call if the call setup is going on.279 # We wait for the call state to move to dialing.280 log.info("call id {} droid {}".format(call_id, ad.serial))281 if not wait_for_call_state(282 log, ad, call_id, tel_defines.CALL_STATE_DIALING):283 return False284 # Finally check the call state.285 return ad.droid.telecomIsInCall()286def wait_for_ringing(log, ad):287 """Wait for the droid to be in ringing state.288 Args:289 log: log object290 ad: android device object291 Returns:292 True if success, False if fail.293 """294 log.info("waiting for ringing {}".format(ad.serial))295 # Start listening for events before anything else happens.296 ad.droid.telecomStartListeningForCallAdded()297 # First check if we re in call, then simply return.298 if ad.droid.telecomIsInCall():299 log.info("Device already in call {}".format(ad.serial))300 ad.droid.telecomStopListeningForCallAdded()301 return True302 call_id = None303 # Now check if we already have calls matching the state.304 calls_in_state = ad.droid.telecomCallGetCallIds()305 for c_id in calls_in_state:306 if ad.droid.telecomCallGetCallState(c_id) == tel_defines.CALL_STATE_RINGING:307 return True308 event = None309 call_id = None310 try:311 event = ad.ed.pop_event(312 tel_defines.EventTelecomCallAdded,...
test_call.py
Source:test_call.py
1from mako.template import Template2from mako import util3from util import result_lines, flatten_result4from test import TemplateTest, eq_5class CallTest(TemplateTest):6 def test_call(self):7 t = Template("""8 <%def name="foo()">9 hi im foo ${caller.body(y=5)}10 </%def>11 12 <%call expr="foo()" args="y, **kwargs">13 this is the body, y is ${y}14 </%call>15""")16 assert result_lines(t.render()) == ['hi im foo', 'this is the body, y is 5']17 def test_compound_call(self):18 t = Template("""19 <%def name="bar()">20 this is bar21 </%def>22 23 <%def name="comp1()">24 this comp1 should not be called25 </%def>26 27 <%def name="foo()">28 foo calling comp1: ${caller.comp1(x=5)}29 foo calling body: ${caller.body()}30 </%def>31 32 <%call expr="foo()">33 <%def name="comp1(x)">34 this is comp1, ${x}35 </%def>36 this is the body, ${comp1(6)}37 </%call>38 ${bar()}39""")40 assert result_lines(t.render()) == ['foo calling comp1:', 'this is comp1, 5', 'foo calling body:', 'this is the body,', 'this is comp1, 6', 'this is bar']41 def test_new_syntax(self):42 """test foo:bar syntax, including multiline args and expression eval."""43 44 # note the trailing whitespace in the bottom ${} expr, need to strip45 # that off < python 2.746 47 t = Template("""48 <%def name="foo(x, y, q, z)">49 ${x}50 ${y}51 ${q}52 ${",".join("%s->%s" % (a, b) for a, b in z)}53 </%def>54 55 <%self:foo x="this is x" y="${'some ' + 'y'}" q="56 this57 is58 q"59 60 z="${[61 (1, 2),62 (3, 4),63 (5, 6)64 ]65 66 }"/>67 """)68 69 eq_(70 result_lines(t.render()),71 ['this is x', 'some y', 'this', 'is', 'q', '1->2,3->4,5->6']72 )73 74 def test_ccall_caller(self):75 t = Template("""76 <%def name="outer_func()">77 OUTER BEGIN78 <%call expr="caller.inner_func()">79 INNER CALL80 </%call>81 OUTER END82 </%def>83 <%call expr="outer_func()">84 <%def name="inner_func()">85 INNER BEGIN86 ${caller.body()}87 INNER END88 </%def>89 </%call>90 """)91 #print t.code92 assert result_lines(t.render()) == [93 "OUTER BEGIN",94 "INNER BEGIN",95 "INNER CALL",96 "INNER END",97 "OUTER END",98 ]99 100 def test_stack_pop(self):101 t = Template("""102 <%def name="links()" buffered="True">103 Some links104 </%def>105 <%def name="wrapper(links)">106 <h1>${caller.body()}</h1>107 ${links}108 </%def>109 ## links() pushes a stack frame on. when complete,110 ## 'nextcaller' must be restored111 <%call expr="wrapper(links())">112 Some title113 </%call>114 """)115 assert result_lines(t.render()) == [116 "<h1>",117 "Some title",118 "</h1>",119 "Some links"120 ]121 122 def test_conditional_call(self):123 """test that 'caller' is non-None only if the immediate <%def> was called via <%call>"""124 t = Template("""125 <%def name="a()">126 % if caller:127 ${ caller.body() } \\128 % endif129 AAA130 ${ b() }131 </%def>132 <%def name="b()">133 % if caller:134 ${ caller.body() } \\135 % endif136 BBB137 ${ c() }138 </%def>139 <%def name="c()">140 % if caller:141 ${ caller.body() } \\142 % endif143 CCC144 </%def>145 <%call expr="a()">146 CALL147 </%call>148 """)149 assert result_lines(t.render()) == [150 "CALL",151 "AAA",152 "BBB",153 "CCC"154 ]155 156 def test_chained_call(self):157 """test %calls that are chained through their targets"""158 t = Template("""159 <%def name="a()">160 this is a. 161 <%call expr="b()">162 this is a's ccall. heres my body: ${caller.body()}163 </%call>164 </%def>165 <%def name="b()">166 this is b. heres my body: ${caller.body()}167 whats in the body's caller's body ?168 ${context.caller_stack[-2].body()}169 </%def>170 171 <%call expr="a()">172 heres the main templ call173 </%call>174 175""")176 assert result_lines(t.render()) == [177 'this is a.',178 'this is b. heres my body:',179 "this is a's ccall. heres my body:",180 'heres the main templ call',181 "whats in the body's caller's body ?",182 'heres the main templ call'183 ]184 def test_nested_call(self):185 """test %calls that are nested inside each other"""186 t = Template("""187 <%def name="foo()">188 ${caller.body(x=10)}189 </%def>190 x is ${x}191 <%def name="bar()">192 bar: ${caller.body()}193 </%def>194 <%call expr="foo()" args="x">195 this is foo body: ${x}196 <%call expr="bar()">197 this is bar body: ${x}198 </%call>199 </%call>200""")201 assert result_lines(t.render(x=5)) == [202 "x is 5",203 "this is foo body: 10",204 "bar:",205 "this is bar body: 10"206 ]207 208 def test_nested_call_2(self):209 t = Template("""210 x is ${x}211 <%def name="foo()">212 ${caller.foosub(x=10)}213 </%def>214 <%def name="bar()">215 bar: ${caller.barsub()}216 </%def>217 <%call expr="foo()">218 <%def name="foosub(x)">219 this is foo body: ${x}220 221 <%call expr="bar()">222 <%def name="barsub()">223 this is bar body: ${x}224 </%def>225 </%call>226 227 </%def>228 </%call>229""")230 assert result_lines(t.render(x=5)) == [231 "x is 5",232 "this is foo body: 10",233 "bar:",234 "this is bar body: 10"235 ]236 def test_nested_call_3(self):237 template = Template('''\238 <%def name="A()">239 ${caller.body()}240 </%def>241 <%def name="B()">242 ${caller.foo()}243 </%def>244 <%call expr="A()">245 <%call expr="B()">246 <%def name="foo()">247 foo248 </%def>249 </%call>250 </%call>251 ''')252 assert flatten_result(template.render()) == "foo"253 254 def test_chained_call_in_nested(self):255 t = Template("""256 <%def name="embedded()">257 <%def name="a()">258 this is a. 259 <%call expr="b()">260 this is a's ccall. heres my body: ${caller.body()}261 </%call>262 </%def>263 <%def name="b()">264 this is b. heres my body: ${caller.body()}265 whats in the body's caller's body ? ${context.caller_stack[-2].body()}266 </%def>267 <%call expr="a()">268 heres the main templ call269 </%call>270 </%def>271 ${embedded()}272""")273 #print t.code274 #print result_lines(t.render())275 assert result_lines(t.render()) == [276 'this is a.',277 'this is b. heres my body:',278 "this is a's ccall. heres my body:",279 'heres the main templ call',280 "whats in the body's caller's body ?",281 'heres the main templ call'282 ]283 284 def test_call_in_nested(self):285 t = Template("""286 <%def name="a()">287 this is a ${b()}288 <%def name="b()">289 this is b290 <%call expr="c()">291 this is the body in b's call292 </%call>293 </%def>294 <%def name="c()">295 this is c: ${caller.body()}296 </%def>297 </%def>298 ${a()}299""")300 assert result_lines(t.render()) == ['this is a', 'this is b', 'this is c:', "this is the body in b's call"]301 def test_regular_defs(self):302 t = Template("""303 <%!304 @runtime.supports_caller305 def a(context):306 context.write("this is a")307 if context['caller']:308 context['caller'].body()309 context.write("a is done")310 return ''311 %>312 313 <%def name="b()">314 this is b315 our body: ${caller.body()}316 ${a(context)}317 </%def>318 test 1319 <%call expr="a(context)">320 this is the body321 </%call>322 test 2323 <%call expr="b()">324 this is the body325 </%call>326 test 3327 <%call expr="b()">328 this is the body329 <%call expr="b()">330 this is the nested body331 </%call>332 </%call>333 """) 334 #print t.code335 assert result_lines(t.render()) == [336 "test 1",337 "this is a",338 "this is the body",339 "a is done",340 "test 2",341 "this is b",342 "our body:",343 "this is the body",344 "this is aa is done",345 "test 3",346 "this is b",347 "our body:",348 "this is the body",349 "this is b",350 "our body:",351 "this is the nested body",352 "this is aa is done",353 "this is aa is done"354 ]355 356 def test_call_in_nested_2(self):357 t = Template("""358 <%def name="a()">359 <%def name="d()">360 not this d361 </%def>362 this is a ${b()}363 <%def name="b()">364 <%def name="d()">365 not this d either366 </%def>367 this is b368 <%call expr="c()">369 <%def name="d()">370 this is d371 </%def>372 this is the body in b's call373 </%call>374 </%def>375 <%def name="c()">376 this is c: ${caller.body()}377 the embedded "d" is: ${caller.d()}378 </%def>379 </%def>380 ${a()}381""")382 assert result_lines(t.render()) == ['this is a', 'this is b', 'this is c:', "this is the body in b's call", 'the embedded "d" is:', 'this is d']383class SelfCacheTest(TemplateTest):384 """this test uses a now non-public API."""385 386 def test_basic(self):387 t = Template("""388 <%!389 cached = None390 %>391 <%def name="foo()">392 <% 393 global cached394 if cached:395 return "cached: " + cached396 __M_writer = context._push_writer()397 %>398 this is foo399 <%400 buf, __M_writer = context._pop_buffer_and_writer()401 cached = buf.getvalue()402 return cached403 %>404 </%def>405 406 ${foo()}407 ${foo()}408""")409 assert result_lines(t.render()) == [410 "this is foo",411 "cached:",412 "this is foo"413 ]...
views.py
Source:views.py
...69 current_call.save()70 botManager.SendDriver(current_call.chat_id)71 return HttpResponseRedirect(reverse("taxibot:NeedCarList"))72# оÑпÑавлÑÐµÑ ÐºÐ»Ð¸ÐµÐ½ÑÑ Ð´Ð°Ð½Ð½Ñе о пÑибÑÑие маÑÐ¸Ð½Ñ Ð¸ ÑбÑаÑÑÐ²Ð°ÐµÑ Ð´Ð°Ð½Ð½Ñе о маÑине73def EndCall(request):74 callId =None75 try:76 callId = request.POST["calls[]"]77 except:78 print("there are no selected calls")79 if (callId):80 current_call = TaxiCall.objects.get(call_id=callId)81 current_call.status = "arrived"82 chat_id = current_call.chat_id83 botManager.SendArrived(chat_id=chat_id)84 current_call.chat_id = 085 current_call.save()86 return HttpResponseRedirect(reverse("taxibot:accepted"))87class DBCalls(generic.ListView): # History of cars...
mock_calls.py
Source:mock_calls.py
...143 of calls specified. Missing calls, extra calls, and calls with144 mismatching arguments, all cause the assertion to fail.145 """146 return self._AssertCalls(self, calls, self._watched)147 def assertCall(self, call, action=None):...
call.py
Source:call.py
1# $Id: call.py 2171 2008-07-24 09:01:33Z bennylp $2#3# SIP call sample.4#5# Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>6#7# This program is free software; you can redistribute it and/or modify8# it under the terms of the GNU General Public License as published by9# the Free Software Foundation; either version 2 of the License, or10# (at your option) any later version.11#12# This program is distributed in the hope that it will be useful,13# but WITHOUT ANY WARRANTY; without even the implied warranty of14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the15# GNU General Public License for more details.16#17# You should have received a copy of the GNU General Public License18# along with this program; if not, write to the Free Software19# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 20#21import sys22import pjsua as pj23LOG_LEVEL=324current_call = None25# Logging callback26def log_cb(level, str, len):27 print str,28# Callback to receive events from account29class MyAccountCallback(pj.AccountCallback):30 def __init__(self, account=None):31 pj.AccountCallback.__init__(self, account)32 # Notification on incoming call33 def on_incoming_call(self, call):34 global current_call 35 if current_call:36 call.answer(486, "Busy")37 return38 39 print "Incoming call from ", call.info().remote_uri40 print "Press 'a' to answer"41 current_call = call42 call_cb = MyCallCallback(current_call)43 current_call.set_callback(call_cb)44 current_call.answer(180)45 46# Callback to receive events from Call47class MyCallCallback(pj.CallCallback):48 def __init__(self, call=None):49 pj.CallCallback.__init__(self, call)50 # Notification when call state has changed51 def on_state(self):52 global current_call53 print "Call with", self.call.info().remote_uri,54 print "is", self.call.info().state_text,55 print "last code =", self.call.info().last_code, 56 print "(" + self.call.info().last_reason + ")"57 58 if self.call.info().state == pj.CallState.DISCONNECTED:59 current_call = None60 print 'Current call is', current_call61 # Notification when call's media state has changed.62 def on_media_state(self):63 if self.call.info().media_state == pj.MediaState.ACTIVE:64 # Connect the call to sound device65 call_slot = self.call.info().conf_slot66 pj.Lib.instance().conf_connect(call_slot, 0)67 pj.Lib.instance().conf_connect(0, call_slot)68 print "Media is now active"69 else:70 print "Media is inactive"71# Function to make call72def make_call(uri):73 try:74 print "Making call to", uri75 return acc.make_call(uri, cb=MyCallCallback())76 except pj.Error, e:77 print "Exception: " + str(e)78 return None79 80# Create library instance81lib = pj.Lib()82try:83 # Init library with default config and some customized84 # logging config.85 lib.init(log_cfg = pj.LogConfig(level=LOG_LEVEL, callback=log_cb))86 # Create UDP transport which listens to any available port87 transport = lib.create_transport(pj.TransportType.UDP, 88 pj.TransportConfig(0))89 print "\nListening on", transport.info().host, 90 print "port", transport.info().port, "\n"91 92 # Start the library93 lib.start()94 # Create local account95 acc = lib.create_account_for_transport(transport, cb=MyAccountCallback())96 # If argument is specified then make call to the URI97 if len(sys.argv) > 1:98 lck = lib.auto_lock()99 current_call = make_call(sys.argv[1])100 print 'Current call is', current_call101 del lck102 my_sip_uri = "sip:" + transport.info().host + \103 ":" + str(transport.info().port)104 # Menu loop105 while True:106 print "My SIP URI is", my_sip_uri107 print "Menu: m=make call, h=hangup call, a=answer call, q=quit"108 input = sys.stdin.readline().rstrip("\r\n")109 if input == "m":110 if current_call:111 print "Already have another call"112 continue113 print "Enter destination URI to call: ", 114 input = sys.stdin.readline().rstrip("\r\n")115 if input == "":116 continue117 lck = lib.auto_lock()118 current_call = make_call(input)119 del lck120 elif input == "h":121 if not current_call:122 print "There is no call"123 continue124 current_call.hangup()125 elif input == "a":126 if not current_call:127 print "There is no call"128 continue129 current_call.answer(200)130 elif input == "q":131 break132 # Shutdown the library133 transport = None134 acc.delete()135 acc = None136 lib.destroy()137 lib = None138except pj.Error, e:139 print "Exception: " + str(e)140 lib.destroy()...
Using AI Code Generation
1var wpt = require('webpagetest');2var wptClient = wpt('www.webpagetest.org');3 if (err) return console.log(err);4 console.log('Test status:', data.statusCode);5 console.log('Test ID:', data.data.testId);6 console.log('Test URL:', data.data.userUrl);7 wptClient.getTestResults(data.data.testId, function(err, data) {8 if (err) return console.log(err);9 console.log('Test results:', data);10 });11});12var wpt = require('webpagetest');13var wptClient = wpt('www.webpagetest.org');14 if (err) return console.log(err);15 console.log('Test status:', data.statusCode);16 console.log('Test ID:', data.data.testId);17 console.log('Test URL:', data.data.userUrl);18 wptClient.getTestResults(data.data.testId, function(err, data) {19 if (err) return console.log(err);20 console.log('Test results:', data);21 });22});23var wpt = require('webpagetest');24var wptClient = wpt('www.webpagetest.org');25 if (err) return console.log(err);26 console.log('Test status:', data.statusCode);27 console.log('Test ID:', data.data.testId);28 console.log('Test URL:', data.data.userUrl);29 wptClient.getTestResults(data.data.testId, function(err, data) {30 if (err) return console.log(err);31 console.log('Test results:', data);32 });33});
Using AI Code Generation
1 if(err){2 console.log('error', err);3 }else{4 console.log('data', data);5 }6});7 if(err){8 console.log('error', err);9 }else{10 console.log('data', data);11 }12});13 if(err){14 console.log('error', err);15 }else{16 console.log('data', data);17 }18});19wpt.getLocations(function(err, data) {20 if(err){21 console.log('error', err);22 }else{23 console.log('data', data);24 }25});26wpt.getTesters(function(err, data) {27 if(err){28 console.log('error', err);29 }else{30 console.log('data', data);31 }32});33wpt.getStatus(function(err, data) {34 if(err){35 console.log('error', err);36 }else{37 console.log('data', data);38 }39});40wpt.getResults('140507_3B_8a1c', function(err, data) {41 if(err){42 console.log('error', err);43 }else{44 console.log('data', data);45 }46});47wpt.getPageSpeedResults('140507_3B_8a1c', function(err, data) {48 if(err){49 console.log('error', err);50 }else{51 console.log('data', data);52 }53});54wpt.getHarResults('140507_3B_8a1c', function(err, data) {55 if(err){56 console.log('error', err);
Using AI Code Generation
1var wptools = require('wptools');2var page = wptools.page('Albert Einstein');3page.get(function(err, resp) {4 console.log(resp);5});6var wptools = require('wptools');7var page = wptools.page('Albert Einstein');8page.get(function(err, resp) {9 console.log(resp);10});11var wptools = require('wptools');12var page = wptools.page('Albert Einstein');13page.get(function(err, resp) {14 console.log(resp);15});16var wptools = require('wptools');17var page = wptools.page('Albert Einstein');18page.get(function(err, resp) {19 console.log(resp);20});21var wptools = require('wptools');22var page = wptools.page('Albert Einstein');23page.get(function(err, resp) {24 console.log(resp);25});26var wptools = require('wptools');27var page = wptools.page('Albert Einstein');28page.get(function(err, resp) {29 console.log(resp);30});31var wptools = require('wptools');32var page = wptools.page('Albert Einstein');33page.get(function(err, resp) {34 console.log(resp);35});36var wptools = require('wptools');37var page = wptools.page('Albert Einstein');38page.get(function(err, resp) {39 console.log(resp);40});41var wptools = require('wptools');42var page = wptools.page('Albert Einstein');43page.get(function(err, resp) {44 console.log(resp);45});46var wptools = require('wptools');47var page = wptools.page('Albert
Using AI Code Generation
1var wptoolkit = require('wptoolkit');2 if (!error && response.statusCode == 200) {3 console.log(body);4 }5});6 if (!error && response.statusCode == 200) {7 console.log(body);8 }9});10 if (!error && response.statusCode == 200) {11 console.log(body);12 }13});14 if (!error && response.statusCode == 200) {15 console.log(body);16 }17});18 if (!error && response.statusCode == 200) {19 console.log(body);20 }21});22 if (!error && response.statusCode == 200) {23 console.log(body);24 }25});26 if (!error && response.statusCode == 200) {27 console.log(body);28 }29});30 if (!error && response.statusCode == 200) {31 console.log(body);32 }33});
Using AI Code Generation
1var wpt = require('wpt-api');2var wpt = new wpt(process.env.WPT_API_KEY, process.env.WPT_API_SECRET);3wpt.call('runtest', {url: url}, function(err, data) {4 if(err) {5 console.log(err);6 } else {7 console.log(data);8 }9});
Using AI Code Generation
1var wpt = require('wptoolkit');2var wp = new wpt();3wp.call('getCategoryList', function(data) {4console.log(data);5});6include 'wptoolkit.php';7$wpt = new wptoolkit();8$wpt->call('getCategoryList', function($data) {9echo $data;10});11var wpt = require('wptoolkit');12var wp = new wpt();13wp.call('getCategoryList', function(data) {14console.log(data);15});16include 'wptoolkit.php';17$wpt = new wptoolkit();18$wpt->call('getCategoryList', function($data) {19echo $data;20});
Using AI Code Generation
1var wpToolkit = require('wptoolkit');2var wp = new wpToolkit();3wp.call('posts.getPosts', {post_type: 'post', number: 10}, function(err, data){4if(err) console.log(err);5else console.log(data);6});7var wpToolkit = require('wptoolkit');8var wp = new wpToolkit();9wp.call('posts.getPosts', {post_type: 'post', number: 10}, function(err, data){10if(err) console.log(err);11else console.log(data);12});13var wpToolkit = require('wptoolkit');14var wp = new wpToolkit();15wp.call('posts.getPosts', {post_type: 'post', number: 10}, function(err, data){16if(err) console.log(err);17else console.log(data);18});19function getPosts() {20var wpToolkit = require('wptoolkit');21var wp = new wpToolkit();22wp.call('posts.getPosts', {post_type: 'post', number: 10}, function(err, data){23if(err) console.log(err);24else console.log(data);25});26}27getPosts();28getPosts();29getPosts();30at createError (/home/techadmin/Projects/wordpress/node_modules/axios/lib/core/createError.js:16:15)31at settle (/home/techadmin/Projects/wordpress/node_modules/axios/lib/core/settle.js:18:12)32at IncomingMessage.handleStreamEnd (/home/techadmin/Projects/wordpress/node_modules/axios/lib/adapters/http.js:201:11)33at IncomingMessage.emit (events.js:189:13)34at endReadableNT (_stream_readable.js:1103:12)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!