Best Python code snippet using autotest_python
test_connection.py
Source:test_connection.py
from collections import namedtuple
import os
import errno
import threading
import time
import uuid
import struct

from nose import SkipTest
from nose.tools import eq_
from nose.tools import raises
import mock

from kazoo.exceptions import ConnectionLoss
from kazoo.protocol.serialization import (
    Connect,
    int_struct,
    write_string,
)
from kazoo.protocol.states import KazooState
from kazoo.protocol.connection import _CONNECTION_DROP
from kazoo.testing import KazooTestCase
from kazoo.tests.util import wait


class Delete(namedtuple('Delete', 'path version')):
    """Request tuple whose response can never be deserialized.

    ``serialize`` emits the path followed by the packed version, while
    ``deserialize`` always raises ``ValueError``. It is queued by
    ``test_bad_deserialization`` to check that the error reaches the
    caller waiting on the async result.
    """

    # Request type code (presumably ZooKeeper's delete opcode -- confirm).
    type = 2

    def serialize(self):
        """Return a ``bytearray`` holding the path and the packed version."""
        b = bytearray()
        b.extend(write_string(self.path))
        b.extend(int_struct.pack(self.version))
        return b

    @classmethod
    def deserialize(cls, bytes, offset):
        """Always fail, simulating an undecodable server response.

        The ``bytes``/``offset`` parameter names are kept as in the
        original signature even though ``bytes`` shadows the builtin.

        Raises:
            ValueError: unconditionally.
        """
        raise ValueError("oh my")


class TestConnectionHandler(KazooTestCase):
    """Connection-level failure and recovery tests for the kazoo client."""

    def test_bad_deserialization(self):
        """A deserialization error surfaces through the async result."""
        async_object = self.client.handler.async_result()
        self.client._queue.append(
            (Delete(self.client.chroot, -1), async_object))
        # Write one byte to the connection's write pipe (presumably this
        # wakes the connection loop so it picks up the queue -- confirm).
        os.write(self.client._connection._write_pipe, b'\0')

        @raises(ValueError)
        def testit():
            async_object.get()
        testit()

    def test_with_bad_sessionid(self):
        """A client given a made-up session id still reaches CONNECTED."""
        ev = threading.Event()

        def expired(state):
            if state == KazooState.CONNECTED:
                ev.set()

        password = os.urandom(16)
        client = self._get_client(client_id=(82838284824, password))
        client.add_listener(expired)
        client.start()
        try:
            ev.wait(15)
            eq_(ev.is_set(), True)
        finally:
            client.stop()

    def test_connection_read_timeout(self):
        """A simulated read timeout raises ConnectionLoss, then recovers."""
        client = self.client
        ev = threading.Event()
        path = "/" + uuid.uuid4().hex
        handler = client.handler
        _select = handler.select
        _socket = client._connection._socket

        def delayed_select(*args, **kwargs):
            result = _select(*args, **kwargs)
            if len(args[0]) == 1 and _socket in args[0]:
                # for any socket read, simulate a timeout
                return [], [], []
            return result

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        client.add_listener(back)
        client.create(path, b"1")
        try:
            handler.select = delayed_select
            self.assertRaises(ConnectionLoss, client.get, path)
        finally:
            # always restore the real select, even if the assertion fails
            handler.select = _select

        # the client reconnects automatically
        ev.wait(5)
        eq_(ev.is_set(), True)
        eq_(client.get(path)[0], b"1")

    def test_connection_write_timeout(self):
        """A simulated write timeout raises ConnectionLoss, then recovers."""
        client = self.client
        ev = threading.Event()
        path = "/" + uuid.uuid4().hex
        handler = client.handler
        _select = handler.select
        _socket = client._connection._socket

        def delayed_select(*args, **kwargs):
            result = _select(*args, **kwargs)
            if _socket in args[1]:
                # for any socket write, simulate a timeout
                return [], [], []
            return result

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        client.add_listener(back)
        try:
            handler.select = delayed_select
            self.assertRaises(ConnectionLoss, client.create, path)
        finally:
            # always restore the real select, even if the assertion fails
            handler.select = _select

        # the client reconnects automatically
        ev.wait(5)
        eq_(ev.is_set(), True)
        # the create never went through, so the node must not exist
        eq_(client.exists(path), None)

    def test_connection_deserialize_fail(self):
        """The client keeps retrying when a reconnect reply is undecodable."""
        client = self.client
        ev = threading.Event()
        path = "/" + uuid.uuid4().hex
        handler = client.handler
        _select = handler.select
        _socket = client._connection._socket

        def delayed_select(*args, **kwargs):
            result = _select(*args, **kwargs)
            if _socket in args[1]:
                # for any socket write, simulate a timeout
                return [], [], []
            return result

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        client.add_listener(back)

        deserialize_ev = threading.Event()

        def bad_deserialize(bytes, offset):
            deserialize_ev.set()
            raise struct.error()

        # force the connection to die but, on reconnect, cause the
        # server response to be non-deserializable. ensure that the client
        # continues to retry. This partially reproduces a rare bug seen
        # in production.
        with mock.patch.object(Connect, 'deserialize') as mock_deserialize:
            mock_deserialize.side_effect = bad_deserialize
            try:
                handler.select = delayed_select
                self.assertRaises(ConnectionLoss, client.create, path)
            finally:
                handler.select = _select

            # the client reconnects automatically but the first attempt will
            # hit a deserialize failure. wait for that.
            deserialize_ev.wait(5)
            eq_(deserialize_ev.is_set(), True)

        # this time should succeed
        ev.wait(5)
        eq_(ev.is_set(), True)
        eq_(client.exists(path), None)

    def test_connection_close(self):
        """close() raises while running, works after stop(), allows restart."""
        self.assertRaises(Exception, self.client.close)
        self.client.stop()
        self.client.close()
        # should be able to restart
        self.client.start()

    def test_connection_pipe(self):
        """Pipes survive stop(), are closed by close(), and return on start()."""
        client = self.client
        read_pipe = client._connection._read_pipe
        write_pipe = client._connection._write_pipe

        assert read_pipe is not None
        assert write_pipe is not None

        # stop client and pipe should not yet be closed
        client.stop()
        assert read_pipe is not None
        assert write_pipe is not None
        os.fstat(read_pipe)
        os.fstat(write_pipe)

        # close client, and pipes should be
        client.close()

        try:
            os.fstat(read_pipe)
        except OSError as e:
            # EBADF is the expected "descriptor closed" error; anything
            # else is a genuine failure and is re-raised.
            if e.errno != errno.EBADF:
                raise
        else:
            self.fail("Expected read_pipe to be closed")

        try:
            os.fstat(write_pipe)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
        else:
            self.fail("Expected write_pipe to be closed")

        # start client back up. should get a new, valid pipe
        client.start()
        read_pipe = client._connection._read_pipe
        write_pipe = client._connection._write_pipe

        assert read_pipe is not None
        assert write_pipe is not None
        os.fstat(read_pipe)
        os.fstat(write_pipe)

    def test_dirty_pipe(self):
        """A stray byte in the wake-up pipe is drained without harm."""
        client = self.client
        read_pipe = client._connection._read_pipe
        write_pipe = client._connection._write_pipe

        # add a stray byte to the pipe and ensure that doesn't
        # blow up client. simulates case where some error leaves
        # a byte in the pipe which doesn't correspond to the
        # request queue.
        os.write(write_pipe, b'\0')

        # eventually this byte should disappear from pipe
        wait(lambda: client.handler.select([read_pipe], [], [], 0)[0] == [])


