Best Python code snippet using localstack_python
test.py
Source:test.py
#!/usr/bin/env python3
"""Tests for converting ZooKeeper data with ``clickhouse keeper-converter``.

Each test populates the service listening on port 2181 (started through
``zkServer.sh``), converts its on-disk logs/snapshots into
``/var/lib/clickhouse/coordination/snapshots`` and then checks that the
service listening on port 9181 exposes the same tree of nodes.
"""
import pytest
from helpers.cluster import ClickHouseCluster
from kazoo.client import KazooClient, KazooState
from kazoo.security import ACL, make_digest_acl, make_acl
from kazoo.exceptions import AuthFailedError, InvalidACLError, NoAuthError, KazooException
import os

cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/keeper_config.xml', 'configs/logs_conf.xml'], stay_alive=True)

# Stat fields that must match between the two services (ctime/mtime are
# deliberately not part of the comparison, exactly as in the original checks).
_COMPARED_STAT_FIELDS = (
    'czxid',
    'mzxid',
    'version',
    'cversion',
    'aversion',
    'ephemeralOwner',
    'dataLength',
    'numChildren',
    'pzxid',
)


def start_zookeeper():
    """Start ZooKeeper inside the node container."""
    node.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh start'])


def stop_zookeeper():
    """Stop ZooKeeper inside the node container."""
    node.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh stop'])


def clear_zookeeper():
    """Remove everything under /zookeeper inside the node container."""
    node.exec_in_container(['bash', '-c', 'rm -fr /zookeeper/*'])


def restart_and_clear_zookeeper():
    """Stop ZooKeeper, wipe its data directory and start it again."""
    stop_zookeeper()
    clear_zookeeper()
    start_zookeeper()


def clear_clickhouse_data():
    """Remove the coordination logs and snapshots stored by ClickHouse."""
    node.exec_in_container(['bash', '-c', 'rm -fr /var/lib/clickhouse/coordination/logs/* /var/lib/clickhouse/coordination/snapshots/*'])


def convert_zookeeper_data():
    """Run keeper-converter on /zookeeper/version-2/ inside the container."""
    cmd = '/usr/bin/clickhouse keeper-converter --zookeeper-logs-dir /zookeeper/version-2/ --zookeeper-snapshots-dir /zookeeper/version-2/ --output-dir /var/lib/clickhouse/coordination/snapshots'
    node.exec_in_container(['bash', '-c', cmd])


def stop_clickhouse():
    """Stop the ClickHouse server on the node."""
    node.stop_clickhouse()


def start_clickhouse():
    """Start the ClickHouse server on the node."""
    node.start_clickhouse()


def copy_zookeeper_data(make_zk_snapshots):
    """Convert the current ZooKeeper data and restart both services.

    :param make_zk_snapshots: when true, ZooKeeper is started and stopped one
        extra time before the conversion to force it to create a snapshot.
    """
    stop_zookeeper()

    if make_zk_snapshots:  # force zookeeper to create snapshot
        start_zookeeper()
        stop_zookeeper()

    stop_clickhouse()
    clear_clickhouse_data()
    convert_zookeeper_data()
    start_zookeeper()
    start_clickhouse()


@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and always shut it down afterwards."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def get_fake_zk(timeout=30.0):
    """Return a started Kazoo client connected to port 9181 of the node."""
    _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip('node') + ":9181", timeout=timeout)
    _fake_zk_instance.start()
    return _fake_zk_instance


def get_genuine_zk(timeout=30.0):
    """Return a started Kazoo client connected to port 2181 of the node."""
    _genuine_zk_instance = KazooClient(hosts=cluster.get_instance_ip('node') + ":2181", timeout=timeout)
    _genuine_zk_instance.start()
    return _genuine_zk_instance


def compare_stats(stat1, stat2, path):
    """Assert that two node stats agree on every compared field.

    :param stat1: stat of the node as returned by the first client.
    :param stat2: stat of the same node as returned by the second client.
    :param path: node path, used only in the failure message.
    :raises AssertionError: naming the first field whose values differ.
    """
    for field in _COMPARED_STAT_FIELDS:
        left = getattr(stat1, field)
        right = getattr(stat2, field)
        # The message is built from the very attribute that was compared, so
        # it can neither name the wrong field nor read a non-existent one.
        assert left == right, "path {} {} not equal for stats: {} != {}".format(path, field, left, right)


def compare_states(zk1, zk2, path="/"):
    """Recursively assert that both clients see the same subtree.

    Data and children are compared for every node; stats are compared for
    every node except "/" and "/zookeeper".

    :param zk1: first client (its children drive the recursion).
    :param zk2: second client.
    :param path: root of the subtree to compare.
    """
    data1, stat1 = zk1.get(path)
    data2, stat2 = zk2.get(path)
    print("Left Stat", stat1)
    print("Right Stat", stat2)
    assert data1 == data2, "Data not equal on path " + str(path)
    # both paths have strange stats
    if path not in ("/", "/zookeeper"):
        compare_stats(stat1, stat2, path)

    first_children = sorted(zk1.get_children(path))
    second_children = sorted(zk2.get_children(path))
    print("Got children left", first_children)
    print("Got children rigth", second_children)
    assert first_children == second_children, "Childrens are not equal on path " + path

    for child in first_children:
        child_path = os.path.join(path, child)
        print("Checking child", child_path)
        compare_states(zk1, zk2, child_path)


@pytest.mark.parametrize('create_snapshots', [True, False])
def test_smoke(started_cluster, create_snapshots):
    """A single node survives the conversion unchanged."""
    restart_and_clear_zookeeper()

    genuine_connection = get_genuine_zk()
    genuine_connection.create("/test", b"data")

    assert genuine_connection.get("/test")[0] == b"data"

    copy_zookeeper_data(create_snapshots)

    genuine_connection = get_genuine_zk()
    fake_connection = get_fake_zk()

    compare_states(genuine_connection, fake_connection)


def get_bytes(s):
    """Encode a string to bytes using the default encoding."""
    return s.encode()


@pytest.mark.parametrize('create_snapshots', [True, False])
def test_simple_crud_requests(started_cluster, create_snapshots):
    """Creates, sets, deletes, deep, sequential and ephemeral nodes convert."""
    restart_and_clear_zookeeper()

    genuine_connection = get_genuine_zk()
    for i in range(100):
        genuine_connection.create("/test_create" + str(i), get_bytes("data" + str(i)))

    # some set queries
    for i in range(10):
        for j in range(i + 1):
            genuine_connection.set("/test_create" + str(i), get_bytes("value" + str(j)))

    for i in range(10, 20):
        genuine_connection.delete("/test_create" + str(i))

    path = "/test_create_deep"
    for i in range(10):
        genuine_connection.create(path, get_bytes("data" + str(i)))
        path = os.path.join(path, str(i))

    genuine_connection.create("/test_sequential", b"")
    for i in range(10):
        genuine_connection.create("/test_sequential/" + "a" * i + "-", get_bytes("dataX" + str(i)), sequence=True)

    genuine_connection.create("/test_ephemeral", b"")
    for i in range(10):
        genuine_connection.create("/test_ephemeral/" + str(i), get_bytes("dataX" + str(i)), ephemeral=True)

    copy_zookeeper_data(create_snapshots)

    genuine_connection = get_genuine_zk()
    fake_connection = get_fake_zk()

    compare_states(genuine_connection, fake_connection)

    # especially ensure that counters are the same
    # The payload is spelled out instead of reusing the loop variable that
    # leaked from the loops above; both services get identical data.
    extra_node_prefix = "/test_sequential/" + "a" * 10 + "-"
    extra_node_data = get_bytes("dataX10")
    genuine_connection.create(extra_node_prefix, extra_node_data, sequence=True)
    fake_connection.create(extra_node_prefix, extra_node_data, sequence=True)

    first_children = sorted(genuine_connection.get_children("/test_sequential"))
    second_children = sorted(fake_connection.get_children("/test_sequential"))
    # Report the path that was actually compared (``path`` still points into
    # the /test_create_deep chain at this point).
    assert first_children == second_children, "Childrens are not equal on path " + "/test_sequential"


@pytest.mark.parametrize('create_snapshots', [True, False])
def test_multi_and_failed_requests(started_cluster, create_snapshots):
    """Committed transactions convert; failed requests leave no trace."""
    restart_and_clear_zookeeper()

    genuine_connection = get_genuine_zk()
    genuine_connection.create('/test_multitransactions')
    for i in range(10):
        t = genuine_connection.transaction()
        t.create('/test_multitransactions/freddy' + str(i), get_bytes('data' + str(i)))
        t.create('/test_multitransactions/fred' + str(i), get_bytes('value' + str(i)), ephemeral=True)
        t.create('/test_multitransactions/smith' + str(i), get_bytes('entity' + str(i)), sequence=True)
        t.set_data('/test_multitransactions', get_bytes("somedata" + str(i)))
        t.commit()

    # NOTE(review): the collapsed listing does not show whether this block sat
    # inside the loop above; it is placed after it here - confirm.
    with pytest.raises(Exception):
        # The payload is never stored (the request must fail), so it does not
        # need to depend on the loop variable.
        genuine_connection.set('/test_multitransactions/freddy0', get_bytes('mustfail'), version=1)

    t = genuine_connection.transaction()

    t.create('/test_bad_transaction', get_bytes('data' + str(1)))
    t.check('/test_multitransactions', version=32)
    t.create('/test_bad_transaction1', get_bytes('data' + str(2)))
    # should fail
    t.commit()

    assert genuine_connection.exists('/test_bad_transaction') is None
    assert genuine_connection.exists('/test_bad_transaction1') is None

    t = genuine_connection.transaction()
    t.create('/test_bad_transaction2', get_bytes('data' + str(1)))
    t.delete('/test_multitransactions/freddy0', version=5)

    # should fail
    t.commit()
    assert genuine_connection.exists('/test_bad_transaction2') is None
    assert genuine_connection.exists('/test_multitransactions/freddy0') is not None

    copy_zookeeper_data(create_snapshots)

    genuine_connection = get_genuine_zk()
    fake_connection = get_fake_zk()

    compare_states(genuine_connection, fake_connection)


