Best Python code snippet using localstack_python
test_blob.py
Source:test_blob.py
1import os.path2import io3import pytest4from unittest import mock5from botocore.stub import ANY6from botocore.response import StreamingBody7from blobby import blob8def get_object_response(data):9 body = StreamingBody(io.BytesIO(data), len(data))10 return {11 'ResponseMetadata': {12 'HTTPStatusCode': 200,13 },14 'Body': body,15 }16class TestBlobReader:17 def test_read(self, reader, stubber):18 stubber.add_response('head_object',19 {'ContentLength': 4},20 {'Bucket': 'mock-bucket', 'Key': 'test'})21 stubber.add_response('get_object',22 get_object_response(b'test'),23 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=0-'})24 assert reader.read() == b'test'25 def test_read_error(self, reader, stubber):26 # NOTE: we believe that such an error would always raise an27 # exception in boto3. But in case it doesn't we check our code28 # for handling that here.29 stubber.add_response('head_object',30 {'ContentLength': 4},31 {'Bucket': 'mock-bucket', 'Key': 'test'})32 stubber.add_response('get_object',33 {'ResponseMetadata': {'HTTPStatusCode': 404}},34 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=0-'})35 with pytest.raises(blob.BlobError):36 reader.read()37 def test_read_wrongtype(self, reader, stubber):38 # NOTE: similarly to `test_read_error`, we believe that we will39 # always get a `StreamingBody` in a successful response.40 stubber.add_response('head_object',41 {'ContentLength': 4},42 {'Bucket': 'mock-bucket', 'Key': 'test'})43 stubber.add_response('get_object',44 {45 'ResponseMetadata': {'HTTPStatusCode': 200},46 'Body': b'wrong'47 },48 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=0-'})49 with pytest.raises(TypeError):50 reader.read()51 def test_read_from(self, reader, stubber):52 stubber.add_response('head_object',53 {'ContentLength': 4},54 {'Bucket': 'mock-bucket', 'Key': 'test'})55 stubber.add_response('get_object',56 get_object_response(b'st'),57 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=2-'})58 assert reader.seek(2) == 259 assert reader.read() == b'st'60 def test_read_middle(self, reader, stubber):61 stubber.add_response('head_object',62 {'ContentLength': 4},63 {'Bucket': 'mock-bucket', 'Key': 'test'})64 stubber.add_response('get_object',65 get_object_response(b'es'),66 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=1-2'})67 reader.seek(1)68 assert reader.read(2) == b'es'69 def test_read_to(self, reader, stubber):70 stubber.add_response('head_object',71 {'ContentLength': 4},72 {'Bucket': 'mock-bucket', 'Key': 'test'})73 stubber.add_response('get_object',74 get_object_response(b'te'),75 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=0-1'})76 assert reader.read(2) == b'te'77 def test_read_end(self, reader, stubber):78 stubber.add_response('head_object',79 {'ContentLength': 10},80 {'Bucket': 'mock-bucket', 'Key': 'test'})81 reader.seek(20)82 assert reader.read() == b''83 def test_seek_set(self, reader, stubber):84 assert reader.seek(10) == 1085 assert reader.seek(5) == 586 assert reader.seek(5, io.SEEK_SET) == 587 with pytest.raises(ValueError):88 reader.seek(-5, io.SEEK_SET)89 def test_seek_cur(self, reader, stubber):90 assert reader.seek(10, io.SEEK_CUR) == 1091 assert reader.seek(5, io.SEEK_CUR) == 1592 assert reader.seek(-5, io.SEEK_CUR) == 1093 with pytest.raises(ValueError):94 reader.seek(-15, io.SEEK_CUR)95 def test_seek_end(self, reader, stubber):96 stubber.add_response('head_object',97 {'ContentLength': 100},98 {'Bucket': 'mock-bucket', 'Key': 'test'})99 assert reader.seek(-10, io.SEEK_END) == 90100 assert reader.seek(-5, io.SEEK_END) == 95101 assert reader.seek(5, io.SEEK_END) == 105102 with pytest.raises(ValueError):103 reader.seek(-200, io.SEEK_END)104 def test_tell(self, reader, stubber):105 assert reader.tell() == 0106 reader.seek(10)107 assert reader.tell() == 10108 reader.seek(100)109 assert reader.tell() == 100110 def test_seek_invalid(self, reader, stubber):111 with pytest.raises(ValueError):112 reader.seek(0, 4)113 def test_read_into(self, reader, stubber):114 stubber.add_response('head_object',115 {'ContentLength': 8},116 {'Bucket': 'mock-bucket', 'Key': 'test'})117 stubber.add_response('get_object',118 get_object_response(b'mockeries'),119 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=0-7'})120 array = bytearray(8)121 assert reader.readinto(array) == 8122 assert array == b'mockeries'123 def test_read_into_middle(self, reader, stubber):124 stubber.add_response('head_object',125 {'ContentLength': 8},126 {'Bucket': 'mock-bucket', 'Key': 'test'})127 stubber.add_response('get_object',128 get_object_response(b'eries'),129 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=4-7'})130 array = bytearray(4)131 reader.seek(4)132 assert reader.readinto(array) == 4133 assert array == b'eries'134 def test_fifo(self, reader, stubber):135 stubber.add_response('head_object',136 {'ContentLength': 4},137 {'Bucket': 'mock-bucket', 'Key': 'test'})138 stubber.add_response('get_object',139 get_object_response(b'test'),140 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=0-'})141 with reader.as_fifo() as fd:142 fd.fileno()143 assert not fd.seekable()144 assert fd.read() == b'test'145 assert not os.path.exists(fd.name)146 def test_tempfile(self, reader, stubber):147 stubber.add_response('head_object',148 {'ContentLength': 9},149 {'Bucket': 'mock-bucket', 'Key': 'test'})150 stubber.add_response('get_object',151 get_object_response(b'test'),152 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=0-3'})153 stubber.add_response('head_object',154 {'ContentLength': 9},155 {'Bucket': 'mock-bucket', 'Key': 'test'})156 stubber.add_response('get_object',157 get_object_response(b' tes'),158 {'Bucket': 'mock-bucket', 'Key': 'test', 'Range': 'bytes=4-7'})159 stubber.add_response('head_object',160 {'ContentLength': 9},161 {'Bucket': 'mock-bucket', 'Key': 'test'})162 stubber.add_response('get_object',163 get_object_response(b't'),164 {'Bucket': 'mock-bucket',165 'Key': 'test',166 'Range': 'bytes=8-11'})167 stubber.add_response('head_object',168 {'ContentLength': 9},169 {'Bucket': 'mock-bucket', 'Key': 'test'})170 with reader.as_tempfile(chunk_size=4) as fd:171 fd.fileno()172 assert fd.seekable()173 assert fd.read() == b'test test'174 with pytest.raises(ValueError):175 fd.read()176 assert not os.path.exists(fd.name)177 def test_readable(self, reader, stubber):178 assert reader.readable()179 def test_seekable(self, reader, stubber):180 assert reader.seekable()181 def test_len(self, reader, stubber):182 stubber.add_response('head_object',183 {'ContentLength': 98276},184 {'Bucket': 'mock-bucket', 'Key': 'test'})185 assert len(reader) == 98276186 def test_repr(self, reader):187 assert repr(reader) == (188 "<BlobReader wrapping s3.Object(bucket_name='mock-bucket', "189 "key='test')>"190 )191class TestBlobWriter:192 def test_write(self, bucket, stubber):193 # Replace the original upload_fileobj method on bucket to also194 # capture whatever was uploaded. Because the test itself has the Bucket195 # instance this is easier than using unittest.mock.patch.196 orig_upload_fileobj = bucket.upload_fileobj197 def upload_fileobj(inner_self, fileobj, key):198 inner_self._captured_data = fileobj.read()199 inner_self._captured_key = key200 fileobj.seek(0)201 return orig_upload_fileobj(fileobj, key)202 bucket.upload_fileobj = upload_fileobj.__get__(bucket, type(bucket))203 writer = blob.BlobWriter(bucket)204 data = b'test1test2'205 writer.write(data[:5])206 writer.write(data[5:])207 # It's not possible to check that the data in the request is correct208 # because it is wrapped in a ReadFileChunk object which does not over-209 # -load __eq__, so we specify ANY here and check the data using the210 # above monkey-patch.211 stubber.add_response(212 'put_object',213 {},214 {215 'Bucket': 'mock-bucket',216 'Key': writer.key(),217 'Body': ANY218 }219 )220 writer.flush()221 assert bucket._captured_data == data222 assert writer.buffer.closed223 def test_close_closes_and_flushes(self, writer, stubber):224 writer.write(b'test')225 # We don't explicitly check for flush, but we check that the put226 # request is made in cleanup.227 stubber.add_response(228 'put_object',229 {},230 {231 'Bucket': 'mock-bucket',232 'Key': writer.key(),233 'Body': ANY234 }235 )236 writer.close()237 assert writer.closed238 def test_closed_errors(self, writer, stubber):239 stubber.add_response(240 'put_object',241 {},242 {243 'Bucket': 'mock-bucket',244 'Key': writer.key(),245 'Body': ANY246 }247 )248 writer.close()249 with pytest.raises(ValueError):250 writer.writable()251 with pytest.raises(ValueError):252 writer.write(b'')253 def test_callback(self, writer, stubber):254 writer.write(b'test')255 expected_key = writer.key()256 stubber.add_response(257 'put_object',258 {},259 {260 'Bucket': 'mock-bucket',261 'Key': expected_key,262 'Body': ANY263 }264 )265 @mock.Mock266 def callback(key):267 pass268 writer.register_callback(callback)269 writer.flush()270 callback.assert_called_with(expected_key)271 def test_readable(self, writer):272 assert not writer.readable()273 def test_writable(self, writer):274 assert writer.writable()275 def test_seekable(self, writer):276 assert not writer.seekable()277 def test_cancel(self, writer):278 writer.cancel()279 with pytest.raises(ValueError):280 writer.flush()281 # Also explicitly check that a `close()` does not throw an error282 # or make any requests283 writer.close()284 def test_repr(self, writer):285 assert repr(writer) == (286 "<BlobWriter targeting s3.Bucket(name='mock-bucket')>"...
test_squrl.py
Source:test_squrl.py
1from datetime import datetime2from botocore.exceptions import ClientError3from botocore.stub import Stubber, ANY4from boto3 import client5import pytest6from squrl import Squrl7@pytest.fixture(scope="function")8def stubber(request):9 stub = Stubber(client("s3"))10 request.addfinalizer(stub.assert_no_pending_responses)11 return stub12@pytest.fixture(scope="session")13def parameters():14 bucket = "test-bucket"15 url = "test-url"16 key = Squrl.get_key(url)17 return {18 "bucket": bucket,19 "url": url,20 "key": key,21 "head_object": {22 "Bucket": bucket,23 "Key": key24 },25 "put_object": {26 "Bucket": bucket,27 "Key": key,28 "WebsiteRedirectLocation": url,29 "Expires": ANY,30 "ContentType": ANY31 }32 }33def test_get_key(parameters):34 key = Squrl.get_key("test-url")35 assert len(key) == Squrl.key_length + 236 assert key.startswith("u/")37def test_get_expiration(parameters):38 expiration = Squrl.get_expiration()39 assert expiration > datetime.now()40def test_key_exists(stubber, parameters):41 bucket = parameters["bucket"]42 key = parameters["key"]43 stubber.add_response(44 "head_object", {}, expected_params=parameters["head_object"]45 )46 stubber.activate()47 assert Squrl(bucket, client=stubber.client).key_exists(key)48def test_key_does_not_exist(stubber, parameters):49 bucket = parameters["bucket"]50 key = parameters["key"]51 stubber.add_client_error(52 "head_object",53 expected_params=parameters["head_object"],54 service_error_code="404"55 )56 stubber.activate()57 assert not Squrl(bucket, client=stubber.client).key_exists(key)58def test_key_error(stubber, parameters):59 bucket = parameters["bucket"]60 key = parameters["key"]61 stubber.add_client_error(62 "head_object",63 expected_params=parameters["head_object"],64 service_error_code="500"65 )66 stubber.activate()67 with pytest.raises(ClientError):68 Squrl(bucket, client=stubber.client).key_exists(key)69def test_get_method_key_exists(stubber, parameters):70 bucket = parameters["bucket"]71 url = parameters["url"]72 stubber.add_response(73 "head_object", {}, expected_params=parameters["head_object"]74 )75 stubber.activate()76 assert Squrl(bucket, client=stubber.client).get(url)77def test_get_method_key_does_not_exist(stubber, parameters):78 bucket = parameters["bucket"]79 url = parameters["url"]80 stubber.add_client_error(81 "head_object",82 expected_params=parameters["head_object"],83 service_error_code="404"84 )85 stubber.activate()86 assert not Squrl(bucket, client=stubber.client).get(url)87def test_create_method(stubber, parameters):88 bucket = parameters["bucket"]89 url = parameters["url"]90 stubber.add_response(91 "put_object", {}, expected_params=parameters["put_object"]92 )93 stubber.activate()...
Factories.py
Source:Factories.py
...33 }34 @staticmethod35 def create_head(name, params, location):36 head_object = HeadFactory.object_dict.get(name)37 return head_object(params, location)38 @staticmethod39 def add_heads(heads, name, params, game_state):40 head_object = HeadFactory.object_dict.get(name)41 if name == 'food':42 for x in range(game_state.data.food.width):43 for y in range(game_state.data.food.height):44 if game_state.hasFood(x, y) is True:45 heads[(x, y)] = head_object(params, (x, y))46 elif name == 'capsule':47 for capsule in game_state.getCapsules():48 heads[capsule] = head_object(params, capsule)49 elif name == 'ghost' or name == 'scared_ghost':50 for idx in range(1, game_state.getNumAgents()):51 heads[name + '_' + str(idx)] = head_object(params, idx)52 elif name == 'exploration':53 head_object = HeadFactory.object_dict.get("diversification")54 heads['diversification'] = head_object(params)55class AggregatorFactory(object):56 object_dict = {57 "linear": LinearAggregator,58 "posnormneg": PosNormNegAggregator59 }60 @staticmethod61 def create_aggregator(name):62 aggregator_object = AggregatorFactory.object_dict.get(name)63 if aggregator_object is not None:64 return aggregator_object()65 else:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!