Best Python code snippet using autotest_python
test_actor.py
Source:test_actor.py
...128 def test_state_change_stopped_to_playing_event(self):129 self.audio.prepare_change()130 self.audio.set_uri(self.uris[0])131 self.audio.start_playback()132 self.audio.wait_for_state_change().get()133 self.assertEvent('state_changed', old_state=PlaybackState.STOPPED,134 new_state=PlaybackState.PLAYING, target_state=None)135 def test_state_change_stopped_to_paused_event(self):136 self.audio.prepare_change()137 self.audio.set_uri(self.uris[0])138 self.audio.pause_playback()139 self.audio.wait_for_state_change().get()140 self.assertEvent('state_changed', old_state=PlaybackState.STOPPED,141 new_state=PlaybackState.PAUSED, target_state=None)142 def test_state_change_paused_to_playing_event(self):143 self.audio.prepare_change()144 self.audio.set_uri(self.uris[0])145 self.audio.pause_playback()146 self.audio.wait_for_state_change()147 self.listener.clear_events()148 self.audio.start_playback()149 self.audio.wait_for_state_change().get()150 self.assertEvent('state_changed', old_state=PlaybackState.PAUSED,151 new_state=PlaybackState.PLAYING, target_state=None)152 def test_state_change_paused_to_stopped_event(self):153 self.audio.prepare_change()154 self.audio.set_uri(self.uris[0])155 self.audio.pause_playback()156 self.audio.wait_for_state_change()157 self.listener.clear_events()158 self.audio.stop_playback()159 self.audio.wait_for_state_change().get()160 self.assertEvent('state_changed', old_state=PlaybackState.PAUSED,161 new_state=PlaybackState.STOPPED, target_state=None)162 def test_state_change_playing_to_paused_event(self):163 self.audio.prepare_change()164 self.audio.set_uri(self.uris[0])165 self.audio.start_playback()166 self.audio.wait_for_state_change()167 self.listener.clear_events()168 self.audio.pause_playback()169 self.audio.wait_for_state_change().get()170 self.assertEvent('state_changed', old_state=PlaybackState.PLAYING,171 new_state=PlaybackState.PAUSED, target_state=None)172 def test_state_change_playing_to_stopped_event(self):173 
self.audio.prepare_change()174 self.audio.set_uri(self.uris[0])175 self.audio.start_playback()176 self.audio.wait_for_state_change()177 self.listener.clear_events()178 self.audio.stop_playback()179 self.audio.wait_for_state_change().get()180 self.assertEvent('state_changed', old_state=PlaybackState.PLAYING,181 new_state=PlaybackState.STOPPED, target_state=None)182 def test_stream_changed_event_on_playing(self):183 self.audio.prepare_change()184 self.audio.set_uri(self.uris[0])185 self.listener.clear_events()186 self.audio.start_playback()187 # Since we are going from stopped to playing, the state change is188 # enough to ensure the stream changed.189 self.audio.wait_for_state_change().get()190 self.assertEvent('stream_changed', uri=self.uris[0])191 def test_stream_changed_event_on_multiple_changes(self):192 self.audio.prepare_change()193 self.audio.set_uri(self.uris[0])194 self.listener.clear_events()195 self.audio.start_playback()196 self.audio.wait_for_state_change().get()197 self.assertEvent('stream_changed', uri=self.uris[0])198 self.audio.prepare_change()199 self.audio.set_uri(self.uris[1])200 self.audio.pause_playback()201 self.audio.wait_for_state_change().get()202 self.assertEvent('stream_changed', uri=self.uris[1])203 def test_stream_changed_event_on_playing_to_paused(self):204 self.audio.prepare_change()205 self.audio.set_uri(self.uris[0])206 self.listener.clear_events()207 self.audio.start_playback()208 self.audio.wait_for_state_change().get()209 self.assertEvent('stream_changed', uri=self.uris[0])210 self.listener.clear_events()211 self.audio.pause_playback()212 self.audio.wait_for_state_change().get()213 self.assertNotEvent('stream_changed', uri=self.uris[0])214 def test_stream_changed_event_on_paused_to_stopped(self):215 self.audio.prepare_change()216 self.audio.set_uri(self.uris[0])217 self.audio.pause_playback()218 self.audio.wait_for_state_change()219 self.listener.clear_events()220 self.audio.stop_playback()221 
self.audio.wait_for_state_change().get()222 self.assertEvent('stream_changed', uri=None)223 def test_position_changed_on_pause(self):224 self.audio.prepare_change()225 self.audio.set_uri(self.uris[0])226 self.audio.pause_playback()227 self.audio.wait_for_state_change()228 self.audio.wait_for_state_change().get()229 self.assertEvent('position_changed', position=0)230 def test_stream_changed_event_on_paused_to_playing(self):231 self.audio.prepare_change()232 self.audio.set_uri(self.uris[0])233 self.listener.clear_events()234 self.audio.pause_playback()235 self.audio.wait_for_state_change().get()236 self.assertEvent('stream_changed', uri=self.uris[0])237 self.listener.clear_events()238 self.audio.start_playback()239 self.audio.wait_for_state_change().get()240 self.assertNotEvent('stream_changed', uri=self.uris[0])241 def test_position_changed_on_play(self):242 self.audio.prepare_change()243 self.audio.set_uri(self.uris[0])244 self.audio.start_playback()245 self.audio.wait_for_state_change()246 self.audio.wait_for_state_change().get()247 self.assertEvent('position_changed', position=0)248 def test_position_changed_on_seek_while_stopped(self):249 self.audio.prepare_change()250 self.audio.set_uri(self.uris[0])251 self.audio.set_position(2000)252 self.audio.wait_for_state_change().get()253 self.assertNotEvent('position_changed', position=0)254 def test_position_changed_on_seek_after_play(self):255 self.audio.prepare_change()256 self.audio.set_uri(self.uris[0])257 self.audio.start_playback()258 self.audio.wait_for_state_change()259 self.listener.clear_events()260 self.audio.set_position(2000)261 self.audio.wait_for_state_change().get()262 self.assertEvent('position_changed', position=2000)263 def test_position_changed_on_seek_after_pause(self):264 self.audio.prepare_change()265 self.audio.set_uri(self.uris[0])266 self.audio.pause_playback()267 self.audio.wait_for_state_change()268 self.listener.clear_events()269 self.audio.set_position(2000)270 
self.audio.wait_for_state_change().get()271 self.assertEvent('position_changed', position=2000)272 def test_tags_changed_on_playback(self):273 self.audio.prepare_change()274 self.audio.set_uri(self.uris[0])275 self.audio.start_playback()276 self.audio.wait_for_state_change().get()277 self.assertEvent('tags_changed', tags=mock.ANY)278 # Unlike the other events, having the state changed done is not279 # enough to ensure our event is called. So we setup a threading280 # event that we can wait for with a timeout while the track playback281 # completes.282 def test_stream_changed_event_on_paused(self):283 event = self.listener.wait('stream_changed').get()284 self.audio.prepare_change()285 self.audio.set_uri(self.uris[0])286 self.audio.pause_playback().get()287 self.audio.wait_for_state_change().get()288 if not event.wait(timeout=1.0):289 self.fail('Stream changed not reached within deadline')290 self.assertEvent('stream_changed', uri=self.uris[0])291 def test_reached_end_of_stream_event(self):292 event = self.listener.wait('reached_end_of_stream').get()293 self.audio.prepare_change()294 self.audio.set_uri(self.uris[0])295 self.audio.start_playback()296 self.audio.wait_for_state_change().get()297 self.possibly_trigger_fake_about_to_finish()298 if not event.wait(timeout=1.0):299 self.fail('End of stream not reached within deadline')300 self.assertFalse(self.audio.get_current_tags().get())301 def test_gapless(self):302 uris = self.uris[1:]303 event = self.listener.wait('reached_end_of_stream').get()304 def callback():305 if uris:306 self.audio.set_uri(uris.pop()).get()307 self.audio.set_about_to_finish_callback(callback).get()308 self.audio.prepare_change()309 self.audio.set_uri(self.uris[0])310 self.audio.start_playback()311 self.possibly_trigger_fake_about_to_finish()312 self.audio.wait_for_state_change().get()313 self.possibly_trigger_fake_about_to_finish()314 self.audio.wait_for_state_change().get()315 if not event.wait(timeout=1.0):316 self.fail('EOS not received')317 
# Check that both uris got played318 self.assertEvent('stream_changed', uri=self.uris[0])319 self.assertEvent('stream_changed', uri=self.uris[1])320 # Check that events counts check out.321 keys = [k for k, v in self.listener.get_events().get()]322 self.assertEqual(2, keys.count('stream_changed'))323 self.assertEqual(2, keys.count('position_changed'))324 self.assertEqual(1, keys.count('state_changed'))325 self.assertEqual(1, keys.count('reached_end_of_stream'))326 # TODO: test tag states within gaples327 # TODO: this does not belong in this testcase328 def test_current_tags_are_blank_to_begin_with(self):329 self.assertFalse(self.audio.get_current_tags().get())330 def test_current_tags_blank_after_end_of_stream(self):331 event = self.listener.wait('reached_end_of_stream').get()332 self.audio.prepare_change()333 self.audio.set_uri(self.uris[0])334 self.audio.start_playback()335 self.possibly_trigger_fake_about_to_finish()336 self.audio.wait_for_state_change().get()337 if not event.wait(timeout=1.0):338 self.fail('EOS not received')339 self.assertFalse(self.audio.get_current_tags().get())340 def test_current_tags_stored(self):341 event = self.listener.wait('reached_end_of_stream').get()342 tags = []343 def callback():344 tags.append(self.audio.get_current_tags().get())345 self.audio.set_about_to_finish_callback(callback).get()346 self.audio.prepare_change()347 self.audio.set_uri(self.uris[0])348 self.audio.start_playback()349 self.possibly_trigger_fake_about_to_finish()350 self.audio.wait_for_state_change().get()351 if not event.wait(timeout=1.0):352 self.fail('EOS not received')353 self.assertTrue(tags[0])354 # TODO: test that we reset when we expect between songs355class AudioDummyEventTest(DummyMixin, AudioEventTest):356 """Exercise the AudioEventTest against our mock audio classes."""357# TODO: move to mixer tests...358class MixerTest(BaseTest):359 @unittest.SkipTest360 def test_set_mute(self):361 for value in (True, False):362 
self.assertTrue(self.audio.set_mute(value).get())363 self.assertEqual(value, self.audio.get_mute().get())364 @unittest.SkipTest...
example_acm.py
Source:example_acm.py
...37 self.__last_state_change = datetime.datetime.now()38 @property39 def last_state_change(self) -> Optional[datetime.datetime]:40 return self.__last_state_change41async def wait_for_state_change(state_change_listener: ExampleStateListener, time: datetime.datetime, logger: logging.Logger):42 while state_change_listener.last_state_change is None or state_change_listener.last_state_change <= time:43 logger.info("Waiting for state change occurred after %s...", time)44 await asyncio.sleep(5)45async def main() -> NoReturn:46 logger = logging.getLogger(__name__)47 logger.setLevel(logging.INFO)48 handler: logging.Handler = logging.StreamHandler()49 handler.setFormatter(logging.Formatter(fmt="%(asctime)s [%(levelname)s] %(message)s"))50 logger.addHandler(handler)51 parser: argparse.ArgumentParser = argparse.ArgumentParser("python %s" % __file__)52 parser.add_argument("--host", required=True, type=str, dest=AD_HOST,53 help="IP address or host name (that resolves to the IP address) of the device to connect to")54 parser.add_argument("--apiSecretKey", required=True, type=str, dest=AD_API_SECRET_KEY,55 help="API Secret Key for the API access")56 parser.add_argument("--apiAuthKey", required=True, type=str, dest=AD_API_AUTH_KEY,57 help="API Auth Key for the API access")58 parser.add_argument("--action", required=False, dest=AD_ACTION,59 choices=[AV_ACTION_TRIGGER, AV_ACTION_OPEN, AV_ACTION_CLOSE],60 help="The action to be triggered on the device. 
If not given the state of the device will be "61 "logged.")62 args: dict = vars(parser.parse_args())63 connection_options: aioremootio.ConnectionOptions = \64 aioremootio.ConnectionOptions(args[AD_HOST], args[AD_API_SECRET_KEY], args[AD_API_AUTH_KEY])65 state_change_listener: ExampleStateListener = ExampleStateListener(logger)66 async with aiohttp.ClientSession() as client_session:67 try:68 async with aioremootio.RemootioClient(69 connection_options,70 client_session,71 aioremootio.LoggerConfiguration(logger=logger),72 [state_change_listener]73 ) as remootio_client:74 if AD_ACTION not in args or args[AD_ACTION] is None:75 logger.info("State of the device: %s", remootio_client.state)76 elif args[AD_ACTION] == AV_ACTION_TRIGGER:77 await remootio_client.trigger()78 await wait_for_state_change(state_change_listener, state_change_listener.last_state_change, logger)79 elif args[AD_ACTION] == AV_ACTION_OPEN:80 await remootio_client.trigger_open()81 await wait_for_state_change(state_change_listener, state_change_listener.last_state_change, logger)82 elif args[AD_ACTION] == AV_ACTION_CLOSE:83 await remootio_client.trigger_close()84 await wait_for_state_change(state_change_listener, state_change_listener.last_state_change, logger)85 except aioremootio.RemootioClientConnectionEstablishmentError:86 logger.exception("The client has failed to establish connection to the Remootio device.")87 except aioremootio.RemootioClientAuthenticationError:88 logger.exception("The client has failed to authenticate with the Remootio device.")89 except aioremootio.RemootioError:90 logger.exception("Failed to create client because of an error.")91if __name__ == "__main__":...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!