@pytest.mark.parametrize('create_snapshots', [True, False])
def test_acls(started_cluster, create_snapshots):
    """ACLs set through digest-authenticated sessions survive the conversion."""
    restart_and_clear_zookeeper()

    genuine_connection = get_genuine_zk()
    genuine_connection.add_auth('digest', 'user1:password1')
    genuine_connection.add_auth('digest', 'user2:password2')
    genuine_connection.add_auth('digest', 'user3:password3')

    genuine_connection.create("/test_multi_all_acl", b"data", acl=[make_acl("auth", "", all=True)])

    other_connection = get_genuine_zk()
    other_connection.add_auth('digest', 'user1:password1')
    other_connection.set("/test_multi_all_acl", b"X")
    assert other_connection.get("/test_multi_all_acl")[0] == b"X"

    yet_other_auth_connection = get_genuine_zk()
    yet_other_auth_connection.add_auth('digest', 'user2:password2')

    yet_other_auth_connection.set("/test_multi_all_acl", b"Y")

    genuine_connection.add_auth('digest', 'user3:password3')

    # just to check that we are able to deserialize it
    genuine_connection.set_acls("/test_multi_all_acl", acls=[make_acl("auth", "", read=True, write=False, create=True, delete=True, admin=True)])

    no_auth_connection = get_genuine_zk()

    with pytest.raises(Exception):
        no_auth_connection.set("/test_multi_all_acl", b"Z")

    copy_zookeeper_data(create_snapshots)

    genuine_connection = get_genuine_zk()
    genuine_connection.add_auth('digest', 'user1:password1')
    genuine_connection.add_auth('digest', 'user2:password2')
    genuine_connection.add_auth('digest', 'user3:password3')

    fake_connection = get_fake_zk()
    fake_connection.add_auth('digest', 'user1:password1')
    fake_connection.add_auth('digest', 'user2:password2')
    fake_connection.add_auth('digest', 'user3:password3')

    compare_states(genuine_connection, fake_connection)

    for connection in [genuine_connection, fake_connection]:
        acls, stat = connection.get_acls("/test_multi_all_acl")
        assert stat.aversion == 1
        assert len(acls) == 3
        for acl in acls:
            assert acl.acl_list == ['READ', 'CREATE', 'DELETE', 'ADMIN']
            assert acl.id.scheme == 'digest'
            assert acl.perms == 29
            # ... (the listing is truncated at this point in the excerpt)