class TestConnectionDrop(KazooTestCase):
    """Behaviour when the connection is dropped with a request pending."""

    def test_connection_dropped(self):
        """A pending request fails with ConnectionLoss; the client reconnects."""
        ev = threading.Event()

        def back(state):
            if state == KazooState.CONNECTED:
                ev.set()

        # create a node with a large value and stop the ZK node
        path = "/" + uuid.uuid4().hex
        self.client.create(path)
        self.client.add_listener(back)
        result = self.client.set_async(path, b'a' * 1000 * 1024)
        self.client._call(_CONNECTION_DROP, None)

        self.assertRaises(ConnectionLoss, result.get)

        # we have a working connection to a new node
        ev.wait(30)
        eq_(ev.is_set(), True)


class TestReadOnlyMode(KazooTestCase):
    """Read-only mode tests; skipped on ZooKeeper older than 3.4."""

    def setUp(self):
        """Start a read-only-capable cluster, or skip on an old server."""
        self.setup_zookeeper(read_only=True)
        ver = self.client.server_version()
        # Compare (major, minor) together so the guard matches the
        # "3.4 or above" message instead of checking the minor alone.
        if tuple(ver[:2]) < (3, 4):
            raise SkipTest("Must use zookeeper 3.4 or above")

    def tearDown(self):
        """Stop the client started by ``setUp``."""
        self.client.stop()

    def test_read_only(self):
        """With two nodes stopped: reads succeed, writes are rejected."""
        from kazoo.exceptions import NotReadOnlyCallError
        from kazoo.protocol.states import KeeperState

        client = self.client
        states = []
        ev = threading.Event()

        # NOTE(review): decorator syntax rebinds ``listen`` to whatever
        # add_listener returns -- confirm it returns the function, since
        # remove_listener(listen) below relies on that.
        @client.add_listener
        def listen(state):
            states.append(state)
            if client.client_state == KeeperState.CONNECTED_RO:
                ev.set()

        try:
            self.cluster[1].stop()
            self.cluster[2].stop()
            ev.wait(6)
            eq_(ev.is_set(), True)
            eq_(client.client_state, KeeperState.CONNECTED_RO)

            # Test read only command
            eq_(client.get_children('/'), [])

            # Test error with write command
            @raises(NotReadOnlyCallError)
            def testit():
                client.create('/fred')
            testit()

            # Wait for a ping
            time.sleep(15)
        finally:
            client.remove_listener(listen)
            self.cluster[1].run()
            # NOTE(review): the source excerpt is truncated here; the
            # remainder of this test (if any) is not visible.
test_pipe.py
Source:test_pipe.py
...104 write_pipe.readln()105 assert_that(str(context.exception), contains_string("Attempted to read from '/home/michael/dev/MAMEToolkit/test/emulator/mame/pipes/write-env1.pipe' in 'w' mode"))106 finally:107 close_pipes(write_pipe, lua_read_pipe)108 def test_write_to_read_pipe(self):109 read_pipe, lua_write_pipe = [None, None]110 try:111 read_pipe = Pipe("env1", "read", 'r', "mame/pipes")112 lua_write_pipe = setup_pipe(read_pipe)113 with self.assertRaises(IOError) as context:114 read_pipe.writeln("TEST")115 assert_that(str(context.exception), contains_string("Attempted to write to '/home/michael/dev/MAMEToolkit/test/emulator/mame/pipes/read-env1.pipe' in 'r' mode"))116 finally:117 close_pipes(read_pipe, lua_write_pipe)118 def test_lua_string(self):119 write_pipe, lua_read_pipe, read_pipe, lua_write_pipe = [None, None, None, None]120 try:121 write_pipe, lua_read_pipe, read_pipe, lua_write_pipe = setup_all_pipes()122 assert_that(read_pipe.get_lua_string(args=["test"]), equal_to('readPipe:write(test.."\\n"); readPipe:flush(); '))...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!