Best Python code snippet using localstack_python
music.py
Source:music.py
...110 except discord.errors.NotFound:111 pass112 self.menu_content = []113 self.menu_type = "NONE"114 async def send_notifications(self, messages):115 for message in self.notifications:116 try:117 await message.delete()118 except discord.errors.NotFound:119 pass120 self.notifications = []121 if type(messages) is list:122 for message in messages:123 self.notifications.append(await self.context.send(message))124 else:125 self.notifications.append(await self.context.send(messages))126 async def send_embed(self, embed):127 if self.embed:128 await self.embed.delete()129 self.embed = self.context.send(embed=embed)130 async def join_channel(self, ctx):131 self.context = ctx132 if not ctx.author.voice:133 await self.send_notifications("> You are not in a voice channel.")134 return False135 channel = self.context.author.voice.channel136 await super().connect(channel.id)137 return True138 async def leave_channel(self):139 await self.destroy()140 async def play_song(self, query, ctx):141 self.context = ctx142 query = query.strip("<>")143 # If it is not a URL144 if re.match(URL_REGEX, query):145 tracks = await self.wavelink.get_tracks(query)146 await self.add_track_to_queue(ctx, tracks[0], query)147 return148 query = f"ytsearch:{query}"149 # Search and add the song to queue150 tracks = await self.wavelink.get_tracks(query)151 if not tracks:152 await self.send_notifications("> No songs found for that query.")153 return154 await self.clear_search_messages()155 self.menu_type = "SONG"156 for track in tracks[:min(len(OPTIONS), len(tracks))]:157 self.menu_content.append(track)158 embed = discord.Embed(159 title=f"Results found for {query}...",160 timestamp=dt.datetime.utcnow()161 )162 # embed.set_author(name = "Query Results")163 embed.set_footer(text=f"Requested by {ctx.author.display_name}") # icon_url=ctx.author.avatar.url164 embed = discord.Embed(165 title=f"Choose the song:",166 description=(167 "\n".join(168 f"**{i + 1}.** {t.title} ({t.length // 60000}:{str(t.length % 60).zfill(2)})"169 for i, t in enumerate(tracks[:min(len(OPTIONS), len(tracks))])170 )171 ),172 colour=ctx.author.colour,173 timestamp=dt.datetime.utcnow()174 )175 self.select_embed = await ctx.send(embed=embed)176 for emoji in list(OPTIONS.keys())[:min(len(tracks), len(OPTIONS))]:177 try:178 await self.select_embed.add_reaction(emoji)179 except discord.errors.NotFound:180 pass181 async def add_track_to_queue(self, ctx, track, search_query):182 self.context = ctx183 if not track:184 await self.send_notifications("> No song found.")185 # Todo one day186 # if isinstance(tracks, wavelink.TrackPlaylist):187 # self.queue.add(*tracks.tracks)188 song = Song(track=track, search_query=search_query)189 self.queue.add_song_to_queue(song)190 await self.send_notifications(f"> Added {track.title} to queue.")191 if not self.is_playing:192 await self.next_song(self.context)193 async def next_song(self, ctx=None):194 if ctx:195 self.context = ctx196 song = self.queue.next_song()197 if song:198 self.timestamp = time.time()199 self.accumulated_time = 0200 await self.play(song.track)201 if self.play_menu:202 await self.play_menu.delete()203 self.play_menu = await self.context.send(song.track.uri)204 await self.play_menu.add_reaction("â¸")205 await self.play_menu.add_reaction("â¶")206 if self.queue.position > 0:207 await self.play_menu.add_reaction("â®")208 try:209 await self.play_menu.add_reaction("â")210 await self.play_menu.add_reaction("â¹")211 await self.play_menu.add_reaction("ð")212 await self.play_menu.add_reaction("ð")213 await self.play_menu.add_reaction("ð¿")214 await self.play_menu.add_reaction("ðµ")215 await self.play_menu.add_reaction("ð")216 await self.play_menu.add_reaction("ð")217 await self.play_menu.add_reaction("â¹ï¸")218 except discord.errors.NotFound:219 pass220 else:221 # await self.send_notifications(f"> No more songs in queue.")222 pass223 async def shuffle_queue(self, ctx=None):224 if ctx:225 self.context = ctx226 self.queue.shuffle()227 await self.send_notifications("> Shuffled Queue.")228 async def pause(self, ctx=None):229 if ctx:230 self.context = ctx231 if not self.is_playing:232 await self.send_notifications("> No song playing.")233 return234 if self.is_paused:235 await self.send_notifications("> Already paused.")236 return237 self.accumulated_time = time.time() - self.timestamp238 self.timestamp = 0239 await self.set_pause(True)240 async def resume(self, ctx=None):241 if ctx:242 self.context = ctx243 if not self.is_playing:244 await self.send_notifications("> No song playing.")245 return246 if not self.is_paused:247 await self.send_notifications("> Already playing a song.")248 return249 self.timestamp = time.time()250 await self.set_pause(False)251 async def skip(self, ctx=None):252 if ctx:253 self.context = ctx254 if not self.is_playing:255 await self.send_notifications("> No song playing.")256 return257 await self.stop()258 await self.send_notifications("> Skipped.")259 async def back(self, ctx=None):260 if ctx:261 self.context = ctx262 if self.queue.position <= 0:263 await self.send_notifications("> No previous song to go back to.")264 return265 if not self.is_playing:266 await self.send_notifications("> No song playing.")267 return268 self.queue.position -= 2269 await self.stop()270 async def repeat(self, ctx=None):271 if ctx:272 self.context = ctx273 if not self.is_playing:274 await self.send_notifications("> No song playing.")275 return276 await self.seek(0)277 async def increase_volume(self, ctx=None, value=5):278 if ctx:279 self.context = ctx280 await self.set_volume(min(self.volume + value, 100))281 await self.send_notifications(f"> Set volume to {self.volume}.")282 async def decrease_volume(self, ctx=None, value=5):283 if ctx:284 self.context = ctx285 await self.set_volume(max(self.volume - value, 0))286 await self.send_notifications(f"> Set volume to {self.volume}.")287 async def define_volume(self, ctx, value):288 self.context = ctx289 await self.set_volume(max(min(value, 100), 0))290 await self.send_notifications(f"> Set volume to {self.volume}.")291 async def show_lyrics(self, ctx=None):292 if ctx:293 self.context = ctx294 song = self.queue.get_current_song()295 if not song:296 await self.send_notifications(f"> No song playing.")297 return298 if song.name:299 lyrics = Lyrics.get_lyrics(search_string=song.name)300 if lyrics:301 embed = discord.Embed(302 title=f"Lyrics for {song.author} - {song.name}",303 timestamp=dt.datetime.utcnow()304 )305 # embed.set_author(name = "Query Results")306 embed.set_footer(text=f"Requested by {ctx.author.display_name}") # icon_url=ctx.author.avatar.url307 embed.add_field(name="Lyrics", value=lyrics, inline=False)308 await ctx.send(embed=embed)309 return310 print(f"song name = {song.name}")311 await self.send_notifications("> No lyrics found for current song.")312 async def now_playing(self, ctx=None):313 if ctx:314 self.context = ctx315 if not self.is_playing:316 await self.send_notifications("> No song playing.")317 return318 song = self.queue.get_current_song()319 if not song.name:320 video_title = song.track.title321 spotify_data = Spotify.spotify.search(video_title)322 if len(spotify_data['tracks']['items']) == 0: # Spotify got results323 await self.send_notifications("> Couldn't get information about song to extract lyrics.")324 return325 item = spotify_data['tracks']['items'][0]326 song.name = item['name']327 song.author = item['artists'][0]['name']328 song.album_name = item['album']['name']329 song.year = item['album']['release_date'][:4]330 self.queue.queue[self.queue.position] = song331 seconds = self.accumulated_time332 if self.timestamp != 0:333 seconds += time.time() - self.timestamp334 passed = Utility.format_seconds(seconds)335 full = Utility.format_seconds(int(self.queue.queue[self.queue.position].track.duration / 1000))336 embed = discord.Embed(337 title=f"{song.author} - {song.name}",338 timestamp=dt.datetime.utcnow()339 )340 embed.add_field(name="Details", value=f"Album : {song.album_name}({song.year})\n Time : {passed} / {full}",341 inline=False)342 await self.context.send(embed=embed)343 async def reset(self, ctx=None):344 if ctx:345 self.context = ctx346 self.queue.queue = []347 self.queue.position = -1348 self.timestamp = 0349 self.accumulated_time = 0350 if self.is_playing:351 await self.stop()352 async def full_reset(self, ctx=None):353 if ctx:354 self.context = ctx355 await self.reset()356 if self.is_connected:357 await self.disconnect()358 await self.send_notifications("> Queue emptied.")359 async def list_songs_in_queue(self, ctx):360 self.context = ctx361 songs = self.queue.get_remaining_songs()362 if len(songs) == 0:363 await self.send_notifications("> Queue is empty.")364 return365 embed = discord.Embed(366 title="Queue",367 timestamp=dt.datetime.utcnow()368 )369 embed.set_footer(text=f"Requested by {ctx.author.display_name}") # icon_url=ctx.author.avatar.url370 embed.add_field(name="Currently Playing", value=songs[0].track.title, inline=False)371 if len(songs) > 1:372 embed.add_field(name="Next Up", value="\n".join(song.track.title for song in songs[1:]), inline=False)373 await self.send_embed(embed=embed)374 async def list_songs_in_history(self, ctx):375 self.context = ctx376 songs = self.queue.get_previous_songs()377 if len(songs) == 0:378 await self.send_notifications(">Queue is empty.")379 return380 embed = discord.Embed(381 title="Queue",382 timestamp=dt.datetime.utcnow()383 )384 embed.set_footer(text=f"Requested by {ctx.author.display_name}") # icon_url=ctx.author.avatar.url385 embed.add_field(name="Next Up", value="\n".join(song.track.title for song in songs), inline=False)386 await self.send_embed(embed=embed)387 async def add_song_to_playlist(self, playlist_name, author_id, ctx=None):388 if ctx:389 self.context = ctx390 if not self.is_playing:391 await self.send_notifications("> No song currently playing to be added.")392 return393 playlists = Database.Database.get_playlists(author_id=author_id, playlist_name=playlist_name)394 if len(playlists) == 0:395 await self.send_notifications(f"> No playlist named {Utility.format_input(playlist_name)}.")396 return397 song = self.queue.get_current_song()398 added = Database.Database.insert_song_into_playlist(author_id=author_id, playlist_name=playlist_name,399 video_id=song.track.ytid)400 if added:401 if song.name and song.author:402 await self.send_notifications(403 f"> Added {Utility.format_input(song.author)} - {Utility.format_input(song.name)} to {Utility.format_input(playlist_name)}.")404 else:405 await self.send_notifications(406 f"> Added {Utility.format_input(song.track.title)} to {Utility.format_input(playlist_name)}.")407 else:408 if song.name and song.author:409 await self.send_notifications(410 f"> {Utility.format_input(song.author)} - {Utility.format_input(song.name)} already exists in {Utility.format_input(playlist_name)}.")411 else:412 await self.send_notifications(413 f"> Failed to add {Utility.format_input(song.track.title)} to {Utility.format_input(playlist_name)}.")414 async def delete_song_from_playlist(self, playlist_name, author_id, ctx=None):415 if ctx:416 self.context = ctx417 if not self.is_playing:418 await self.send_notifications("> No song currently playing to be removed.")419 return420 playlists = Database.Database.get_playlists(author_id=author_id, playlist_name=playlist_name)421 if len(playlists) == 0:422 await self.send_notifications(f"> No playlist named {Utility.format_input(playlist_name)}.")423 return424 song = self.queue.get_current_song()425 added = Database.Database.delete_song_from_playlist(author_id=author_id, playlist_name=playlist_name,426 video_id=song.track.ytid)427 if added:428 if song.name and song.author:429 await self.send_notifications(430 f"> Deleted {Utility.format_input(song.author)} - {Utility.format_input(song.name)} from {Utility.format_input(playlist_name)}.")431 else:432 await self.send_notifications(433 f"> Deleted {Utility.format_input(song.track.title)} from {Utility.format_input(playlist_name)}.")434 else:435 if song.name and song.author:436 await self.send_notifications(437 f"> Failed to delete {Utility.format_input(song.author)} - {Utility.format_input(song.name)} from {Utility.format_input(playlist_name)}.")438 else:439 await self.send_notifications(440 f"> Failed to delete {Utility.format_input(song.track.title)} from {Utility.format_input(playlist_name)}.")441 async def delete_song_from_queue(self, index, ctx):442 self.context = ctx443 try:444 index = int(index)445 except ValueError:446 await self.send_notifications("> You need to provide the index integer.")447 return448 if index <= 0:449 await self.send_notifications("> Index must be a positive value.")450 return451 index = index + self.queue.position452 if index > len(self.queue.queue) - 1:453 await self.send_notifications("> Index doesn't exist.")454 return455 del self.queue.queue[index]456 await self.send_notifications("> Removed song from Queue.")457 async def play_playlist(self, playlist_name, ctx=None, author_id=None):458 if ctx:459 self.context = ctx460 author_id = ctx.message.author.id461 else:462 if not author_id:463 await self.send_notifications("> Can't play playlist...")464 return465 await self.reset()466 if playlist_name == "":467 playlists = Database.Database.get_playlists(author_id=author_id, ordered=True)468 if len(playlists) == 0:469 await self.send_notifications("> You have no playlists yet.")470 return471 await self.clear_search_messages()472 self.menu_type = "PLAYLIST"473 # Show them474 for playlist in playlists:475 message = await self.context.send(f"> {playlist[0]}\n")476 self.menu_messages.append(message)477 self.menu_content.append(playlist[0]) # The name478 await message.add_reaction("â¡ï¸")479 self.menu_user_request_id = ctx.message.author.id480 return481 playlists = Database.Database.get_playlists(author_id=author_id, playlist_name=playlist_name)482 if len(playlists) == 0:483 await self.send_notifications(f"> No playlist named:{Utility.format_input(playlist_name)}.")484 return485 video_ids = Database.Database.get_songs_from_playlist(author_id=author_id,486 playlist_name=playlist_name)487 if len(video_ids) == 0:488 await self.send_notifications(f"> {Utility.format_input(playlist_name)} has no songs.")489 return490 for video_id in video_ids:491 video_id = video_id[0]492 url = f"https://www.youtube.com/watch?v={video_id}"493 tracks = await self.wavelink.get_tracks(url)494 song = Song(track=tracks[0])495 self.queue.queue.append(song)496 await self.send_notifications(f"> Playing {Utility.format_input(playlist_name)} with {len(video_ids)} songs.")497 await self.next_song()498 async def list_playlists(self, ctx):499 self.context = ctx500 self.menu_user_request_id = ctx.message.author.id501 playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, ordered=True)502 if len(playlists) == 0:503 await self.send_notifications("> You have no playlists yet.")504 return505 await self.clear_search_messages()506 self.menu_type = "PLAYLIST"507 # Show them508 for playlist in playlists:509 message = await ctx.send("> " + Utility.captitalize_words(playlist[0] + "\n"))510 self.menu_messages.append(message)511 self.menu_content.append(playlist[0]) # The name512 await message.add_reaction("â¡ï¸")513 await message.add_reaction("â")514 async def save_queue_as_playlist(self, playlist_name, ctx):515 self.context = ctx516 if playlist_name == "":517 await self.send_notifications("> You forgot the name of the playlist.")518 return519 playlists = Database.Database.get_playlists(author_id=ctx.message.id, playlist_name=playlist_name)520 if len(playlists) != 0:521 await self.send_notifications("> Playlist already exists in database.")522 return523 inserted = Database.Database.insert_playlist(author_id=ctx.message.author.id, playlist_name=playlist_name)524 if inserted:525 await self.send_notifications(f"> Created playlist {Utility.format_input(playlist_name)}.")526 else:527 await self.send_notifications("> Failed to create playlist.")528 for index in range(self.queue.position, len(self.queue.queue)):529 Database.Database.insert_song_into_playlist(author_id=ctx.message.author.id,530 video_id=self.queue.queue[index].video_id,531 playlist_name=playlist_name)532 async def create_playlist(self, playlist_name, ctx):533 self.context = ctx534 if playlist_name == "":535 await self.send_notifications("> You forgot the name of the playlist.")536 return537 playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=playlist_name)538 if len(playlists) != 0:539 await self.send_notifications("> Playlist already exists in database.")540 return541 inserted = Database.Database.insert_playlist(playlist_name, ctx.message.author.id)542 if inserted:543 await self.send_notifications(f"> Created playlist {playlist_name}.")544 else:545 await self.send_notifications("> Failed to create playlist.")546 async def rename_playlist(self, input, ctx):547 self.context = ctx548 params = input.split("->")549 if len(params) < 2:550 await self.send_notifications("> Please separate previous name and new name with \"->\".")551 return552 playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=params[0])553 if len(playlists) == 0:554 await self.send_notifications(f"> No playlist named {params[0]}.")555 return556 Database.Database.rename_playlist(ctx.message.author.id, params[0], params[1])557 await self.send_notifications(f"> Renamed {params[0]} to {params[1]}.")558 async def delete_playlist(self, playlist_name, ctx=None):559 if ctx:560 self.context = ctx561 if playlist_name:562 playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=playlist_name)563 if len(playlists) == 0:564 await self.context.send(f"> No playlist named {Utility.format_input(playlist_name)}.")565 Database.Database.delete_playlist(author_id=ctx.message.author.id, playlist_name=playlist_name)566 await self.send_notifications(f"> Deleted {Utility.format_input(playlist_name)}.")567 else:568 # TODO569 print("Implement me")570 async def list_playlists_to_add_remove_song(self, payload):571 if not self.is_playing:572 return573 self.menu_user_request_id = payload.user_id574 await self.clear_search_messages()575 playlists = Database.Database.get_playlists(author_id=self.menu_user_request_id, ordered=True)576 if len(playlists) == 0:577 await self.send_notifications("> You have no playlists yet.")578 return579 self.menu_type = "ADD_REMOVE_SONG_TO_PLAYLIST"580 # Show them581 for playlist in playlists:582 message = await self.context.send(f"> {playlist[0]}\n")583 self.menu_messages.append(message)584 self.menu_content.append(585 {"playlist_name": playlist[0], "video_id": self.queue.get_current_song().track.ytid})586 if Database.Database.song_in_playlist(author_id=self.menu_user_request_id, playlist_name=playlist[0],587 video_id=self.queue.get_current_song().track.ytid):588 await message.add_reaction("â")589 else:590 await message.add_reaction("â
")591 async def list_songs_in_playlist(self, playlist_name, ctx):592 self.context = ctx593 if playlist_name == "":594 await self.send_notifications("> Specify playlist's name.")595 return596 playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=playlist_name)597 if len(playlists) == 0:598 await self.send_notifications("> Playlist does not exist.")599 return600 video_ids = Database.Database.get_songs_from_playlist(author_id=ctx.message.author.id,601 playlist_name=playlist_name)602 if len(video_ids) == 0:603 await self.send_notifications(f"> {Utility.format_input(playlist_name)} has no songs.")604 return605 embed = discord.Embed(606 title=f"{playlist_name}",607 timestamp=dt.datetime.utcnow()608 )609 embed.set_footer(text=f"Requested by {ctx.author.display_name}")610 for song in video_ids:611 video_id = song[0]612 url = f"https://www.youtube.com/watch?v={video_id}"613 tracks = await self.wavelink.get_tracks(url)614 song = Song(track=tracks[0])615 title = tracks[0].title616 if song.author and song.name:617 title = f"{song.author} - {song.name}"618 value = ""619 if song.album_name:620 value += f"{song.album_name}"621 if song.year:622 value += f" ({song.year})"623 if not value:624 value = "album unknown"625 embed.add_field(name=title, value=value, inline=False)626 await self.context.send_notifications(embed=embed)627 async def play_album(self, album_name, ctx):628 self.context = ctx629 await self.clear_search_messages()630 if input == "":631 await self.send_notifications("> Specify album to search.")632 return633 spotify_data = Spotify.spotify.search_album(album_name)634 if len(spotify_data["albums"]["items"]) == 0:635 await self.send_notifications("> No results found.")636 return637 self.menu_type = "ALBUM"638 albums = spotify_data['albums']['items']639 albums = albums[:min(10, len(albums))]640 for album in albums:641 album_id = album['id']642 artist = album['artists'][0]['name']643 album_name = album['name']644 album_year = album['release_date'][:4]645 message = await self.context.send(646 f"{Utility.format_input(album_name)} ({album_year}) by {Utility.format_input(artist)}.")647 self.menu_messages.append(message)648 self.menu_content.append(649 {"artist": artist, "album": album_name, "album_year": album_year, "album_id": album_id})650 await message.add_reaction("â¡ï¸")651 async def search_select(self, payload):652 if self.context.message.author.voice:653 if not await self.join_channel(self.context):654 return655 content = None656 for index, message in enumerate(self.menu_messages):657 if message.id == payload.message_id:658 content = self.menu_content[index]659 break660 if self.menu_type == "SONG":661 index = OPTIONS[payload.emoji.name]662 song = Song(track=self.menu_content[index])663 await self.clear_search_messages()664 self.queue.queue.append(song)665 if not self.is_playing:666 await self.next_song()667 elif self.menu_type == "ALBUM":668 await self.clear_search_messages()669 await self.reset()670 tracks = Spotify.spotify.get_album(content["album_id"])['tracks']['items']671 for track in tracks:672 song_name = track['name']673 query = f"ytsearch:{content['artist']} - {song_name}"674 # Search and add the song to queue675 tracks = await self.wavelink.get_tracks(query)676 if not tracks:677 await self.send_notifications(678 f"> Couldn't find song: {Utility.format_input(content['artist'])} - {Utility.format_input(song_name)}.")679 continue680 song = Song(track=tracks[0], name=song_name, author=content["artist"], album_name=content["album"],681 year=content["album_year"])682 self.queue.queue.append(song)683 await self.send_notifications(684 F"> Playing {Utility.format_input(content['album'])} ({Utility.format_input(content['album_year'])}) by {Utility.format_input(content['artist'])} with {len(self.queue.queue)} songs.")685 await self.next_song()686 elif self.menu_type == "PLAYLIST":687 await self.clear_search_messages()688 if payload.emoji.name == "â¡ï¸":689 await self.play_playlist(playlist_name=content, author_id=self.menu_user_request_id)690 elif payload.emoji.name == "â":691 await self.delete_playlist(playlist_name=content)692 elif self.menu_type == "ADD_REMOVE_SONG_TO_PLAYLIST":693 await self.clear_search_messages()694 # author_id = payload.user_id695 if payload.emoji.name == "â
":696 playlists = Database.Database.get_playlists(author_id=self.menu_user_request_id,697 playlist_name=content["playlist_name"])698 if len(playlists) == 0:699 await self.send_notifications(700 f"> No playlist named {Utility.format_input(content['playlist_name'])}.")701 return702 video_id = content["video_id"]703 added = Database.Database.insert_song_into_playlist(author_id=self.menu_user_request_id,704 playlist_name=content["playlist_name"],705 video_id=video_id)706 if added:707 await self.send_notifications(f"> Added song to {Utility.format_input(content['playlist_name'])}.")708 else:709 await self.send_notifications(710 f"> Song already exists in {Utility.format_input(content['playlist_name'])}.")711 elif payload.emoji.name == "â":712 playlists = Database.Database.get_playlists(author_id=self.menu_user_request_id,713 playlist_name=content["playlist_name"])714 if len(playlists) == 0:715 await self.send_notifications(716 f"> No playlist named {Utility.format_input(content['playlist_name'])}.")717 return718 video_id = content["video_id"]719 removed = Database.Database.delete_song_from_playlist(author_id=self.menu_user_request_id,720 playlist_name=content["playlist_name"],721 video_id=video_id)722 if removed:723 await self.send_notifications(724 f"> Deleted song from {Utility.format_input(content['playlist_name'])}.")725 else:726 await self.send_notifications(727 f"> Song doesn't exist in {Utility.format_input(content['playlist_name'])}.")728 else:729 print("IMPLEMENT ME " + self.menu_type)730 await self.clear_search_messages()731class Music(commands.Cog, wavelink.WavelinkMixin):732 def __init__(self, bot):733 self.bot = bot734 self.wavelink = wavelink.Client(bot=bot)735 self.bot.loop.create_task(self.start_nodes())736 @wavelink.WavelinkMixin.listener()737 async def on_node_ready(self, node):738 print(f"Wavelink node {node.identifier} ready.")739 async def start_nodes(self):740 await self.bot.wait_until_ready()...
uploader.py
Source:uploader.py
1"""2Upload Respa reservations into Exchange as calendar events.3"""4import logging5from django.utils.encoding import force_text6from resources.models import Reservation7from respa_exchange.ews.calendar import CreateCalendarItemRequest, DeleteCalendarItemRequest, UpdateCalendarItemRequest8log = logging.getLogger(__name__)9def _build_subject(res):10 """11 Build a subject line for the given Reservation, to be sent to Exchange12 :type res: resources.models.Reservation13 :return: str14 """15 if res.event_subject:16 return res.event_subject17 bits = ["Respa"]18 if res.reserver_name:19 bits.append(res.reserver_name)20 elif res.user_id:21 bits.append(res.user)22 return " - ".join(force_text(bit) for bit in bits)23def _build_body(res):24 """25 Build the body of the Exchange appointment for a given Reservation.26 :type res: resources.models.Reservation27 :return: str28 """29 return res.event_description or ''30def _build_location(exres, res):31 resource = res.resource32 if resource.name:33 if resource.unit:34 return "%s (%s)" % (resource.name, resource.unit.name)35 return resource.name36 return exres.principal_email37def _get_calendar_item_props(exres):38 res = exres.reservation39 assert isinstance(res, Reservation)40 ret = dict(41 start=res.begin,42 end=res.end,43 subject=_build_subject(res),44 body=_build_body(res),45 location=_build_location(exres, res)46 )47 if res.user and res.user.email:48 ret['required_attendees'] = [res.user.email]49 return ret50def create_on_remote(exres):51 """52 Create and link up an appointment for an ExchangeReservation.53 :param exres: Exchange Reservation54 :type exres: respa_exchange.models.ExchangeReservation55 """56 res = exres.reservation57 if res.state != Reservation.CONFIRMED:58 return59 assert isinstance(res, Reservation)60 send_notifications = True61 if getattr(res, '_skip_notifications', False):62 send_notifications = False63 ccir = CreateCalendarItemRequest(64 principal=force_text(exres.principal_email),65 item_props=_get_calendar_item_props(exres),66 send_notifications=send_notifications67 )68 exres.item_id = ccir.send(exres.exchange.get_ews_session())69 exres.save()70 log.info("Created calendar item for %s", exres)71def update_on_remote(exres):72 """73 Update (or delete) the Exchange appointment for an ExchangeReservation.74 :param exres: Exchange Reservation75 :type exres: respa_exchange.models.ExchangeReservation76 """77 res = exres.reservation78 if res.state in (Reservation.DENIED, Reservation.CANCELLED):79 return delete_on_remote(exres)80 send_notifications = True81 if getattr(res, '_skip_notifications', False):82 send_notifications = False83 # TODO: Should we try and track the state of the object to avoid sending superfluous updates?84 ucir = UpdateCalendarItemRequest(85 principal=force_text(exres.principal_email),86 item_id=exres.item_id,87 update_props=_get_calendar_item_props(exres),88 send_notifications=send_notifications89 )90 exres.item_id = ucir.send(exres.exchange.get_ews_session())91 exres.save()92 log.info("Updated calendar item for %s", exres)93def delete_on_remote(exres):94 """95 Delete the Exchange appointment for an ExchangeReservation.96 :param exres: Exchange Reservation97 :type exres: respa_exchange.models.ExchangeReservation98 """99 send_notifications = True100 if getattr(exres.reservation, '_skip_notifications', False):101 send_notifications = False102 dcir = DeleteCalendarItemRequest(103 principal=exres.principal_email,104 item_id=exres.item_id,105 send_notifications=send_notifications106 )107 dcir.send(exres.exchange.get_ews_session())108 log.info("Deleted %s", exres)...
test_indexer_daemon.py
Source:test_indexer_daemon.py
1#!/usr/bin/env python2# coding: utf-83import os4import sys5import json6from uuid import uuid47import unittest8from unittest import mock9import importlib10from datetime import datetime11pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # noqa12sys.path.insert(0, pkg_root) # noqa13from dss import Replica14from dss.util.version import datetime_to_version_format15from tests.infra import testmode16daemon_app = importlib.import_module('daemons.dss-index.app')17class TestIndexerDaemon(unittest.TestCase):18 @classmethod19 def setUpClass(cls):20 pass21 @classmethod22 def tearDownClass(cls):23 pass24 @testmode.standalone25 def test_launch_from_operator_queue(self):26 key = f"bundles/{uuid4()}.{datetime_to_version_format(datetime.utcnow())}"27 tests = [(True, "index_object"),28 (False, "index_object"),29 (None, "index_object")]30 for send_notifications, expected_call in tests:31 if send_notifications is not None:32 msg = dict(replica="aws", key=key, send_notifications=send_notifications)33 else:34 msg = dict(replica="aws", key=key)35 event = dict(body=json.dumps(msg))36 with mock.patch("daemons.dss-index.app.Indexer") as indexer:37 with mock.patch("daemons.dss-index.app.CompositeIndexBackend") as backend:38 daemon_app.launch_from_operator_queue(dict(Records=[event]), {})39 name, args, kwargs = indexer.mock_calls[-1]40 self.assertIn(expected_call, name)41 args, kwargs = backend.call_args_list[0]42 self.assertEqual(send_notifications, kwargs['notify'])43 @testmode.standalone44 def test_handle_event(self):45 replica = Replica.aws # This value is not important for this test46 key = f"bundles/{uuid4()}.{datetime_to_version_format(datetime.utcnow())}"47 tests = [(operator_initiated, send_notifications)48 for send_notifications in (True, False, None)49 for operator_initiated in (True, False)]50 for operator_initiated, send_notifications in tests:51 with self.subTest(operator_initiated=operator_initiated, send_notifications=send_notifications):52 with mock.patch("daemons.dss-index.app.Indexer") as indexer:53 with mock.patch("daemons.dss-index.app.CompositeIndexBackend") as backend:54 daemon_app._handle_event(replica, key, {}, send_notifications, operator_initiated)55 name, args, kwargs = indexer.mock_calls[-1]56 if operator_initiated:57 self.assertIn("index_object", name)58 else:59 self.assertIn("process_new_indexable_object", name)60 args, kwargs = backend.call_args_list[0]61 self.assertEqual(send_notifications, kwargs['notify'])62if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!