test_nsnap02.py
Source:test_nsnap02.py
...54 else:55 self.assertRaisesWithMessage(wiredtiger.WiredTigerError, lambda:56 self.session.begin_transaction("snapshot=%d" % (snap_name)),57 "/Invalid argument/")58 def create_snapshots(self):59 # Populate a table60 end = start = 061 SimpleDataSet(self, self.uri, 0, key_format='i').populate()62 # Create a set of snapshots63 # Each snapshot has a bunch of new data64 # Each snapshot removes a (smaller) bunch of old data65 snapshots = []66 c = self.session.open_cursor(self.uri)67 for n in xrange(self.nsnapshots):68 self.session.snapshot("name=%d" % (n))69 snapshots.append((n, end - start, 0))70 for i in xrange(2 * self.nrows_per_snap):71 c[end + i] = "some value"72 end += 2 * self.nrows_per_snap73 for i in xrange(self.nrows_per_snap):74 del c[start + i]75 start += self.nrows_per_snap76 return snapshots77 def test_drop_all_snapshots(self):78 snapshots = self.create_snapshots()79 self.check_named_snapshots(snapshots)80 self.session.snapshot("drop=(all)")81 new_snapshots = []82 for snap_name, expected, dropped in snapshots:83 new_snapshots.append((snap_name, expected, 1))84 # Make sure all the snapshots are gone.85 self.check_named_snapshots(new_snapshots)86 def test_drop_first_snapshot(self):87 snapshots = self.create_snapshots()88 c = self.session.open_cursor(self.uri)89 for snap_name, expected, dropped in snapshots:90 self.check_named_snapshot(c, snap_name, expected)91 self.session.snapshot("drop=(names=[0])")92 # Construct a snapshot array matching the expected state.93 new_snapshots = []94 for snap_name, expected, dropped in snapshots:95 if snap_name == 0:96 new_snapshots.append((snap_name, expected, 1))97 else:98 new_snapshots.append((snap_name, expected, 0))99 # Make sure all the snapshots are gone.100 self.check_named_snapshots(new_snapshots)101 def test_drop_to_first_snapshot(self):102 snapshots = self.create_snapshots()103 c = self.session.open_cursor(self.uri)104 for snap_name, expected, dropped in snapshots:105 self.check_named_snapshot(c, snap_name, 
expected)106 self.session.snapshot("drop=(to=0)")107 # Construct a snapshot array matching the expected state.108 new_snapshots = []109 for snap_name, expected, dropped in snapshots:110 if snap_name == 0:111 new_snapshots.append((snap_name, expected, 1))112 else:113 new_snapshots.append((snap_name, expected, 0))114 # Make sure all the snapshots are gone.115 self.check_named_snapshots(new_snapshots)116 def test_drop_before_first_snapshot(self):117 snapshots = self.create_snapshots()118 c = self.session.open_cursor(self.uri)119 for snap_name, expected, dropped in snapshots:120 self.check_named_snapshot(c, snap_name, expected)121 self.session.snapshot("drop=(before=0)")122 # Make sure no snapshots are gone123 self.check_named_snapshots(snapshots)124 def test_drop_to_third_snapshot(self):125 snapshots = self.create_snapshots()126 c = self.session.open_cursor(self.uri)127 for snap_name, expected, dropped in snapshots:128 self.check_named_snapshot(c, snap_name, expected)129 self.session.snapshot("drop=(to=3)")130 # Construct a snapshot array matching the expected state.131 new_snapshots = []132 for snap_name, expected, dropped in snapshots:133 if snap_name <= 3:134 new_snapshots.append((snap_name, expected, 1))135 else:136 new_snapshots.append((snap_name, expected, 0))137 # Make sure all the snapshots are gone.138 self.check_named_snapshots(new_snapshots)139 def test_drop_before_third_snapshot(self):140 snapshots = self.create_snapshots()141 c = self.session.open_cursor(self.uri)142 for snap_name, expected, dropped in snapshots:143 self.check_named_snapshot(c, snap_name, expected)144 self.session.snapshot("drop=(before=3)")145 # Construct a snapshot array matching the expected state.146 new_snapshots = []147 for snap_name, expected, dropped in snapshots:148 if snap_name < 3:149 new_snapshots.append((snap_name, expected, 1))150 else:151 new_snapshots.append((snap_name, expected, 0))152 # Make sure all the snapshots are gone.153 self.check_named_snapshots(new_snapshots)154 
def test_drop_to_last_snapshot(self):155 snapshots = self.create_snapshots()156 c = self.session.open_cursor(self.uri)157 for snap_name, expected, dropped in snapshots:158 self.check_named_snapshot(c, snap_name, expected)159 self.session.snapshot("drop=(to=%d)" % (self.nsnapshots - 1))160 # Construct a snapshot array matching the expected state.161 new_snapshots = []162 for snap_name, expected, dropped in snapshots:163 new_snapshots.append((snap_name, expected, 1))164 # Make sure all the snapshots are gone.165 self.check_named_snapshots(new_snapshots)166 def test_drop_before_last_snapshot(self):167 snapshots = self.create_snapshots()168 c = self.session.open_cursor(self.uri)169 for snap_name, expected, dropped in snapshots:170 self.check_named_snapshot(c, snap_name, expected)171 self.session.snapshot("drop=(before=%d)" % (self.nsnapshots - 1))172 # Construct a snapshot array matching the expected state.173 new_snapshots = []174 for snap_name, expected, dropped in snapshots:175 if snap_name < self.nsnapshots - 1:176 new_snapshots.append((snap_name, expected, 1))177 else:178 new_snapshots.append((snap_name, expected, 0))179 # Make sure all the snapshots are gone.180 self.check_named_snapshots(new_snapshots)181 def test_drop_specific_snapshots1(self):182 snapshots = self.create_snapshots()183 c = self.session.open_cursor(self.uri)184 for snap_name, expected, dropped in snapshots:185 self.check_named_snapshot(c, snap_name, expected)186 self.session.snapshot(187 "drop=(names=[%d,%d,%d])" % (0, 3, self.nsnapshots - 1))188 # Construct a snapshot array matching the expected state.189 new_snapshots = []190 for snap_name, expected, dropped in snapshots:191 if snap_name == 0 or snap_name == 3 or \192 snap_name == self.nsnapshots - 1:193 new_snapshots.append((snap_name, expected, 1))194 else:195 new_snapshots.append((snap_name, expected, 0))196 # Make sure all the snapshots are gone....
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!