How to use the put_events method in localstack

Best Python code snippet using localstack_python

test_eventbridge.py

Source:test_eventbridge.py Github

copy

Full Screen

...56 client, stubber = self.create_stubbed_eventbridge_client(57 with_default_responses=True,58 )59 with stubber:60 client.put_events(**self._default_put_events_args())61 assert (62 stubber.requests[0].url63 == "https://events.us-east-1.amazonaws.com/"64 )65 assert b"EndpointId" not in stubber.requests[0].body66 def test_put_event_default_endpoint_explicit_configs(self):67 client, stubber = self.create_stubbed_eventbridge_client(68 with_default_responses=True,69 config=Config(70 use_dualstack_endpoint=False,71 use_fips_endpoint=False,72 ),73 )74 with stubber:75 client.put_events(**self._default_put_events_args())76 assert (77 stubber.requests[0].url78 == "https://events.us-east-1.amazonaws.com/"79 )80 assert b"EndpointId" not in stubber.requests[0].body81 @requires_crt()82 def test_put_event_endpoint_id(self):83 client, stubber = self.create_stubbed_eventbridge_client(84 with_default_responses=True,85 )86 default_args = self._default_put_events_args()87 endpoint_id = "abc123.456def"88 with stubber:89 client.put_events(EndpointId=endpoint_id, **default_args)90 self._assert_params_in_body(91 stubber.requests[0],92 [93 ("EndpointId", endpoint_id),94 ],95 )96 self._assert_multi_region_endpoint(stubber.requests[0], endpoint_id)97 self._assert_sigv4a_headers(stubber.requests[0])98 @requires_crt()99 def test_put_event_endpoint_id_explicit_config(self):100 client, stubber = self.create_stubbed_eventbridge_client(101 with_default_responses=True,102 config=Config(103 use_dualstack_endpoint=False,104 use_fips_endpoint=False,105 ),106 )107 default_args = self._default_put_events_args()108 endpoint_id = "abc123.456def"109 with stubber:110 client.put_events(EndpointId=endpoint_id, **default_args)111 self._assert_params_in_body(112 stubber.requests[0],113 [114 ("EndpointId", endpoint_id),115 ],116 )117 self._assert_multi_region_endpoint(stubber.requests[0], endpoint_id)118 self._assert_sigv4a_headers(stubber.requests[0])119 @requires_crt()120 def 
test_put_event_bad_endpoint_id(self):121 client, stubber = self.create_stubbed_eventbridge_client(122 with_default_responses=True,123 )124 default_args = self._default_put_events_args()125 endpoint_id = "badactor.com?foo=bar"126 with pytest.raises(InvalidEndpointConfigurationError) as e:127 client.put_events(EndpointId=endpoint_id, **default_args)128 assert "EndpointId is not a valid hostname component" in str(e.value)129 @requires_crt()130 def test_put_event_bad_endpoint_id_explicit_config(self):131 client, stubber = self.create_stubbed_eventbridge_client(132 with_default_responses=True,133 config=Config(134 use_dualstack_endpoint=False,135 use_fips_endpoint=False,136 ),137 )138 default_args = self._default_put_events_args()139 endpoint_id = "badactor.com?foo=bar"140 with pytest.raises(InvalidEndpointConfigurationError) as e:141 client.put_events(EndpointId=endpoint_id, **default_args)142 assert "EndpointId is not a valid hostname component" in str(e.value)143 @requires_crt()144 def test_put_event_empty_endpoint_id(self):145 client, stubber = self.create_stubbed_eventbridge_client(146 with_default_responses=True,147 )148 default_args = self._default_put_events_args()149 endpoint_id = ""150 with pytest.raises(InvalidEndpointConfigurationError) as e:151 client.put_events(EndpointId=endpoint_id, **default_args)152 assert "EndpointId must not be a zero length string" in str(e.value)153 @requires_crt()154 def test_put_event_empty_endpoint_id_explicit_config(self):155 client, stubber = self.create_stubbed_eventbridge_client(156 with_default_responses=True,157 config=Config(158 use_dualstack_endpoint=False,159 use_fips_endpoint=False,160 ),161 )162 default_args = self._default_put_events_args()163 endpoint_id = ""164 with pytest.raises(InvalidEndpointConfigurationError) as e:165 client.put_events(EndpointId=endpoint_id, **default_args)166 assert "EndpointId must not be a zero length string" in str(e.value)167 def test_put_event_default_dualstack_endpoint(self):168 config 
= Config(use_dualstack_endpoint=True, use_fips_endpoint=False)169 client, stubber = self.create_stubbed_eventbridge_client(170 with_default_responses=True, config=config171 )172 default_args = self._default_put_events_args()173 with stubber:174 client.put_events(**default_args)175 assert stubber.requests[0].url == "https://events.us-east-1.api.aws/"176 @requires_crt()177 def test_put_events_endpoint_id_dualstack(self):178 config = Config(use_dualstack_endpoint=True, use_fips_endpoint=False)179 client, stubber = self.create_stubbed_eventbridge_client(180 with_default_responses=True, config=config181 )182 default_args = self._default_put_events_args()183 endpoint_id = "abc123.456def"184 with stubber:185 client.put_events(EndpointId=endpoint_id, **default_args)186 self._assert_params_in_body(187 stubber.requests[0],188 [189 ("EndpointId", endpoint_id),190 ],191 )192 self._assert_multi_region_endpoint(193 stubber.requests[0], endpoint_id, suffix="api.aws"194 )195 self._assert_sigv4a_headers(stubber.requests[0])196 def test_put_events_default_fips_endpoint(self):197 config = Config(use_dualstack_endpoint=False, use_fips_endpoint=True)198 client, stubber = self.create_stubbed_eventbridge_client(199 with_default_responses=True, config=config200 )201 default_args = self._default_put_events_args()202 with stubber:203 client.put_events(**default_args)204 assert (205 stubber.requests[0].url206 == "https://events-fips.us-east-1.amazonaws.com/"207 )208 @requires_crt()209 def test_put_events_endpoint_id_fips(self):210 config = Config(use_dualstack_endpoint=False, use_fips_endpoint=True)211 client, stubber = self.create_stubbed_eventbridge_client(212 with_default_responses=True, config=config213 )214 default_args = self._default_put_events_args()215 endpoint_id = "abc123.456def"216 with pytest.raises(InvalidEndpointConfigurationError) as e:217 client.put_events(EndpointId=endpoint_id, **default_args)218 assert (219 "FIPS is not supported with EventBridge multi-region 
endpoints"220 in str(e.value)221 )222 def test_put_events_default_dualstack_fips_endpoint(self):223 config = Config(use_dualstack_endpoint=True, use_fips_endpoint=True)224 client, stubber = self.create_stubbed_eventbridge_client(225 with_default_responses=True, config=config226 )227 default_args = self._default_put_events_args()228 with stubber:229 client.put_events(**default_args)230 assert (231 stubber.requests[0].url == "https://events-fips.us-east-1.api.aws/"232 )233 @requires_crt()234 def test_put_events_endpoint_id_dualstack_fips(self):235 config = Config(use_dualstack_endpoint=True, use_fips_endpoint=True)236 client, stubber = self.create_stubbed_eventbridge_client(237 with_default_responses=True, config=config238 )239 default_args = self._default_put_events_args()240 endpoint_id = "abc123.456def"241 with pytest.raises(InvalidEndpointConfigurationError) as e:242 client.put_events(EndpointId=endpoint_id, **default_args)243 assert (244 "FIPS is not supported with EventBridge multi-region endpoints"245 in str(e.value)246 )247 def test_put_events_default_gov_endpoint(self):248 client, stubber = self.create_stubbed_eventbridge_client(249 with_default_responses=True,250 region="us-iso-east-1",251 )252 default_args = self._default_put_events_args()253 with stubber:254 client.put_events(**default_args)255 assert (256 stubber.requests[0].url257 == "https://events.us-iso-east-1.c2s.ic.gov/"258 )259 @requires_crt()260 def test_put_events_endpoint_id_gov(self):261 client, stubber = self.create_stubbed_eventbridge_client(262 with_default_responses=True,263 region="us-iso-east-1",264 )265 default_args = self._default_put_events_args()266 endpoint_id = "abc123.456def"267 with stubber:268 client.put_events(EndpointId=endpoint_id, **default_args)269 self._assert_params_in_body(270 stubber.requests[0],271 [272 ("EndpointId", endpoint_id),273 ],274 )275 self._assert_multi_region_endpoint(276 stubber.requests[0], endpoint_id, suffix="c2s.ic.gov"277 )278 
self._assert_sigv4a_headers(stubber.requests[0])279 def test_put_events_default_custom_endpoint(self):280 client, stubber = self.create_stubbed_eventbridge_client(281 with_default_responses=True, endpoint_url="https://example.org"282 )283 default_args = self._default_put_events_args()284 with stubber:285 client.put_events(**default_args)286 assert stubber.requests[0].url == "https://example.org/"287 @requires_crt()288 def test_put_events_endpoint_id_custom(self):289 client, stubber = self.create_stubbed_eventbridge_client(290 with_default_responses=True, endpoint_url="https://example.org"291 )292 default_args = self._default_put_events_args()293 endpoint_id = "abc123.456def"294 with stubber:295 client.put_events(EndpointId=endpoint_id, **default_args)296 self._assert_params_in_body(297 stubber.requests[0],298 [299 ("EndpointId", endpoint_id),300 ],301 )302 assert stubber.requests[0].url == "https://example.org"...

Full Screen

Full Screen

test_buffer.py

Source:test_buffer.py Github

copy

Full Screen

...36 event = {"event": "data"}37 for i in range(MAX_SIZE * 2):38 buffer.put_event(event)39 assert buffer.size() == MAX_SIZE40def test_put_events(buffer):41 events = [{"event": "data"}] * 242 dropped = buffer.put_events(events)43 assert buffer.size() == 244 assert dropped == 045def test_put_events_max_size(buffer):46 events = [{"event": "data"}] * MAX_SIZE * 247 dropped = buffer.put_events(events)48 assert buffer.size() == MAX_SIZE49 assert dropped == MAX_SIZE50def test_buffer_drops_events_when_put_events(buffer):51 events = [{"event": "data"}] * MAX_SIZE52 buffer.put_events(events)53 dropped = buffer.put_events(events)54 assert dropped == MAX_SIZE55 assert buffer.size() == MAX_SIZE56def test_put_multi_key_events(patch_connection_pool):57 key_a, events_a = "buffer_a", [{"event": "data"}] * 258 key_b, events_b = "buffer_b", [{"event": "data"}] * MAX_SIZE59 key_c, events_c = "buffer_c", [{"event": "data"}] * MAX_SIZE * 260 buffer_a, buffer_b, buffer_c = (61 get_buffer(key_a),62 get_buffer(key_b),63 get_buffer(key_c),64 )65 dropped = buffer_a.put_multi_key_events(66 {key_a: events_a, key_b: events_b, key_c: events_c}67 )68 assert buffer_a.size() == 269 assert buffer_b.size() == MAX_SIZE70 assert buffer_c.size() == MAX_SIZE71 assert dropped == {key_a: 0, key_b: 0, key_c: MAX_SIZE}72def test_pop_event(buffer):73 event = "buffer", {"event": "data"}74 buffer.put_event(event)75 popped_event = buffer.pop_event()76 assert id(event) != id(popped_event)77 assert event == popped_event78 assert buffer.size() == 079 assert buffer.pop_event() is None80def test_pop_events(buffer):81 events = [{"event": f"data{i}"} for i in range(MAX_SIZE)]82 buffer.put_events(events)83 popped_events = buffer.pop_events()84 assert len(popped_events) == BATCH_SIZE85 assert popped_events == events[:BATCH_SIZE]86 assert buffer.size() == MAX_SIZE - BATCH_SIZE87def test_pop_events_get_size(buffer):88 events = [{"event": f"data{i}"} for i in range(MAX_SIZE)]89 buffer.put_events(events)90 popped_events, 
size = buffer.pop_events_get_size()91 assert len(popped_events) == BATCH_SIZE92 assert popped_events == events[:BATCH_SIZE]93 assert size == MAX_SIZE - BATCH_SIZE94 assert buffer.size() == size95def test_clear(buffer):96 events = [{"event": "data"}] * 297 buffer.put_events(events)98 assert buffer.size() == 299 assert buffer.clear() == 2...

Full Screen

Full Screen

eventbridge.py

Source:eventbridge.py Github

copy

Full Screen

...9 self.bus = bus10 self.endpoint=endpoint11 def put_event(self, source, event):12 if self.endpoint is not None:13 response = self.client.put_events(14 EndpointId=self.endpoint,15 Entries=[16 {17 "Time": datetime.now(),18 "Source": source,19 "Resources": [],20 "DetailType": "boto3.client.put_events()",21 "Detail": event,22 "EventBusName": self.bus23 }24 ]25 )26 else:27 response = self.client.put_events(28 Entries=[29 {30 "Time": datetime.now(),31 "Source": source,32 "Resources": [],33 "DetailType": "boto3.client.put_events()",34 "Detail": event,35 "EventBusName": self.bus36 }37 ]38 )...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful