Best Python code snippet using playwright-python
wsgi_server_test.py
Source:wsgi_server_test.py
1#!/usr/bin/env python2#3# Copyright 2007 Google Inc.4#5# Licensed under the Apache License, Version 2.0 (the "License");6# you may not use this file except in compliance with the License.7# You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16#17"""Tests for google.appengine.tools.devappserver2.wsgi_server."""18import errno19import json20import os21import select22import socket23import sys24import time25import unittest26import urllib227import google28import mox29from cherrypy import wsgiserver30from google.appengine.tools.devappserver2 import wsgi_server31class TestError(Exception):32 pass33class _SingleAddressWsgiServerTest(unittest.TestCase):34 def setUp(self):35 super(_SingleAddressWsgiServerTest, self).setUp()36 self.server = wsgi_server._SingleAddressWsgiServer(('localhost', 0),37 self.wsgi_application)38 self.server.start()39 def tearDown(self):40 super(_SingleAddressWsgiServerTest, self).tearDown()41 self.server.quit()42 def test_serve(self):43 result = urllib2.urlopen('http://localhost:%d/foo?bar=baz' %44 self.server.port)45 body = result.read()46 environ = json.loads(body)47 self.assertEqual(200, result.code)48 self.assertEqual('/foo', environ['PATH_INFO'])49 self.assertEqual('bar=baz', environ['QUERY_STRING'])50 def wsgi_application(self, environ, start_response):51 start_response('200 OK', [('Content-Type', 'application/json')])52 serializable_environ = environ.copy()53 del serializable_environ['wsgi.input']54 del serializable_environ['wsgi.errors']55 return [json.dumps(serializable_environ)]56 def other_wsgi_application(self, environ, start_response):57 start_response('200 OK', [('Content-Type', 'text/plain')])58 return ['Hello World']59 def test_set_app(self):60 self.server.set_app(self.other_wsgi_application)61 result = urllib2.urlopen('http://localhost:%d/foo?bar=baz' %62 self.server.port)63 body = result.read()64 self.assertEqual(200, result.code)65 self.assertEqual('Hello World', body)66 def test_set_error(self):67 self.server.set_error(204)68 result = urllib2.urlopen('http://localhost:%d/foo?bar=baz' %69 self.server.port)70 self.assertEqual(204, result.code)71class SharedCherryPyThreadPoolTest(unittest.TestCase):72 def setUp(self):73 self.mox = mox.Mox()74 self.mox.StubOutWithMock(wsgi_server._THREAD_POOL, 'submit')75 self.thread_pool = wsgi_server._SharedCherryPyThreadPool()76 def tearDown(self):77 self.mox.UnsetStubs()78 def test_put(self):79 connection = object()80 wsgi_server._THREAD_POOL.submit(self.thread_pool._handle, connection)81 self.mox.ReplayAll()82 self.thread_pool.put(connection)83 self.mox.VerifyAll()84 self.assertEqual(set([connection]), self.thread_pool._connections)85 def test_handle(self):86 connection = self.mox.CreateMock(wsgiserver.HTTPConnection)87 self.mox.StubOutWithMock(self.thread_pool._condition, 'notify')88 self.thread_pool._connections.add(connection)89 connection.communicate()90 connection.close()91 self.thread_pool._condition.notify()92 self.mox.ReplayAll()93 self.thread_pool._handle(connection)94 self.mox.VerifyAll()95 self.assertEqual(set(), self.thread_pool._connections)96 def test_handle_with_exception(self):97 connection = self.mox.CreateMock(wsgiserver.HTTPConnection)98 self.mox.StubOutWithMock(self.thread_pool._condition, 'notify')99 self.thread_pool._connections.add(connection)100 connection.communicate().AndRaise(TestError)101 connection.close()102 self.thread_pool._condition.notify()103 self.mox.ReplayAll()104 self.assertRaises(TestError, self.thread_pool._handle, connection)105 self.mox.VerifyAll()106 self.assertEqual(set(), self.thread_pool._connections)107 def test_stop(self):108 self.mox.ReplayAll()109 self.thread_pool.stop(3)110 self.mox.VerifyAll()111 def test_stop_no_connections(self):112 self.mox.ReplayAll()113 self.thread_pool.stop(0.1)114 self.mox.VerifyAll()115 def test_stop_with_connections(self):116 connection = self.mox.CreateMock(wsgiserver.HTTPConnection)117 self.thread_pool._connections.add(connection)118 self.mox.StubOutWithMock(self.thread_pool, '_shutdown_connection')119 self.thread_pool._shutdown_connection(connection)120 self.mox.ReplayAll()121 self.thread_pool.stop(1)122 self.mox.VerifyAll()123 def test_shutdown_connection(self):124 class DummyObect(object):125 pass126 connection = DummyObect()127 connection.rfile = DummyObect()128 connection.rfile.closed = False129 connection.socket = self.mox.CreateMockAnything()130 connection.socket.shutdown(socket.SHUT_RD)131 self.mox.ReplayAll()132 self.thread_pool._shutdown_connection(connection)133 self.mox.VerifyAll()134 def test_shutdown_connection_rfile_already_close(self):135 class DummyObect(object):136 pass137 connection = DummyObect()138 connection.rfile = DummyObect()139 connection.rfile.closed = True140 connection.socket = self.mox.CreateMockAnything()141 self.mox.ReplayAll()142 self.thread_pool._shutdown_connection(connection)143 self.mox.VerifyAll()144class SelectThreadTest(unittest.TestCase):145 class _MockSocket(object):146 def fileno(self):147 return id(self)148 def setUp(self):149 self.select_thread = wsgi_server.SelectThread()150 self.original_has_poll = wsgi_server._HAS_POLL151 self.mox = mox.Mox()152 self.mox.StubOutWithMock(select, 'select')153 if hasattr(select, 'poll'):154 self.mox.StubOutWithMock(select, 'poll')155 self.mox.StubOutWithMock(time, 'sleep')156 def tearDown(self):157 self.mox.UnsetStubs()158 wsgi_server._HAS_POLL = self.original_has_poll159 def test_add_socket(self):160 file_descriptors = self.select_thread._file_descriptors161 file_descriptor_to_callback = (162 self.select_thread._file_descriptor_to_callback)163 file_descriptors_copy = frozenset(self.select_thread._file_descriptors)164 file_descriptor_to_callback_copy = (165 self.select_thread._file_descriptor_to_callback.copy())166 s = self._MockSocket()167 callback = object()168 self.select_thread.add_socket(s, callback)169 self.assertEqual(file_descriptors_copy, file_descriptors)170 self.assertEqual(file_descriptor_to_callback_copy,171 file_descriptor_to_callback)172 self.assertEqual(frozenset([s.fileno()]),173 self.select_thread._file_descriptors)174 self.assertEqual({s.fileno(): callback},175 self.select_thread._file_descriptor_to_callback)176 def test_remove_socket(self):177 s1 = self._MockSocket()178 callback1 = object()179 s2 = self._MockSocket()180 callback2 = object()181 self.select_thread._file_descriptors = frozenset([s1.fileno(), s2.fileno()])182 self.select_thread._file_descriptor_to_callback = {183 s1.fileno(): callback1, s2.fileno(): callback2}184 file_descriptors = self.select_thread._file_descriptors185 file_descriptor_to_callback = (186 self.select_thread._file_descriptor_to_callback)187 file_descriptors_copy = frozenset(self.select_thread._file_descriptors)188 file_descriptor_to_callback_copy = (189 self.select_thread._file_descriptor_to_callback.copy())190 self.select_thread.remove_socket(s1)191 self.assertEqual(file_descriptors_copy, file_descriptors)192 self.assertEqual(file_descriptor_to_callback_copy,193 file_descriptor_to_callback)194 self.assertEqual(frozenset([s2.fileno()]),195 self.select_thread._file_descriptors)196 self.assertEqual({s2.fileno(): callback2},197 self.select_thread._file_descriptor_to_callback)198 def test_select_no_sockets(self):199 time.sleep(1)200 self.mox.ReplayAll()201 self.select_thread._select()202 self.mox.VerifyAll()203 def test_select_no_poll(self):204 wsgi_server._HAS_POLL = False205 s = self._MockSocket()206 callback = self.mox.CreateMockAnything()207 select.select(frozenset([s.fileno()]), [], [], 1).AndReturn(208 ([s.fileno()], [], []))209 callback()210 self.mox.ReplayAll()211 self.select_thread.add_socket(s, callback)212 self.select_thread._select()213 self.mox.VerifyAll()214 @unittest.skipUnless(wsgi_server._HAS_POLL, 'requires select.poll')215 def test_select_with_poll(self):216 s = self._MockSocket()217 callback = self.mox.CreateMockAnything()218 poll = self.mox.CreateMockAnything()219 select.poll().AndReturn(poll)220 poll.register(s.fileno(), select.POLLIN)221 poll.poll(1000).AndReturn([(s.fileno(), select.POLLIN)])222 callback()223 self.mox.ReplayAll()224 self.select_thread.add_socket(s, callback)225 self.select_thread._select()226 self.mox.VerifyAll()227 def test_select_not_ready_no_poll(self):228 wsgi_server._HAS_POLL = False229 s = self._MockSocket()230 callback = self.mox.CreateMockAnything()231 select.select(frozenset([s.fileno()]), [], [], 1).AndReturn(([], [], []))232 self.mox.ReplayAll()233 self.select_thread.add_socket(s, callback)234 self.select_thread._select()235 self.mox.VerifyAll()236 @unittest.skipUnless(wsgi_server._HAS_POLL, 'requires select.poll')237 def test_select_not_ready_with_poll(self):238 s = self._MockSocket()239 callback = self.mox.CreateMockAnything()240 poll = self.mox.CreateMockAnything()241 select.poll().AndReturn(poll)242 poll.register(s.fileno(), select.POLLIN)243 poll.poll(1000).AndReturn([])244 self.mox.ReplayAll()245 self.select_thread.add_socket(s, callback)246 self.select_thread._select()247 self.mox.VerifyAll()248class WsgiServerStartupTest(unittest.TestCase):249 def setUp(self):250 self.mox = mox.Mox()251 self.server = wsgi_server.WsgiServer(('localhost', 123), None)252 def tearDown(self):253 self.mox.UnsetStubs()254 def test_start_some_fail_to_bind(self):255 failing_server = self.mox.CreateMock(256 wsgi_server._SingleAddressWsgiServer)257 starting_server = self.mox.CreateMock(258 wsgi_server._SingleAddressWsgiServer)259 another_starting_server = self.mox.CreateMock(260 wsgi_server._SingleAddressWsgiServer)261 self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')262 self.mox.StubOutWithMock(socket, 'getaddrinfo')263 socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,264 0, socket.AI_PASSIVE).AndReturn(265 [(None, None, None, None, ('foo', 'bar', 'baz')),266 (None, None, None, None, (1, 2, 3, 4, 5)),267 (None, None, None, None, (3, 4))])268 wsgi_server._SingleAddressWsgiServer(('foo', 'bar'), None).AndReturn(269 failing_server)270 wsgi_server._SingleAddressWsgiServer((1, 2), None).AndReturn(271 starting_server)272 wsgi_server._SingleAddressWsgiServer((3, 4), None).AndReturn(273 another_starting_server)274 starting_server.start()275 failing_server.start().AndRaise(wsgi_server.BindError)276 another_starting_server.start()277 self.mox.ReplayAll()278 self.server.start()279 self.mox.VerifyAll()280 self.assertItemsEqual([starting_server, another_starting_server],281 self.server._servers)282 def test_start_all_fail_to_bind(self):283 failing_server = self.mox.CreateMock(284 wsgi_server._SingleAddressWsgiServer)285 self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')286 self.mox.StubOutWithMock(socket, 'getaddrinfo')287 socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,288 0, socket.AI_PASSIVE).AndReturn(289 [(None, None, None, None, ('foo', 'bar', 'baz'))])290 wsgi_server._SingleAddressWsgiServer(('foo', 'bar'), None).AndReturn(291 failing_server)292 failing_server.start().AndRaise(wsgi_server.BindError)293 self.mox.ReplayAll()294 self.assertRaises(wsgi_server.BindError, self.server.start)295 self.mox.VerifyAll()296 def test_remove_duplicates(self):297 foo_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)298 foo2_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)299 self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')300 self.mox.StubOutWithMock(socket, 'getaddrinfo')301 socket.getaddrinfo('localhost', 123, socket.AF_UNSPEC, socket.SOCK_STREAM,302 0, socket.AI_PASSIVE).AndReturn(303 [(0, 0, 0, '', ('127.0.0.1', 123)),304 (0, 0, 0, '', ('::1', 123, 0, 0)),305 (0, 0, 0, '', ('127.0.0.1', 123))])306 wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 123), None).AndReturn(307 foo_server)308 foo_server.start()309 wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(310 foo2_server)311 foo2_server.start()312 self.mox.ReplayAll()313 self.server.start()314 self.mox.VerifyAll()315 def test_quit(self):316 running_server = self.mox.CreateMock(317 wsgi_server._SingleAddressWsgiServer)318 self.server._servers = [running_server]319 running_server.quit()320 self.mox.ReplayAll()321 self.server.quit()322 self.mox.VerifyAll()323class WsgiServerPort0StartupTest(unittest.TestCase):324 def setUp(self):325 self.mox = mox.Mox()326 self.server = wsgi_server.WsgiServer(('localhost', 0), None)327 def tearDown(self):328 self.mox.UnsetStubs()329 def test_basic_behavior(self):330 inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)331 inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)332 self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')333 self.mox.StubOutWithMock(socket, 'getaddrinfo')334 socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,335 socket.AI_PASSIVE).AndReturn(336 [(None, None, None, None, ('127.0.0.1', 0, 'baz')),337 (None, None, None, None, ('::1', 0, 'baz'))])338 wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(339 inet4_server)340 inet4_server.start()341 inet4_server.port = 123342 wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(343 inet6_server)344 inet6_server.start()345 self.mox.ReplayAll()346 self.server.start()347 self.mox.VerifyAll()348 self.assertItemsEqual([inet4_server, inet6_server],349 self.server._servers)350 def test_retry_eaddrinuse(self):351 inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)352 inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)353 inet4_server_retry = self.mox.CreateMock(354 wsgi_server._SingleAddressWsgiServer)355 inet6_server_retry = self.mox.CreateMock(356 wsgi_server._SingleAddressWsgiServer)357 self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')358 self.mox.StubOutWithMock(socket, 'getaddrinfo')359 socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,360 socket.AI_PASSIVE).AndReturn(361 [(None, None, None, None, ('127.0.0.1', 0, 'baz')),362 (None, None, None, None, ('::1', 0, 'baz'))])363 # First try364 wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(365 inet4_server)366 inet4_server.start()367 inet4_server.port = 123368 wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(369 inet6_server)370 inet6_server.start().AndRaise(371 wsgi_server.BindError('message', (errno.EADDRINUSE, 'in use')))372 inet4_server.quit()373 # Retry374 wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(375 inet4_server_retry)376 inet4_server_retry.start()377 inet4_server_retry.port = 456378 wsgi_server._SingleAddressWsgiServer(('::1', 456), None).AndReturn(379 inet6_server_retry)380 inet6_server_retry.start()381 self.mox.ReplayAll()382 self.server.start()383 self.mox.VerifyAll()384 self.assertItemsEqual([inet4_server_retry, inet6_server_retry],385 self.server._servers)386 def test_retry_limited(self):387 inet4_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)388 for _ in range(wsgi_server._PORT_0_RETRIES)]389 inet6_servers = [self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)390 for _ in range(wsgi_server._PORT_0_RETRIES)]391 self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')392 self.mox.StubOutWithMock(socket, 'getaddrinfo')393 socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,394 socket.AI_PASSIVE).AndReturn(395 [(None, None, None, None, ('127.0.0.1', 0, 'baz')),396 (None, None, None, None, ('::1', 0, 'baz'))])397 for offset, (inet4_server, inet6_server) in enumerate(zip(398 inet4_servers, inet6_servers)):399 wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(400 inet4_server)401 inet4_server.start()402 inet4_server.port = offset + 1403 wsgi_server._SingleAddressWsgiServer(('::1', offset + 1), None).AndReturn(404 inet6_server)405 inet6_server.start().AndRaise(406 wsgi_server.BindError('message', (errno.EADDRINUSE, 'in use')))407 inet4_server.quit()408 self.mox.ReplayAll()409 self.assertRaises(wsgi_server.BindError, self.server.start)410 self.mox.VerifyAll()411 def test_ignore_other_errors(self):412 inet4_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)413 inet6_server = self.mox.CreateMock(wsgi_server._SingleAddressWsgiServer)414 self.mox.StubOutWithMock(wsgi_server, '_SingleAddressWsgiServer')415 self.mox.StubOutWithMock(socket, 'getaddrinfo')416 socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,417 socket.AI_PASSIVE).AndReturn(418 [(None, None, None, None, ('127.0.0.1', 0, 'baz')),419 (None, None, None, None, ('::1', 0, 'baz'))])420 wsgi_server._SingleAddressWsgiServer(('127.0.0.1', 0), None).AndReturn(421 inet4_server)422 inet4_server.start()423 inet4_server.port = 123424 wsgi_server._SingleAddressWsgiServer(('::1', 123), None).AndReturn(425 inet6_server)426 inet6_server.start().AndRaise(427 wsgi_server.BindError('message', (errno.ENOPROTOOPT, 'no protocol')))428 self.mox.ReplayAll()429 self.server.start()430 self.mox.VerifyAll()431 self.assertItemsEqual([inet4_server],432 self.server._servers)433class _SingleAddressWsgiServerStartupTest(unittest.TestCase):434 def setUp(self):435 self.mox = mox.Mox()436 self.server = wsgi_server._SingleAddressWsgiServer(('localhost', 0), None)437 def tearDown(self):438 self.mox.UnsetStubs()439 def test_start_port_in_use(self):440 self.mox.StubOutWithMock(socket, 'getaddrinfo')441 self.mox.StubOutWithMock(self.server, 'bind')442 af = object()443 socktype = object()444 proto = object()445 socket.getaddrinfo('localhost', 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,446 socket.AI_PASSIVE).AndReturn(447 [(af, socktype, proto, None, None)])448 self.server.bind(af, socktype, proto).AndRaise(socket.error)449 self.mox.ReplayAll()450 self.assertRaises(wsgi_server.BindError, self.server.start)451 self.mox.VerifyAll()452 def test_start(self):453 # Ensure no CherryPy thread pools are started.454 self.mox.StubOutWithMock(wsgiserver.ThreadPool, 'start')455 self.mox.StubOutWithMock(wsgi_server._SELECT_THREAD, 'add_socket')456 wsgi_server._SELECT_THREAD.add_socket(mox.IsA(socket.socket),457 self.server.tick)458 self.mox.ReplayAll()459 self.server.start()460 self.mox.VerifyAll()461 def test_quit(self):462 self.mox.StubOutWithMock(wsgi_server._SELECT_THREAD, 'remove_socket')463 self.server.socket = object()464 self.server.requests = self.mox.CreateMock(465 wsgi_server._SharedCherryPyThreadPool)466 wsgi_server._SELECT_THREAD.remove_socket(self.server.socket)467 self.server.requests.stop(timeout=1)468 self.mox.ReplayAll()469 self.server.quit()470 self.mox.VerifyAll()471if __name__ == '__main__':...
iperf_server_test.py
Source:iperf_server_test.py
1#!/usr/bin/env python32#3# Copyright 2019 - The Android Open Source Project4#5# Licensed under the Apache License, Version 2.0 (the "License");6# you may not use this file except in compliance with the License.7# You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16import logging17import unittest18import mock19import os20from acts.controllers import iperf_server21from acts.controllers.iperf_server import IPerfServer22from acts.controllers.iperf_server import IPerfServerOverAdb23from acts.controllers.iperf_server import IPerfServerOverSsh24# The position in the call tuple that represents the args array.25ARGS = 026# The position in the call tuple that represents the kwargs dict.27KWARGS = 128MOCK_LOGFILE_PATH = '/path/to/foo'29class IPerfServerModuleTest(unittest.TestCase):30 """Tests the acts.controllers.iperf_server module."""31 def test_create_creates_local_iperf_server_with_int(self):32 self.assertIsInstance(33 iperf_server.create([12345])[0],34 IPerfServer,35 'create() failed to create IPerfServer for integer input.'36 )37 def test_create_creates_local_iperf_server_with_str(self):38 self.assertIsInstance(39 iperf_server.create(['12345'])[0],40 IPerfServer,41 'create() failed to create IPerfServer for integer input.'42 )43 def test_create_cannot_create_local_iperf_server_with_bad_str(self):44 with self.assertRaises(ValueError):45 iperf_server.create(['12345BAD_STRING'])46 @mock.patch('acts.controllers.iperf_server.utils')47 def test_create_creates_server_over_ssh_with_ssh_config_and_port(self, _):48 self.assertIsInstance(49 iperf_server.create([{'ssh_config': {'user': '', 'host': ''},50 'port': ''}])[0],51 IPerfServerOverSsh,52 'create() failed to create IPerfServerOverSsh for a valid config.'53 )54 def test_create_creates_server_over_adb_with_proper_config(self):55 self.assertIsInstance(56 iperf_server.create([{'AndroidDevice': '53R147', 'port': 0}])[0],57 IPerfServerOverAdb,58 'create() failed to create IPerfServerOverAdb for a valid config.'59 )60 def test_create_raises_value_error_on_bad_config_dict(self):61 with self.assertRaises(ValueError):62 iperf_server.create([{'AndroidDevice': '53R147', 'ssh_config': {}}])63 def test_get_port_from_ss_output_returns_correct_port_ipv4(self):64 ss_output = ('tcp LISTEN 0 5 127.0.0.1:<PORT> *:*'65 ' users:(("cmd",pid=<PID>,fd=3))')66 self.assertEqual(67 iperf_server._get_port_from_ss_output(ss_output, '<PID>'), '<PORT>')68 def test_get_port_from_ss_output_returns_correct_port_ipv6(self):69 ss_output = ('tcp LISTEN 0 5 ff:ff:ff:ff:ff:ff:<PORT> *:*'70 ' users:(("cmd",pid=<PID>,fd=3))')71 self.assertEqual(72 iperf_server._get_port_from_ss_output(ss_output, '<PID>'), '<PORT>')73class IPerfServerBaseTest(unittest.TestCase):74 """Tests acts.controllers.iperf_server.IPerfServerBase."""75 @mock.patch('os.makedirs')76 def test_get_full_file_path_creates_parent_directory(self, mock_makedirs):77 # Will never actually be created/used.78 logging.log_path = '/tmp/unit_test_garbage'79 server = IPerfServer('port')80 full_file_path = server._get_full_file_path()81 self.assertTrue(82 mock_makedirs.called,83 'Did not attempt to create a directory.'84 )85 self.assertEqual(86 os.path.dirname(full_file_path),87 mock_makedirs.call_args[ARGS][0],88 'The parent directory of the full file path was not created.'89 )90class IPerfServerTest(unittest.TestCase):91 """Tests acts.controllers.iperf_server.IPerfServer."""92 PID = 12345693 def setUp(self):94 iperf_server._get_port_from_ss_output = lambda *_: IPerfServerTest.PID95 @mock.patch('builtins.open')96 @mock.patch('acts.controllers.iperf_server.subprocess')97 @mock.patch('acts.controllers.iperf_server.job')98 def test_start_makes_started_true(self, mock_job, __, ___):99 """Tests calling start() without calling stop() makes started True."""100 server = IPerfServer('port')101 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH102 server.start()103 self.assertTrue(server.started)104 @mock.patch('builtins.open')105 @mock.patch('acts.controllers.iperf_server.subprocess')106 @mock.patch('acts.controllers.iperf_server.job')107 def test_start_stop_makes_started_false(self, _, __, ___):108 """Tests calling start() without calling stop() makes started True."""109 server = IPerfServer('port')110 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH111 server.start()112 server.stop()113 self.assertFalse(server.started)114 @mock.patch('builtins.open')115 @mock.patch('acts.controllers.iperf_server.subprocess')116 @mock.patch('acts.controllers.iperf_server.job')117 def test_start_sets_current_log_file(self, _, __, ___):118 server = IPerfServer('port')119 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH120 server.start()121 self.assertEqual(122 server._current_log_file,123 MOCK_LOGFILE_PATH,124 'The _current_log_file was not received from _get_full_file_path.'125 )126 @mock.patch('builtins.open')127 @mock.patch('acts.controllers.iperf_server.subprocess')128 def test_stop_returns_current_log_file(self, _, __):129 server = IPerfServer('port')130 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH131 server._current_log_file = MOCK_LOGFILE_PATH132 server._iperf_process = mock.Mock()133 log_file = server.stop()134 self.assertEqual(135 log_file,136 MOCK_LOGFILE_PATH,137 'The _current_log_file was not returned by stop().'138 )139 @mock.patch('builtins.open')140 @mock.patch('acts.controllers.iperf_server.subprocess')141 @mock.patch('acts.controllers.iperf_server.job')142 def test_start_does_not_run_two_concurrent_processes(self, start_proc, _, __):143 server = IPerfServer('port')144 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH145 server._iperf_process = mock.Mock()146 server.start()147 self.assertFalse(148 start_proc.called,149 'start() should not begin a second process if another is running.'150 )151 @mock.patch('acts.utils.stop_standing_subprocess')152 def test_stop_exits_early_if_no_process_has_started(self, stop_proc):153 server = IPerfServer('port')154 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH155 server._iperf_process = None156 server.stop()157 self.assertFalse(158 stop_proc.called,159 'stop() should not kill a process if no process is running.'160 )161class IPerfServerOverSshTest(unittest.TestCase):162 """Tests acts.controllers.iperf_server.IPerfServerOverSsh."""163 INIT_ARGS = [{'host': 'TEST_HOST', 'user': 'test'}, 'PORT']164 @mock.patch('acts.controllers.iperf_server.connection')165 def test_start_makes_started_true(self, _):166 """Tests calling start() without calling stop() makes started True."""167 server = IPerfServerOverSsh(*self.INIT_ARGS)168 server._ssh_session = mock.Mock()169 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH170 server.start()171 self.assertTrue(server.started)172 @mock.patch('builtins.open')173 @mock.patch('acts.controllers.iperf_server.connection')174 def test_start_stop_makes_started_false(self, _, __):175 """Tests calling start() without calling stop() makes started True."""176 server = IPerfServerOverSsh(*self.INIT_ARGS)177 server._ssh_session = mock.Mock()178 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH179 server.start()180 server.stop()181 self.assertFalse(server.started)182 @mock.patch('builtins.open')183 @mock.patch('acts.controllers.iperf_server.connection')184 def test_stop_returns_expected_log_file(self, _, __):185 server = IPerfServerOverSsh(*self.INIT_ARGS)186 server._ssh_session = mock.Mock()187 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH188 server._iperf_pid = mock.Mock()189 log_file = server.stop()190 self.assertEqual(191 log_file,192 MOCK_LOGFILE_PATH,193 'The expected log file was not returned by stop().'194 )195 @mock.patch('acts.controllers.iperf_server.connection')196 def test_start_does_not_run_two_concurrent_processes(self, _):197 server = IPerfServerOverSsh(*self.INIT_ARGS)198 server._ssh_session = mock.Mock()199 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH200 server._iperf_pid = mock.Mock()201 server.start()202 self.assertFalse(203 server._ssh_session.run_async.called,204 'start() should not begin a second process if another is running.'205 )206 @mock.patch('acts.utils.stop_standing_subprocess')207 @mock.patch('acts.controllers.iperf_server.connection')208 def test_stop_exits_early_if_no_process_has_started(self, _, __):209 server = IPerfServerOverSsh(*self.INIT_ARGS)210 server._ssh_session = mock.Mock()211 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH212 server._iperf_pid = None213 server.stop()214 self.assertFalse(215 server._ssh_session.run_async.called,216 'stop() should not kill a process if no process is running.'217 )218class IPerfServerOverAdbTest(unittest.TestCase):219 """Tests acts.controllers.iperf_server.IPerfServerOverSsh."""220 ANDROID_DEVICE_PROP = ('acts.controllers.iperf_server.'221 'IPerfServerOverAdb._android_device')222 @mock.patch(ANDROID_DEVICE_PROP)223 def test_start_makes_started_true(self, mock_ad):224 """Tests calling start() without calling stop() makes started True."""225 server = IPerfServerOverAdb('53R147', 'PORT')226 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH227 mock_ad.adb.shell.return_value = '<PID>'228 server.start()229 self.assertTrue(server.started)230 @mock.patch('acts.libs.proc.job.run')231 @mock.patch('builtins.open')232 @mock.patch(ANDROID_DEVICE_PROP)233 def test_start_stop_makes_started_false(self, mock_ad, _, __):234 """Tests calling start() without calling stop() makes started True."""235 server = IPerfServerOverAdb('53R147', 'PORT')236 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH237 mock_ad.adb.shell.side_effect = ['<PID>', '', '', '']238 server.start()239 server.stop()240 self.assertFalse(server.started)241 @mock.patch('acts.libs.proc.job.run')242 @mock.patch('builtins.open')243 @mock.patch(ANDROID_DEVICE_PROP)244 def test_stop_returns_expected_log_file(self, mock_ad, _, __):245 server = IPerfServerOverAdb('53R147', 'PORT')246 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH247 server._iperf_process = mock.Mock()248 server._iperf_process_adb_pid = '<PID>'249 mock_ad.adb.shell.side_effect = ['', '', '']250 log_file = server.stop()251 self.assertEqual(252 log_file,253 MOCK_LOGFILE_PATH,254 'The expected log file was not returned by stop().'255 )256 @mock.patch(ANDROID_DEVICE_PROP)257 def test_start_does_not_run_two_concurrent_processes(self, android_device):258 server = IPerfServerOverAdb('53R147', 'PORT')259 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH260 server._iperf_process = mock.Mock()261 server.start()262 self.assertFalse(263 android_device.adb.shell_nb.called,264 'start() should not begin a second process if another is running.'265 )266 @mock.patch('acts.libs.proc.job.run')267 @mock.patch('builtins.open')268 @mock.patch(ANDROID_DEVICE_PROP)269 def test_stop_exits_early_if_no_process_has_started(self, android_device, _,270 __):271 server = IPerfServerOverAdb('53R147', 'PORT')272 server._get_full_file_path = lambda _: MOCK_LOGFILE_PATH273 server._iperf_pid = None274 server.stop()275 self.assertFalse(276 android_device.adb.shell_nb.called,277 'stop() should not kill a process if no process is running.'278 )279if __name__ == '__main__':...
fetchmail.py
Source:fetchmail.py
1# -*- coding: utf-8 -*-2# Part of Odoo. See LICENSE file for full copyright and licensing details.3import logging4import poplib5from imaplib import IMAP4, IMAP4_SSL6from poplib import POP3, POP3_SSL7from odoo import api, fields, models, tools, _8from odoo.exceptions import UserError9_logger = logging.getLogger(__name__)10MAX_POP_MESSAGES = 5011MAIL_TIMEOUT = 6012# Workaround for Python 2.7.8 bug https://bugs.python.org/issue2390613poplib._MAXLINE = 6553614class FetchmailServer(models.Model):15 """Incoming POP/IMAP mail server account"""16 _name = 'fetchmail.server'17 _description = "POP/IMAP Server"18 _order = 'priority'19 name = fields.Char('Name', required=True)20 active = fields.Boolean('Active', default=True)21 state = fields.Selection([22 ('draft', 'Not Confirmed'),23 ('done', 'Confirmed'),24 ], string='Status', index=True, readonly=True, copy=False, default='draft')25 server = fields.Char(string='Server Name', readonly=True, help="Hostname or IP of the mail server", states={'draft': [('readonly', False)]})26 port = fields.Integer(readonly=True, states={'draft': [('readonly', False)]})27 type = fields.Selection([28 ('pop', 'POP Server'),29 ('imap', 'IMAP Server'),30 ('local', 'Local Server'),31 ], 'Server Type', index=True, required=True, default='pop')32 is_ssl = fields.Boolean('SSL/TLS', help="Connections are encrypted with SSL/TLS through a dedicated port (default: IMAPS=993, POP3S=995)")33 attach = fields.Boolean('Keep Attachments', help="Whether attachments should be downloaded. "34 "If not enabled, incoming emails will be stripped of any attachments before being processed", default=True)35 original = fields.Boolean('Keep Original', help="Whether a full original copy of each email should be kept for reference "36 "and attached to each processed message. This will usually double the size of your message database.")37 date = fields.Datetime(string='Last Fetch Date', readonly=True)38 user = fields.Char(string='Username', readonly=True, states={'draft': [('readonly', False)]})39 password = fields.Char(readonly=True, states={'draft': [('readonly', False)]})40 action_id = fields.Many2one('ir.actions.server', string='Server Action', help="Optional custom server action to trigger for each incoming mail, on the record that was created or updated by this mail")41 object_id = fields.Many2one('ir.model', string="Create a New Record", help="Process each incoming mail as part of a conversation "42 "corresponding to this document type. This will create "43 "new documents for new conversations, or attach follow-up "44 "emails to the existing conversations (documents).")45 priority = fields.Integer(string='Server Priority', readonly=True, states={'draft': [('readonly', False)]}, help="Defines the order of processing, lower values mean higher priority", default=5)46 message_ids = fields.One2many('mail.mail', 'fetchmail_server_id', string='Messages', readonly=True)47 configuration = fields.Text('Configuration', readonly=True)48 script = fields.Char(readonly=True, default='/mail/static/scripts/openerp_mailgate.py')49 @api.onchange('type', 'is_ssl', 'object_id')50 def onchange_server_type(self):51 self.port = 052 if self.type == 'pop':53 self.port = self.is_ssl and 995 or 11054 elif self.type == 'imap':55 self.port = self.is_ssl and 993 or 14356 else:57 self.server = ''58 conf = {59 'dbname': self.env.cr.dbname,60 'uid': self.env.uid,61 'model': self.object_id.model if self.object_id else 'MODELNAME'62 }63 self.configuration = """64 Use the below script with the following command line options with your Mail Transport Agent (MTA)65 openerp_mailgate.py --host=HOSTNAME --port=PORT -u %(uid)d -p PASSWORD -d %(dbname)s66 Example configuration for the postfix mta running locally:67 /etc/postfix/virtual_aliases:68 @youdomain openerp_mailgate@localhost69 /etc/aliases:70 openerp_mailgate: "|/path/to/openerp-mailgate.py --host=localhost -u %(uid)d -p PASSWORD -d %(dbname)s"71 """ % conf72 @api.model73 def create(self, values):74 res = super(FetchmailServer, self).create(values)75 self._update_cron()76 return res77 @api.multi78 def write(self, values):79 res = super(FetchmailServer, self).write(values)80 self._update_cron()81 return res82 @api.multi83 def unlink(self):84 res = super(FetchmailServer, self).unlink()85 self._update_cron()86 return res87 @api.multi88 def set_draft(self):89 self.write({'state': 'draft'})90 return True91 @api.multi92 def connect(self):93 self.ensure_one()94 if self.type == 'imap':95 if self.is_ssl:96 connection = IMAP4_SSL(self.server, int(self.port))97 else:98 connection = IMAP4(self.server, int(self.port))99 connection.login(self.user, self.password)100 elif self.type == 'pop':101 if self.is_ssl:102 connection = POP3_SSL(self.server, int(self.port))103 else:104 connection = POP3(self.server, int(self.port))105 #TODO: use this to remove only unread messages106 #connection.user("recent:"+server.user)107 connection.user(self.user)108 connection.pass_(self.password)109 # Add timeout on socket110 connection.sock.settimeout(MAIL_TIMEOUT)111 return connection112 @api.multi113 def button_confirm_login(self):114 for server in self:115 try:116 connection = server.connect()117 server.write({'state': 'done'})118 except Exception as err:119 _logger.info("Failed to connect to %s server %s.", server.type, server.name, exc_info=True)120 raise UserError(_("Connection test failed: %s") % tools.ustr(err))121 finally:122 try:123 if connection:124 if server.type == 'imap':125 connection.close()126 elif server.type == 'pop':127 connection.quit()128 except Exception:129 # ignored, just a consequence of the previous exception130 pass131 return True132 @api.model133 def _fetch_mails(self):134 """ Method called by cron to fetch mails from servers """135 return self.search([('state', '=', 'done'), ('type', 'in', ['pop', 'imap'])]).fetch_mail()136 @api.multi137 def fetch_mail(self):138 """ WARNING: meant for cron usage only - will commit() after each email! """139 additionnal_context = {140 'fetchmail_cron_running': True141 }142 MailThread = self.env['mail.thread']143 for server in self:144 _logger.info('start checking for new emails on %s server %s', server.type, server.name)145 additionnal_context['fetchmail_server_id'] = server.id146 additionnal_context['server_type'] = server.type147 count, failed = 0, 0148 imap_server = None149 pop_server = None150 if server.type == 'imap':151 try:152 imap_server = server.connect()153 imap_server.select()154 result, data = imap_server.search(None, '(UNSEEN)')155 for num in data[0].split():156 res_id = None157 result, data = imap_server.fetch(num, '(RFC822)')158 imap_server.store(num, '-FLAGS', '\\Seen')159 try:160 res_id = MailThread.with_context(**additionnal_context).message_process(server.object_id.model, data[0][1], save_original=server.original, strip_attachments=(not server.attach))161 except Exception:162 _logger.info('Failed to process mail from %s server %s.', server.type, server.name, exc_info=True)163 failed += 1164 if res_id and server.action_id:165 server.action_id.with_context({166 'active_id': res_id,167 'active_ids': [res_id],168 'active_model': self.env.context.get("thread_model", server.object_id.model)169 }).run()170 imap_server.store(num, '+FLAGS', '\\Seen')171 self._cr.commit()172 count += 1173 _logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", count, server.type, server.name, (count - failed), failed)174 except Exception:175 _logger.info("General failure when trying to fetch mail from %s server %s.", server.type, server.name, exc_info=True)176 finally:177 if imap_server:178 imap_server.close()179 imap_server.logout()180 elif server.type == 'pop':181 try:182 while True:183 pop_server = server.connect()184 (num_messages, total_size) = pop_server.stat()185 pop_server.list()186 for num in range(1, min(MAX_POP_MESSAGES, num_messages) + 1):187 (header, messages, octets) = pop_server.retr(num)188 message = (b'\n').join(messages)189 res_id = None190 try:191 res_id = MailThread.with_context(**additionnal_context).message_process(server.object_id.model, message, save_original=server.original, strip_attachments=(not server.attach))192 pop_server.dele(num)193 except Exception:194 _logger.info('Failed to process mail from %s server %s.', server.type, server.name, exc_info=True)195 failed += 1196 if res_id and server.action_id:197 server.action_id.with_context({198 'active_id': res_id,199 'active_ids': [res_id],200 'active_model': self.env.context.get("thread_model", server.object_id.model)201 }).run()202 self.env.cr.commit()203 if num_messages < MAX_POP_MESSAGES:204 break205 pop_server.quit()206 _logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", num_messages, server.type, server.name, (num_messages - failed), failed)207 except Exception:208 _logger.info("General failure when trying to fetch mail from %s server %s.", server.type, server.name, exc_info=True)209 finally:210 if pop_server:211 pop_server.quit()212 server.write({'date': fields.Datetime.now()})213 return True214 @api.model215 def _update_cron(self):216 if self.env.context.get('fetchmail_cron_running'):217 return218 try:219 # Enabled/Disable cron based on the number of 'done' server of type pop or imap220 cron = self.env.ref('fetchmail.ir_cron_mail_gateway_action')221 cron.toggle(model=self._name, domain=[('state', '=', 'done'), ('type', 'in', ['pop', 'imap'])])222 except ValueError:...
socket_test_utils.py
Source:socket_test_utils.py
1#2# Copyright 2018 - The Android Open Source Project3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15import queue16import re17import threading18import time19from acts.test_utils.net import connectivity_const as cconst20from acts import asserts21MSG = "Test message "22PKTS = 523""" Methods for android.system.Os based sockets """24def open_android_socket(ad, domain, sock_type, ip, port):25 """ Open TCP or UDP using android.system.Os class26 Args:27 1. ad - android device object28 2. domain - IPv4 or IPv6 type29 3. sock_type - UDP or TCP socket30 4. ip - IP addr on the device31 5. port - open socket on port32 Returns:33 File descriptor key34 """35 fd_key = ad.droid.openSocket(domain, sock_type, ip, port)36 ad.log.info("File descriptor: %s" % fd_key)37 asserts.assert_true(fd_key, "Failed to open socket")38 return fd_key39def close_android_socket(ad, fd_key):40 """ Close socket41 Args:42 1. ad - android device object43 2. fd_key - file descriptor key44 """45 status = ad.droid.closeSocket(fd_key)46 asserts.assert_true(status, "Failed to close socket")47def listen_accept_android_socket(client,48 server,49 client_fd,50 server_fd,51 server_ip,52 server_port):53 """ Listen, accept TCP sockets54 Args:55 1. client : ad object for client device56 2. server : ad object for server device57 3. client_fd : client's socket handle58 4. server_fd : server's socket handle59 5. server_ip : send data to this IP60 6. server_port : send data to this port61 """62 server.droid.listenSocket(server_fd)63 client.droid.connectSocket(client_fd, server_ip, server_port)64 sock = server.droid.acceptSocket(server_fd)65 asserts.assert_true(sock, "Failed to accept socket")66 return sock67def send_recv_data_android_sockets(client,68 server,69 client_fd,70 server_fd,71 server_ip,72 server_port):73 """ Send TCP or UDP data over android os sockets from client to server.74 Verify that server received the data.75 Args:76 1. client : ad object for client device77 2. server : ad object for server device78 3. client_fd : client's socket handle79 4. server_fd : server's socket handle80 5. server_ip : send data to this IP81 6. server_port : send data to this port82 """83 send_list = []84 recv_list = []85 for _ in range(1, PKTS+1):86 msg = MSG + " %s" % _87 send_list.append(msg)88 client.log.info("Sending message: %s" % msg)89 client.droid.sendDataOverSocket(server_ip, server_port, msg, client_fd)90 recv_msg = server.droid.recvDataOverSocket(server_fd)91 server.log.info("Received message: %s" % recv_msg)92 recv_list.append(recv_msg)93 recv_list = [x.rstrip('\x00') if x else x for x in recv_list]94 asserts.assert_true(send_list and recv_list and send_list == recv_list,95 "Send and recv information is incorrect")96""" Methods for java.net.DatagramSocket based sockets """97def open_datagram_socket(ad, ip, port):98 """ Open datagram socket99 Args:100 1. ad : android device object101 2. ip : IP addr on the device102 3. port : socket port103 Returns:104 Hash key of the datagram socket105 """106 socket_key = ad.droid.openDatagramSocket(ip, port)107 ad.log.info("Datagram socket: %s" % socket_key)108 asserts.assert_true(socket_key, "Failed to open datagram socket")109 return socket_key110def close_datagram_socket(ad, socket_key):111 """ Close datagram socket112 Args:113 1. socket_key : hash key of datagram socket114 """115 status = ad.droid.closeDatagramSocket(socket_key)116 asserts.assert_true(status, "Failed to close datagram socket")117def send_recv_data_datagram_sockets(client,118 server,119 client_sock,120 server_sock,121 server_ip,122 server_port):123 """ Send data over datagram socket from dut_a to dut_b.124 Verify that dut_b received the data.125 Args:126 1. client : ad object for client device127 2. server : ad object for server device128 3. client_sock : client's socket handle129 4. server_sock : server's socket handle130 5. server_ip : send data to this IP131 6. server_port : send data to this port132 """133 send_list = []134 recv_list = []135 for _ in range(1, PKTS+1):136 msg = MSG + " %s" % _137 send_list.append(msg)138 client.log.info("Sending message: %s" % msg)139 client.droid.sendDataOverDatagramSocket(client_sock,140 msg,141 server_ip,142 server_port)143 recv_msg = server.droid.recvDataOverDatagramSocket(server_sock)144 server.log.info("Received message: %s" % recv_msg)145 recv_list.append(recv_msg)146 recv_list = [x.rstrip('\x00') if x else x for x in recv_list]147 asserts.assert_true(send_list and recv_list and send_list == recv_list,148 "Send and recv information is incorrect")149""" Utils methods for java.net.Socket based sockets """150def _accept_socket(server, server_ip, server_port, server_sock, q):151 sock = server.droid.acceptTcpSocket(server_sock)152 server.log.info("Server socket: %s" % sock)153 q.put(sock)154def _client_socket(client, server_ip, server_port, client_ip, client_port, q):155 time.sleep(0.5)156 sock = client.droid.openTcpSocket(server_ip,157 server_port,158 client_ip,159 client_port)160 client.log.info("Client socket: %s" % sock)161 q.put(sock)162def open_connect_socket(client,163 server,164 client_ip,165 server_ip,166 client_port,167 server_port,168 server_sock):169 """ Open tcp socket and connect to server170 Args:171 1. client : ad object for client device172 2. server : ad object for server device173 3. client_ip : client's socket handle174 4. server_ip : send data to this IP175 5. client_port : port on client socket176 6. server_port : port on server socket177 7. server_sock : server socket178 Returns:179 client and server socket from successful connect180 """181 sq = queue.Queue()182 cq = queue.Queue()183 s = threading.Thread(target = _accept_socket,184 args = (server, server_ip, server_port, server_sock,185 sq))186 c = threading.Thread(target = _client_socket,187 args = (client, server_ip, server_port, client_ip,188 client_port, cq))189 s.start()190 c.start()191 c.join()192 s.join()193 client_sock = cq.get()194 server_sock = sq.get()195 asserts.assert_true(client_sock and server_sock, "Failed to open sockets")196 return client_sock, server_sock197def open_server_socket(server, server_ip, server_port):198 """ Open tcp server socket199 Args:200 1. server : ad object for server device201 2. server_ip : send data to this IP202 3. server_port : send data to this port203 """204 sock = server.droid.openTcpServerSocket(server_ip, server_port)205 server.log.info("Server Socket: %s" % sock)206 asserts.assert_true(sock, "Failed to open server socket")207 return sock208def close_socket(ad, socket):209 """ Close socket210 Args:211 1. ad - android device object212 2. socket - socket key213 """214 status = ad.droid.closeTcpSocket(socket)215 asserts.assert_true(status, "Failed to socket")216def close_server_socket(ad, socket):217 """ Close server socket218 Args:219 1. ad - android device object220 2. socket - server socket key221 """222 status = ad.droid.closeTcpServerSocket(socket)223 asserts.assert_true(status, "Failed to socket")224def send_recv_data_sockets(client, server, client_sock, server_sock):225 """ Send data over TCP socket from client to server.226 Verify that server received the data227 Args:228 1. client : ad object for client device229 2. server : ad object for server device230 3. client_sock : client's socket handle231 4. server_sock : server's socket handle232 """233 send_list = []234 recv_list = []235 for _ in range(1, PKTS+1):236 msg = MSG + " %s" % _237 send_list.append(msg)238 client.log.info("Sending message: %s" % msg)239 client.droid.sendDataOverTcpSocket(client_sock, msg)240 recv_msg = server.droid.recvDataOverTcpSocket(server_sock)241 server.log.info("Received message: %s" % recv_msg)242 recv_list.append(recv_msg)243 recv_list = [x.rstrip('\x00') if x else x for x in recv_list]244 asserts.assert_true(send_list and recv_list and send_list == recv_list,...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!