Best Python code snippet using pytractor_python
bus_vehicle_handler_test.py
Source:bus_vehicle_handler_test.py
#!/usr/local/bin/python
# -*- coding: utf-8 -*-
"""
- LICENCE

The MIT License (MIT)

Copyright (c) 2016 Eleftherios Anagnostopoulos for Ericsson AB (EU FP7 CityPulse Project)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

- DESCRIPTION OF DOCUMENTS

-- MongoDB Database Documents:

address_document: {
    '_id', 'name', 'node_id', 'point': {'longitude', 'latitude'}
}
bus_line_document: {
    '_id', 'bus_line_id', 'bus_stops': [{'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}}]
}
bus_stop_document: {
    '_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}
}
bus_stop_waypoints_document: {
    '_id', 'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[edge_object_id]]
}
bus_vehicle_document: {
    '_id', 'bus_vehicle_id', 'maximum_capacity',
    'routes': [{'starting_datetime', 'ending_datetime', 'timetable_id'}]
}
detailed_bus_stop_waypoints_document: {
    '_id', 'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[edge_document]]
}
edge_document: {
    '_id', 'starting_node': {'osm_id', 'point': {'longitude', 'latitude'}},
    'ending_node': {'osm_id', 'point': {'longitude', 'latitude'}},
    'max_speed', 'road_type', 'way_id', 'traffic_density'
}
node_document: {
    '_id', 'osm_id', 'tags', 'point': {'longitude', 'latitude'}
}
point_document: {
    '_id', 'osm_id', 'point': {'longitude', 'latitude'}
}
timetable_document: {
    '_id', 'timetable_id', 'bus_line_id', 'bus_vehicle_id',
    'timetable_entries': [{
        'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'departure_datetime', 'arrival_datetime', 'number_of_onboarding_passengers',
        'number_of_deboarding_passengers', 'number_of_current_passengers',
        'route': {
            'total_distance', 'total_time', 'node_osm_ids', 'points', 'edges',
            'distances_from_starting_node', 'times_from_starting_node',
            'distances_from_previous_node', 'times_from_previous_node'
        }
    }],
    'travel_requests': [{
        '_id', 'client_id', 'bus_line_id',
        'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'departure_datetime', 'arrival_datetime',
        'starting_timetable_entry_index', 'ending_timetable_entry_index'
    }]
}
traffic_event_document: {
    '_id', 'event_id', 'event_type', 'event_level', 'point': {'longitude', 'latitude'}, 'datetime'
}
travel_request_document: {
    '_id', 'client_id', 'bus_line_id',
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'departure_datetime', 'arrival_datetime',
    'starting_timetable_entry_index', 'ending_timetable_entry_index'
}
way_document: {
    '_id', 'osm_id', 'tags', 'references'
}

-- Route Generator Responses:

get_route_between_two_bus_stops: {
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'route': {
        'total_distance', 'total_time', 'node_osm_ids', 'points', 'edges',
        'distances_from_starting_node', 'times_from_starting_node',
        'distances_from_previous_node', 'times_from_previous_node'
    }
}
get_route_between_multiple_bus_stops: [{
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'route': {
        'total_distance', 'total_time', 'node_osm_ids', 'points', 'edges',
        'distances_from_starting_node', 'times_from_starting_node',
        'distances_from_previous_node', 'times_from_previous_node'
    }
}]
get_waypoints_between_two_bus_stops: {
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[{
        '_id', 'starting_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'ending_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'max_speed', 'road_type', 'way_id', 'traffic_density'
    }]]
}
get_waypoints_between_multiple_bus_stops: [{
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[{
        '_id', 'starting_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'ending_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'max_speed', 'road_type', 'way_id', 'traffic_density'
    }]]
}]
"""
import time
import os
import sys

# The project root must be on sys.path before the project-local imports below.
sys.path.append(os.path.join(os.path.dirname(__file__), '../'))

from src.common.logger import log
from src.look_ahead.bus_vehicle_handler import BusVehicleHandler

__author__ = 'Eleftherios Anagnostopoulos'
__email__ = 'eanagnostopoulos@hotmail.com'
# Each contributor is a separate entry; without the comma the two literals
# would be silently concatenated into one string.
__credits__ = [
    'Azadeh Bararsani (Senior Researcher at Ericsson AB) - email: azadeh.bararsani@ericsson.com',
    'Aneta Vulgarakis Feljan (Senior Researcher at Ericsson AB) - email: aneta.vulgarakis@ericsson.com'
]


class BusVehicleHandlerTester(object):
    """
    Manual test driver for BusVehicleHandler.

    Every test_* method forwards its arguments to the BusVehicleHandler method
    of the same name (without the 'test_' prefix), logging a 'starting' and a
    'finished - elapsed_time' message around the call.
    """

    def __init__(self):
        self.module_name = 'bus_vehicle_handler_tester'
        self.log_type = 'INFO'
        self.bus_vehicle_handler = self._run_timed(
            'initialize_bus_vehicle_handler',
            BusVehicleHandler
        )

    def _run_timed(self, test_name, function, **kwargs):
        """
        Call function(**kwargs), logging start/finish messages and the elapsed time.

        Updates self.log_message, self.start_time and self.elapsed_time as a
        side effect, exactly as each test did individually before.

        :param test_name: string used as prefix of the log messages
        :param function: callable to be executed
        :param kwargs: keyword arguments forwarded to the callable
        :return: whatever the callable returns
        """
        self.log_message = test_name + ': starting'
        log(module_name=self.module_name, log_type=self.log_type, log_message=self.log_message)
        self.start_time = time.time()

        result = function(**kwargs)

        self.elapsed_time = time.time() - self.start_time
        self.log_message = test_name + ': finished - elapsed_time = ' \
                           + str(self.elapsed_time) + ' sec'
        log(module_name=self.module_name, log_type=self.log_type, log_message=self.log_message)
        return result

    def test_clear_bus_vehicle_documents_collection(self):
        """
        Delete all the documents of the BusVehicleDocuments collection.

        :return: number_of_deleted_documents: int
        """
        return self._run_timed(
            'test_clear_bus_vehicle_documents_collection',
            self.bus_vehicle_handler.clear_bus_vehicle_documents_collection
        )

    def test_delete_bus_vehicle_document(self, object_id=None, bus_vehicle_id=None):
        """
        Delete a bus_vehicle_document.

        :param object_id: ObjectId
        :param bus_vehicle_id: int
        :return: True if the document was successfully deleted, otherwise False.
        """
        return self._run_timed(
            'test_delete_bus_vehicle_document',
            self.bus_vehicle_handler.delete_bus_vehicle_document,
            object_id=object_id,
            bus_vehicle_id=bus_vehicle_id
        )

    def test_delete_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None):
        """
        Delete multiple bus_vehicle_documents.

        :param object_ids: [ObjectId]
        :param bus_vehicle_ids: [int]
        :return: number_of_deleted_documents: int
        """
        return self._run_timed(
            'test_delete_bus_vehicle_documents',
            self.bus_vehicle_handler.delete_bus_vehicle_documents,
            object_ids=object_ids,
            bus_vehicle_ids=bus_vehicle_ids
        )

    def test_generate_bus_vehicle_document(self, maximum_capacity):
        """
        Generate a new bus_vehicle_document.

        :param maximum_capacity: int
        :return: new_object_id: ObjectId
        """
        return self._run_timed(
            'test_generate_bus_vehicle_document',
            self.bus_vehicle_handler.generate_bus_vehicle_document,
            maximum_capacity=maximum_capacity
        )

    def test_generate_bus_vehicle_documents(self, maximum_capacity, number_of_bus_vehicle_documents):
        """
        Generate multiple bus_vehicle_documents.

        :param maximum_capacity: int
        :param number_of_bus_vehicle_documents: int
        :return: new_object_ids: [ObjectIds]
        """
        return self._run_timed(
            'test_generate_bus_vehicle_documents',
            self.bus_vehicle_handler.generate_bus_vehicle_documents,
            maximum_capacity=maximum_capacity,
            number_of_bus_vehicle_documents=number_of_bus_vehicle_documents
        )

    def test_insert_bus_vehicle_document(self, bus_vehicle_document=None, bus_vehicle_id=None,
                                         maximum_capacity=None, routes=None):
        """
        Insert a new bus_vehicle_document or update, if it already exists in the database.

        :param bus_vehicle_document
        :param bus_vehicle_id: int
        :param maximum_capacity: int
        :param routes: [{'starting_datetime', 'ending_datetime', 'timetable_id'}]
        :return: new_object_id: ObjectId
        """
        return self._run_timed(
            'test_insert_bus_vehicle_document',
            self.bus_vehicle_handler.insert_bus_vehicle_document,
            bus_vehicle_document=bus_vehicle_document,
            bus_vehicle_id=bus_vehicle_id,
            maximum_capacity=maximum_capacity,
            routes=routes
        )

    def test_insert_bus_vehicle_documents(self, bus_vehicle_documents, insert_many=False):
        """
        Insert multiple bus_vehicle_documents or update existing ones.

        :param bus_vehicle_documents:
        :param insert_many: bool
        :return: new_object_ids: [ObjectId]
        """
        return self._run_timed(
            'test_insert_bus_vehicle_documents',
            self.bus_vehicle_handler.insert_bus_vehicle_documents,
            bus_vehicle_documents=bus_vehicle_documents,
            insert_many=insert_many
        )

    def test_print_bus_vehicle_document(self, object_id=None, bus_vehicle_id=None):
        """
        Print a bus_vehicle_document.

        :param object_id: ObjectId
        :param bus_vehicle_id: int
        :return: None
        """
        self._run_timed(
            'test_print_bus_vehicle_document',
            self.bus_vehicle_handler.print_bus_vehicle_document,
            object_id=object_id,
            bus_vehicle_id=bus_vehicle_id
        )

    def test_print_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None, counter=None):
        """
        Print multiple bus_vehicle_documents.

        :param object_ids: [ObjectId]
        :param bus_vehicle_ids: [int]
        :param counter: int
        :return: None
        """
        self._run_timed(
            'test_print_bus_vehicle_documents',
            self.bus_vehicle_handler.print_bus_vehicle_documents,
            object_ids=object_ids,
            bus_vehicle_ids=bus_vehicle_ids,
            counter=counter
        )


if __name__ == '__main__':
    bus_vehicle_handler_tester = BusVehicleHandlerTester()

    # Interactive menu: keeps prompting until the user selects '0'.
    while True:
        time.sleep(0.01)
        selection = raw_input(
            '\n0.  exit'
            '\n1.  test_clear_bus_vehicle_documents_collection'
            '\n2.  test_delete_bus_vehicle_document'
            '\n3.  test_delete_bus_vehicle_documents'
            '\n4.  test_generate_bus_vehicle_document'
            '\n5.  test_generate_bus_vehicle_documents'
            '\n6.  test_insert_bus_vehicle_document'
            '\n7.  test_insert_bus_vehicle_documents'
            '\n8.  test_print_bus_vehicle_document'
            '\n9.  test_print_bus_vehicle_documents'
            '\nSelection: '
        )

        # 0. exit
        if selection == '0':
            break

        # 1. test_clear_bus_vehicle_documents_collection
        elif selection == '1':
            bus_vehicle_handler_tester.test_clear_bus_vehicle_documents_collection()

        # 2. test_delete_bus_vehicle_document
        elif selection == '2':
            bus_vehicle_id = int(
                raw_input(
                    '\n2.  test_delete_bus_vehicle_document'
                    '\nbus_vehicle_id: '
                )
            )
            bus_vehicle_handler_tester.test_delete_bus_vehicle_document(
                object_id=None,
                bus_vehicle_id=bus_vehicle_id
            )

        # 3. test_delete_bus_vehicle_documents
        elif selection == '3':
            # No ids are requested from the user; an empty list is forwarded.
            bus_vehicle_ids = []
            bus_vehicle_handler_tester.test_delete_bus_vehicle_documents(
                object_ids=None,
                bus_vehicle_ids=bus_vehicle_ids
            )

        # 4. test_generate_bus_vehicle_document
        elif selection == '4':
            maximum_capacity = int(
                raw_input(
                    '\n4.  test_generate_bus_vehicle_document'
                    '\nmaximum_capacity: '
                )
            )
            bus_vehicle_handler_tester.test_generate_bus_vehicle_document(
                maximum_capacity=maximum_capacity
            )

        # 5. test_generate_bus_vehicle_documents
        elif selection == '5':
            maximum_capacity = int(
                raw_input(
                    '\n5.  test_generate_bus_vehicle_documents'
                    '\nmaximum_capacity: '
                )
            )
            number_of_bus_vehicle_documents = int(
                raw_input(
                    '\nnumber_of_bus_vehicle_documents: '
                )
            )
            bus_vehicle_handler_tester.test_generate_bus_vehicle_documents(
                maximum_capacity=maximum_capacity,
                number_of_bus_vehicle_documents=number_of_bus_vehicle_documents
            )

        # 6. test_insert_bus_vehicle_document
        elif selection == '6':
            pass

        # 7. test_insert_bus_vehicle_documents
        elif selection == '7':
            pass

        # 8. test_print_bus_vehicle_document
        elif selection == '8':
            bus_vehicle_id = int(
                raw_input(
                    '\n8.  test_print_bus_vehicle_document'
                    '\nbus_vehicle_id: '
                )
            )
            bus_vehicle_handler_tester.test_print_bus_vehicle_document(
                object_id=None,
                bus_vehicle_id=bus_vehicle_id
            )

        # 9. test_print_bus_vehicle_documents
        elif selection == '9':
            bus_vehicle_handler_tester.test_print_bus_vehicle_documents()

        else:
            # NOTE(review): the visible source is cut off right after this
            # `else:`; ignoring unrecognised selections is assumed - confirm.
            pass
bans.py
Source:bans.py
"""Ban, kick and unban command handlers (regular and "silent" variants)."""
import html

from telegram import ParseMode, Update
from telegram.error import BadRequest
from telegram.ext import CallbackContext, CommandHandler, Filters, run_async
from telegram.utils.helpers import mention_html

from Sherlock import (
    DEMONS,
    DEV_USERS,
    DRAGONS,
    LOGGER,
    OWNER_ID,
    TIGERS,
    WOLVES,
    dispatcher,
)
from Sherlock.modules.disable import DisableAbleCommandHandler
from Sherlock.modules.helper_funcs.chat_status import (
    bot_admin,
    can_restrict,
    connection_status,
    is_user_admin,
    is_user_ban_protected,
    is_user_in_chat,
    user_admin,
    user_can_ban,
)
from Sherlock.modules.helper_funcs.extraction import extract_user_and_text
from Sherlock.modules.helper_funcs.string_handling import extract_time
from Sherlock.modules.log_channel import gloggable, loggable


def _mention(user) -> str:
    """Return an HTML mention of *user* with the first name escaped."""
    return mention_html(user.id, html.escape(user.first_name))


def _get_member_or_none(chat, user_id):
    """Return the chat member, or None when Telegram reports "User not found".

    Any other BadRequest is re-raised unchanged.
    """
    try:
        return chat.get_member(user_id)
    except BadRequest as excp:
        if excp.message == "User not found":
            return None
        raise


def _build_log(chat, tag, member, admin=None, time_val=None, reason=None) -> str:
    """Build the HTML log entry returned by the handlers.

    The "Admin", "Time" and "Reason" lines are emitted only when the matching
    argument is given. User-supplied text is HTML-escaped so that it cannot
    break (or inject markup into) the entry.
    """
    lines = [f"<b>{html.escape(chat.title)}:</b>", f"#{tag}"]
    if admin is not None:
        lines.append(f"<b>Admin:</b> {_mention(admin)}")
    lines.append(f"<b>User:</b> {_mention(member.user)}")
    if time_val:
        lines.append(f"<b>Time:</b> {html.escape(time_val)}")
    if reason:
        lines.append(f"<b>Reason:</b> {html.escape(reason)}")
    return "\n".join(lines)


def _log_ban_failure(update, user_id, chat, excp) -> None:
    """Log a failed ban; must be called from inside the ``except`` block."""
    LOGGER.warning(update)
    LOGGER.exception(
        "ERROR banning user %s in chat %s (%s) due to %s",
        user_id,
        chat.title,
        chat.id,
        excp.message,
    )


def _immunity_reply(user_id) -> str:
    """Return the refusal text for a ban-protected target, by privilege tier."""
    if user_id == OWNER_ID:
        return "Trying to put me against a God level disaster huh?"
    # Built on every call so that runtime changes to the groups are honoured.
    tiers = (
        (DEV_USERS, "I can't act against our own."),
        (DRAGONS, "Fighting this Dragon here will put civilian lives at risk."),
        (DEMONS, "Bring an order from Heroes association to fight a Demon disaster."),
        (TIGERS, "Bring an order from Heroes association to fight a Tiger disaster."),
        (WOLVES, "Wolf abilities make them ban immune!"),
    )
    for group, text in tiers:
        if user_id in group:
            return text
    return "This user has immunity and cannot be banned."


def _split_time_and_reason(text):
    """Split "<time> [reason...]" into ``(time_val, reason)``.

    The time token is lower-cased; the reason is "" when nothing follows it.
    """
    parts = text.split(None, 1)
    time_val = parts[0].lower()
    reason = parts[1] if len(parts) > 1 else ""
    return time_val, reason


@run_async
@connection_status
@bot_admin
@can_restrict
@user_admin
@user_can_ban
@loggable
def ban(update: Update, context: CallbackContext) -> str:
    """Ban the targeted user and announce it in the chat.

    Returns the HTML log entry on success, or "" when nothing was done.
    """
    chat = update.effective_chat
    user = update.effective_user
    message = update.effective_message
    log_message = ""
    bot = context.bot
    args = context.args
    user_id, reason = extract_user_and_text(message, args)

    if not user_id:
        message.reply_text("I doubt that's a user.")
        return log_message

    member = _get_member_or_none(chat, user_id)
    if member is None:
        message.reply_text("Can't seem to find this person.")
        return log_message

    if user_id == bot.id:
        message.reply_text("Oh yeah, ban myself, noob!")
        return log_message

    # DEV_USERS holds user ids, so the caller's id (not the User object) is
    # what has to be tested for the developer bypass to ever apply.
    if is_user_ban_protected(chat, user_id, member) and user.id not in DEV_USERS:
        message.reply_text(_immunity_reply(user_id))
        return log_message

    log = _build_log(chat, "BANNED", member, admin=user, reason=reason)

    try:
        chat.kick_member(user_id)
        # NOTE(review): the leading glyph was mis-encoded in the source and is
        # reconstructed here as a best effort - confirm.
        reply = (
            f"<code>❕</code><b>Ban Event</b>\n"
            f"<code> </code><b>• User:</b> {_mention(member.user)}"
        )
        if reason:
            reply += f"\n<code> </code><b>• Reason:</b> \n{html.escape(reason)}"
        bot.sendMessage(chat.id, reply, parse_mode=ParseMode.HTML, quote=False)
        return log
    except BadRequest as excp:
        if excp.message == "Reply message not found":
            # Do not reply
            message.reply_text("Banned!", quote=False)
            return log
        _log_ban_failure(update, user_id, chat, excp)
        message.reply_text("Uhm...that didn't work...")

    return log_message


@run_async
@connection_status
@bot_admin
@can_restrict
@user_admin
@user_can_ban
@loggable
def sban(update: Update, context: CallbackContext) -> str:
    """Silently ban the targeted user: delete the command, send no replies.

    Returns the HTML log entry on success, or "" when nothing was done.
    """
    chat = update.effective_chat
    user = update.effective_user
    message = update.effective_message
    log_message = ""
    bot = context.bot
    args = context.args
    user_id, reason = extract_user_and_text(message, args)
    update.effective_message.delete()

    if not user_id:
        return log_message

    member = _get_member_or_none(chat, user_id)
    if member is None:
        return log_message

    if user_id == bot.id:
        return log_message

    if is_user_ban_protected(chat, user_id, member):
        return log_message

    # NOTE(review): presumably Telegram's service accounts - confirm.
    if user_id == 777000 or user_id == 1087968824:
        return log_message

    log = _build_log(chat, "SBANNED", member, admin=user, reason=reason)

    try:
        chat.kick_member(user_id)
        return log
    except BadRequest as excp:
        if excp.message == "Reply message not found":
            return log
        _log_ban_failure(update, user_id, chat, excp)

    return log_message


@run_async
@connection_status
@bot_admin
@can_restrict
@user_admin
@user_can_ban
@loggable
def temp_ban(update: Update, context: CallbackContext) -> str:
    """Ban the targeted user until the time given as first word after the user.

    Returns the HTML log entry on success, or "" when nothing was done.
    """
    chat = update.effective_chat
    user = update.effective_user
    message = update.effective_message
    log_message = ""
    bot, args = context.bot, context.args
    user_id, reason = extract_user_and_text(message, args)

    if not user_id:
        message.reply_text("I doubt that's a user.")
        return log_message

    member = _get_member_or_none(chat, user_id)
    if member is None:
        message.reply_text("I can't seem to find this user.")
        return log_message

    if user_id == bot.id:
        message.reply_text("I'm not gonna BAN myself, are you crazy?")
        return log_message

    if is_user_ban_protected(chat, user_id, member):
        message.reply_text("I don't feel like it.")
        return log_message

    if not reason:
        message.reply_text("You haven't specified a time to ban this user for!")
        return log_message

    time_val, reason = _split_time_and_reason(reason)
    bantime = extract_time(message, time_val)
    if not bantime:
        return log_message

    log = _build_log(
        chat, "TEMP BANNED", member, admin=user, time_val=time_val, reason=reason
    )

    try:
        chat.kick_member(user_id, until_date=bantime)
        bot.sendMessage(
            chat.id,
            f"Banned! User {_mention(member.user)} "
            f"will be banned for {html.escape(time_val)}.",
            parse_mode=ParseMode.HTML,
        )
        return log
    except BadRequest as excp:
        if excp.message == "Reply message not found":
            # Do not reply (plain text here, so time_val is not escaped)
            message.reply_text(
                f"Banned! User will be banned for {time_val}.", quote=False
            )
            return log
        _log_ban_failure(update, user_id, chat, excp)
        message.reply_text("Well damn, I can't ban that user.")

    return log_message


@run_async
@connection_status
@bot_admin
@can_restrict
@user_admin
@user_can_ban
@loggable
def stemp_ban(update: Update, context: CallbackContext) -> str:
    """Silent variant of ``temp_ban``: deletes the command, no success reply.

    Returns the HTML log entry on success, or "" when nothing was done.
    """
    chat = update.effective_chat
    user = update.effective_user
    message = update.effective_message
    log_message = ""
    bot, args = context.bot, context.args
    user_id, reason = extract_user_and_text(message, args)
    update.effective_message.delete()

    if not user_id:
        return log_message

    member = _get_member_or_none(chat, user_id)
    if member is None:
        return log_message

    if user_id == bot.id:
        return log_message

    if is_user_ban_protected(chat, user_id, member):
        return log_message

    if not reason:
        message.reply_text("You haven't specified a time to ban this user for!")
        return log_message

    time_val, reason = _split_time_and_reason(reason)
    bantime = extract_time(message, time_val)
    if not bantime:
        return log_message

    log = _build_log(
        chat, "STEMP BANNED", member, admin=user, time_val=time_val, reason=reason
    )

    try:
        chat.kick_member(user_id, until_date=bantime)
        return log
    except BadRequest as excp:
        if excp.message == "Reply message not found":
            # Do not reply
            return log
        _log_ban_failure(update, user_id, chat, excp)

    return log_message


@run_async
@connection_status
@bot_admin
@can_restrict
@user_admin
@user_can_ban
@loggable
def kick(update: Update, context: CallbackContext) -> str:
    """Kick the targeted user and announce it in the chat.

    Returns the HTML log entry on success, or "" when nothing was done.
    """
    chat = update.effective_chat
    user = update.effective_user
    message = update.effective_message
    log_message = ""
    bot, args = context.bot, context.args
    user_id, reason = extract_user_and_text(message, args)

    if not user_id:
        message.reply_text("I doubt that's a user.")
        return log_message

    member = _get_member_or_none(chat, user_id)
    if member is None:
        message.reply_text("I can't seem to find this user.")
        return log_message

    if user_id == bot.id:
        message.reply_text("Yeahhh I'm not gonna do that.")
        return log_message

    # Pass the member already fetched above, as the ban handlers do.
    if is_user_ban_protected(chat, user_id, member):
        message.reply_text("I really wish I could kick this user....")
        return log_message

    res = chat.unban_member(user_id)  # unban on current user = kick
    if res:
        bot.sendMessage(
            chat.id,
            f"User Kicked Woops! {_mention(member.user)}.",
            parse_mode=ParseMode.HTML,
        )
        return _build_log(chat, "KICKED", member, admin=user, reason=reason)

    message.reply_text("Well damn, I can't kick that user.")
    return log_message


@run_async
@connection_status
@bot_admin
@can_restrict
@user_admin
@user_can_ban
@loggable
def skick(update: Update, context: CallbackContext) -> str:
    """Silently kick the targeted user: delete the command, send no replies.

    Returns the HTML log entry on success, or "" when nothing was done.
    """
    chat = update.effective_chat
    user = update.effective_user
    message = update.effective_message
    log_message = ""
    bot, args = context.bot, context.args
    user_id, reason = extract_user_and_text(message, args)
    update.effective_message.delete()

    if not user_id:
        return log_message

    member = _get_member_or_none(chat, user_id)
    if member is None:
        return log_message

    if user_id == bot.id:
        return log_message

    # Pass the member already fetched above, as the ban handlers do.
    if is_user_ban_protected(chat, user_id, member):
        return log_message

    res = chat.unban_member(user_id)  # unban on current user = kick
    if res:
        return _build_log(chat, "SKICKED", member, admin=user, reason=reason)

    return log_message


@run_async
@bot_admin
@can_restrict
def kickme(update: Update, context: CallbackContext):
    """Kick the user who issued the command, unless they are a chat admin."""
    user_id = update.effective_message.from_user.id
    if is_user_admin(update.effective_chat, user_id):
        update.effective_message.reply_text("I wish I could... but you're an admin.")
        return

    res = update.effective_chat.unban_member(user_id)  # unban on current user = kick
    if res:
        update.effective_message.reply_text("*kicks you out of the group*")
    else:
        update.effective_message.reply_text("Huh? I can't :/")


@run_async
@connection_status
@bot_admin
@can_restrict
@user_admin
@user_can_ban
@loggable
def unban(update: Update, context: CallbackContext) -> str:
    """Unban the targeted user if they are not currently in the chat.

    Returns the HTML log entry on success, or "" when nothing was done.
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    log_message = ""
    bot, args = context.bot, context.args
    user_id, reason = extract_user_and_text(message, args)

    if not user_id:
        message.reply_text("I doubt that's a user.")
        return log_message

    member = _get_member_or_none(chat, user_id)
    if member is None:
        message.reply_text("I can't seem to find this user.")
        return log_message

    if user_id == bot.id:
        message.reply_text("How would I unban myself if I wasn't here...?")
        return log_message

    if is_user_in_chat(chat, user_id):
        message.reply_text("Isn't this person already here??")
        return log_message

    chat.unban_member(user_id)
    message.reply_text("Yep, this user can join!")

    return _build_log(chat, "UNBANNED", member, admin=user, reason=reason)


@run_async
@connection_status
@bot_admin
@can_restrict
@gloggable
def selfunban(update: Update, context: CallbackContext) -> str:
    """Unban the caller from the chat whose id is given as first argument.

    Only members of DRAGONS or TIGERS may use it; everyone else is ignored
    without a reply. Returns the HTML log entry on success, "" otherwise.
    """
    message = update.effective_message
    user = update.effective_user
    bot, args = context.bot, context.args

    # Reject only callers that belong to neither privileged group.
    if user.id not in DRAGONS and user.id not in TIGERS:
        return ""

    try:
        chat_id = int(args[0])
    except (IndexError, TypeError, ValueError):
        # Missing argument, or one that is not an integer.
        message.reply_text("Give a valid chat ID.")
        return ""

    chat = bot.getChat(chat_id)

    member = _get_member_or_none(chat, user.id)
    if member is None:
        message.reply_text("I can't seem to find this user.")
        return ""

    if is_user_in_chat(chat, user.id):
        message.reply_text("Aren't you already in the chat??")
        return ""

    chat.unban_member(user.id)
    message.reply_text("Yep, I have unbanned you.")

    return _build_log(chat, "UNBANNED", member)


# NOTE(review): the bullet glyph ("âª") in the help text below looks
# mis-encoded; the intended character cannot be determined from this file, so
# the text is left untouched - restore it from a correctly encoded copy.
__help__ = """
*Kicks:*
 âª /kick <userhandle>*:* Kicks a user out of the group, (via handle, or reply)
 âª /skick <userhandle>*:* Silently kicks a user out of the group, (via handle, or reply)
 âª /kickme*:* Kicks the user who used the command.
 
*Bans:*
 âª /ban <userhandle>*:* Bans a user. (via handle, or reply)
 âª /sban <userhandle>*:* Silently bans a user without leaving any message. (via handle, or reply)
 âª /tban <userhandle> x(m/h/d)*:* Bans a user for `x` time. (via handle, or reply). `m` = `minutes`, `h` = `hours`, `d` = `days`.
 âª /stban <userhandle> x(m/h/d)*:* Silently bans a user for `x` time. (via handle, or reply). `m` = `minutes`, `h` = `hours`, `d` = `days`.
 âª /unban <userhandle>*:* Unbans a user. (via handle, or reply)
_NOTE:_
 If you set Log Channels, you will get logs of Silent kick and bans. Check *Logger* module to know more about Log Channel.
"""

BAN_HANDLER = CommandHandler("ban", ban)
TEMPBAN_HANDLER = CommandHandler(["tban"], temp_ban)
STEMPBAN_HANDLER = CommandHandler(["stban"], stemp_ban)
KICK_HANDLER = CommandHandler("kick", kick)
SKICK_HANDLER = CommandHandler("skick", skick)
UNBAN_HANDLER = CommandHandler("unban", unban)
ROAR_HANDLER = CommandHandler("roar", selfunban)
KICKME_HANDLER = DisableAbleCommandHandler("kickme", kickme, filters=Filters.group)
SBAN_HANDLER = CommandHandler("sban", sban)

dispatcher.add_handler(BAN_HANDLER)
dispatcher.add_handler(TEMPBAN_HANDLER)
dispatcher.add_handler(STEMPBAN_HANDLER)
dispatcher.add_handler(KICK_HANDLER)
dispatcher.add_handler(SKICK_HANDLER)
dispatcher.add_handler(UNBAN_HANDLER)
dispatcher.add_handler(ROAR_HANDLER)
dispatcher.add_handler(KICKME_HANDLER)
dispatcher.add_handler(SBAN_HANDLER)

__mod_name__ = "Bans"
# NOTE(review): the visible source is cut off after SBAN_HANDLER; the list is
# closed here on the assumption that no further entries follow - confirm.
__handlers__ = [
    BAN_HANDLER,
    TEMPBAN_HANDLER,
    STEMPBAN_HANDLER,
    KICK_HANDLER,
    SKICK_HANDLER,
    UNBAN_HANDLER,
    ROAR_HANDLER,
    KICKME_HANDLER,
    SBAN_HANDLER,
]
itunesrpc.py
Source:itunesrpc.py
...
# none of this is automatically uploaded, but it should be
# sent within error reports on GitHub, as it may help determine
# your issue
try:
    os.remove("log")  # delete the old log file.
except FileNotFoundError:
    # Nothing to delete (e.g. first launch); a missing log must not abort startup.
    pass
log_message("Starting system log dump.")
log_message("Machine Architecture: " + platform.machine())
log_message("Machine Version:" + platform.version())
log_message("Machine Platform: " + platform.platform())
log_message("Machine Unix Name: " + str(platform.uname()))
log_message("OS Type: " + platform.system())
log_message("Processor: " + platform.processor())
log_message(
    "Total RAM: " + str(round(psutil.virtual_memory().total / (1024.0**3))) + " GB"
)
log_message("End of system logs.\n")
log_message("Starting iTunesRPC logs.")


# DEFINITIONS
def push_playing(o, DiscordRPC, dict, last_pos, paused_track, moved_playhead):
    """Push the current iTunes track to Discord Rich Presence.

    Reads track/artist/album from the iTunes COM object, writes them to the
    ``current_song_info`` file, saves and uploads the artwork, then updates
    the Discord status. If talking to Discord raises, the client is
    re-created (launching Discord once via ``discord_command``) and the push
    is retried by calling this function again.

    Args:
        o: The iTunes COM application object.
        DiscordRPC: The connected ``pypresence.Presence`` client.
        dict: Not read by this function; only forwarded to the retry call.
        last_pos: Remaining seconds of the track at the previous poll, or
            ``False`` to have it recomputed here instead of pushing.
        paused_track: ``True`` to publish the "paused" status.
        moved_playhead: Truthy to clear the status first so that Discord
            refreshes the time-left counter.

    Returns:
        Tuple ``(DiscordRPC, track, artist, album, last_pos, paused)``.
    """
    paused = False
    # Get all relevant information
    # TRACK INFO
    track = o.CurrentTrack.Name
    # OTHER INFO
    artist = o.CurrentTrack.Artist
    album = o.CurrentTrack.Album
    # SAVE DATA TO FILE, FOR WINDOW
    curr = str({"song": track, "artist": artist, "album": album})
    with open(
        "current_song_info", "w", encoding="utf-8"
    ) as current:  # save with context manager to allow for encoding= variable.
        current.write(curr)
    # MODIFY TRACK TO HAVE PAUSED IF PAUSED ON APPLE MUSIC
    if paused_track:
        track = "[PAUSED] " + track
    file_path = os.getcwd() + "\\temporary.png"
    o.CurrentTrack.Artwork.Item(1).SaveArtworkToFile(file_path)
    artwork_url = networking.get("temporary.png", domain, track, artist, album)
    artwork_url = ast.literal_eval(str(artwork_url))
    artwork_url = str(artwork_url[1]) + str(artwork_url[2])
    artwork_url = "https://" + domain + "/itrpc/" + artwork_url
    # os.remove(file_path)
    # log_message INFO
    log_message("Track: " + track)
    log_message("Artist: " + artist)
    log_message("Album: " + album)
    log_message("Artwork URL: " + str(artwork_url))
    pause_button = f"https://{domain}/itrpc/pause.png"
    play_button = f"https://{domain}/itrpc/play.png"
    # timestamps for computing how far into the song we are
    if paused_track is False:
        starttime = int(time.time()) - o.PlayerPosition
        endtime = int(time.time()) + (o.CurrentTrack.Duration - o.PlayerPosition)
    try:
        if moved_playhead:
            DiscordRPC.clear()  # get rid of the current status: the left count won't refresh otherwise.
            time.sleep(0.1)
            # if we don't pause for a tiny amount the .update will send, and discord will
            # forget the .clear command.
        if paused_track is True:
            if paused is not True:
                DiscordRPC.update(
                    details=track,
                    state=artist,
                    large_image=artwork_url,
                    large_text=album,
                    small_image=pause_button,
                    small_text="Paused on Apple Music",
                    buttons=buttons,
                )
                paused = True
        else:
            if last_pos is not False:
                DiscordRPC.update(
                    details=track,
                    state=artist,
                    start=starttime,
                    end=endtime,
                    large_image=artwork_url,
                    large_text=album,
                    small_image=play_button,
                    small_text="Playing on Apple Music",
                    buttons=buttons,
                )
            else:
                last_pos = o.CurrentTrack.Duration - o.PlayerPosition
    except Exception:
        # Discord is closed if we error here.
        # Let's re open it.
        log_message("..........................................")
        log_message(". Discord is closed... .")
        log_message(". Attempting to open it .")
        log_message(". Waiting global_pause+3 seconds before .")
        log_message(". continuing. .")
        log_message("..........................................")
        DiscordRPC = False
        opened = False
        while DiscordRPC is False:
            try:
                DiscordRPC = pypresence.Presence(secret, pipe=0)
                DiscordRPC.connect()
                log_message("Hooked to Discord.")
            except Exception:
                # Reset the handle: if connect() failed after Presence() was
                # assigned, the loop would otherwise exit with a dead client.
                DiscordRPC = False
                if not opened:
                    os.system(discord_command)
                    log_message("..........................................")
                    log_message(". Discord is closed... .")
                    log_message(". Attempting to open it .")
                    log_message(". Waiting global_pause+3 seconds before .")
                    log_message(". continuing. .")
                    log_message("..........................................")
                    opened = True
                time.sleep(global_pause + 3)
                continue
        # Now we have re opened Discord, let's post the message to Discord.
        time.sleep(global_pause)
        # Since we may have had Discord closed for a while, we need to update our items.
        # Let's re call this definition, as it ensures we get the most recent values.
        # We can send our original arguments to this. It isn't a massive deal.
        DiscordRPC, track, artist, album, last_pos, paused = push_playing(
            o, DiscordRPC, dict, last_pos, paused_track, moved_playhead
        )
    # Finally, regardless of what happened, let's return all our values.
    return (DiscordRPC, track, artist, album, last_pos, paused)


# SYSTRAY DEFINITIONS
# window definitions defined at start of program.
def exit_program(systray):
    """Systray callback: request a clean shutdown of the main loop."""
    global shutdown_systray
    shutdown_systray = True


def toggle_rpc(systray):
    """Systray callback: flip Rich Presence on/off and clear the shown status."""
    global toggled
    toggled = not toggled
    # Wait slightly longer than one poll interval before clearing.
    time.sleep(global_pause + 1)
    DiscordRPC.clear()


# SYSTRAY MENU OPTIONS AND MAKING THE ICON
menu_options = (
    ("Show Window", None, itrpc_window.start),
    ("Toggle Rich Presence", None, toggle_rpc),
    ("Shutdown iTunesRPC Safely", None, exit_program),
)
systray = SysTrayIcon("icon.ico", "iTunesRPC", menu_options)
systray.start()
log_message("Started Systray icon.")
# GETTING THE ITUNES COM CONNECTION
o = win32com.client.gencache.EnsureDispatch(
    "iTunes.Application"
)  # connect to the COM of iTunes.Application
log_message("Hooked to iTunes COM.")
# NOTE: win32com.client.gencache.EnsureDispatch will force open the application if not already open
# CONNECTING TO DISCORD
DiscordRPC = False
opened = False
while DiscordRPC is False:
    if shutdown_systray:
        log_message("No connection to DiscordRPC, so not closing.")
        systray.shutdown()
        log_message("Shutdown the Systray icon.")
        quit("Shutdown the Python program.")
    try:
        DiscordRPC = pypresence.Presence(secret, pipe=0)
        DiscordRPC.connect()
        log_message("Hooked to Discord.")
    except Exception:
        # Reset the handle so a failed connect() keeps the loop retrying.
        DiscordRPC = False
        if not opened:
            os.system(discord_command)
            log_message("..........................................")
            log_message(". Discord is closed... .")
            log_message(". Waiting for it to open before starting .")
            log_message(". Waiting global_pause+3 seconds before .")
            log_message(". continuing. .")
            log_message("..........................................")
            opened = True
        time.sleep(
            global_pause + 3
        )  # takes a while to open discord on lower end hardware so account for that here
        continue
# GET LAST POSITION OF TRACK
stopped = True
while stopped:
    try:
        last_pos = o.CurrentTrack.Duration - o.PlayerPosition
        # LAST TRACK = THE TRACK THAT IS CURRENTLY PLAYED. IT MAKES SENSE IN
        # CODE AS LAST_TRACK IS THE TRACK PLAYED {global_pause} SECONDS AGO
        last_track = o.CurrentTrack.Name
        track = o.CurrentTrack.Name
        stopped = False
    except Exception:
        DiscordRPC.clear()
        o = win32com.client.gencache.EnsureDispatch("iTunes.Application")
        log_message("..........................................")
        log_message(". iTunes is not playing anything... .")
        log_message(". Waiting for it to play before starting .")
        log_message(". Waiting 10 seconds. .")
        log_message("..........................................")
        time.sleep(10)
# LOOP VARIABLES
special_push = False  # this is used to determine if another function has already pushed
# to RPC, as we don't want to repeat for loads of different items.
stopped = False  # track stopped (iTunes has no track selected)
paused = False  # track paused (iTunes has a track selected)
first_run = True  # first ran the program.
shutdown_systray = False  # shutdown the program from the systray
running = True  # run var
skipped = False  # skipping song.
toggled = True  # showing RP?
# LOOP
# NOTE(review): the nesting below was reconstructed from a whitespace-flattened
# copy of this script; confirm the branch structure against the upstream file.
while running:
    if toggled:
        if first_run:
            last_pos = o.CurrentTrack.Duration - o.PlayerPosition
            time.sleep(global_pause)
            first_run = False
        try:
            placeholder = o.CurrentTrack.Name  # try to get current track name
        except Exception:
            stopped = True
        if stopped is False:
            log_message("------------------")
            # update the last track to be the variable that was playing 5 seconds ago and
            # get the new current track and store it as track
            last_track = track
            track = o.CurrentTrack.Name
            log_message("Last Track: " + last_track)
            log_message("Current Track: " + track)
            log_message("Last Playhead Position: " + str(last_pos))
            log_message(
                "Current Playhead Position: "
                + str((o.CurrentTrack.Duration - o.PlayerPosition))
            )
            log_message(
                "Position Difference: "
                + str(last_pos - (o.CurrentTrack.Duration - o.PlayerPosition))
            )
            log_message("Pushing the following info...")
            if last_track != track:  # if we changed tracks.
                special_push = True
                skipped = True
                log_message("Changed track. Getting regular fetch from push_playing.")
                DiscordRPC, track, artist, album, last_pos, paused = push_playing(
                    o, DiscordRPC, dict, last_pos, False, False
                )
            if not paused or not skipped:
                if (
                    last_pos - (o.CurrentTrack.Duration - o.PlayerPosition)
                    < global_pause - 1
                    and last_pos - (o.CurrentTrack.Duration - o.PlayerPosition) >= 0
                ):
                    special_push = True
                    # we are paused
                    log_message("Paused. Sending pause message to RPC.")
                    DiscordRPC, track, artist, album, last_pos, paused = push_playing(
                        o, DiscordRPC, dict, last_pos, True, False
                    )
                else:
                    paused = False
            if (
                (last_pos - (o.CurrentTrack.Duration - o.PlayerPosition) < 0)
                or (
                    last_pos - (o.CurrentTrack.Duration - o.PlayerPosition)
                    > global_pause + 1
                )
            ) and last_track == track:
                # we have rewound or fast forwarded within the song. let's make sure we account for that when calling push_playing
                # this could also happen when a new song has started. that is why last_track == track is in this if statement
                log_message(
                    "Track position moved over global_pause value, likely skipped forward/backward in the song."
                )
                special_push = True
                DiscordRPC, track, artist, album, last_pos, paused = push_playing(
                    o, DiscordRPC, dict, last_pos, False, True
                )
            if special_push is False:
                DiscordRPC, track, artist, album, last_pos, paused = push_playing(
                    o, DiscordRPC, dict, last_pos, False, False
                )
            else:
                skipped = False
                special_push = False
            # get the last position of the track. used for pause
            last_pos = o.CurrentTrack.Duration - o.PlayerPosition
            time.sleep(global_pause)
        else:
            DiscordRPC.clear()
            try:
                del o
            except Exception:
                pass
            o = win32com.client.gencache.EnsureDispatch("iTunes.Application")
            log_message("..........................................")
            log_message(". iTunes is not playing anything... .")
            log_message(". Waiting for it to play before starting .")
            log_message(". Waiting 10 seconds. .")
            log_message("..........................................")
            time.sleep(10)
            stopped = False
            try:
                track = o.CurrentTrack.Name
            except Exception as e:
                log_message(e)
                stopped = True
    else:
        log_message("RPC is toggled off. Not showing status.")
        log_message("Waiting 1 second to check if toggled is enabled.")
        time.sleep(1)
    # Poll the shutdown flag on every iteration, so "Shutdown iTunesRPC Safely"
    # also works while Rich Presence is toggled off.
    if shutdown_systray:
        running = False
        log_message("------------------")
        log_message("Shutting down.")
# SHUTDOWN
DiscordRPC.close()
log_message("Closed connection to DiscordRPC.")
systray.shutdown()
log_message("Shutdown the Systray icon.")
o.Quit()
log_message("Closed iTunes connection.")
# Context managers guarantee the config file is closed even if parsing fails.
with open("config", "r") as config_file:
    prev = ast.literal_eval(config_file.read())
prev["gui_window_isOpen"] = False
with open("config", "w") as config_file:
    config_file.write(str(prev))
log_message(
    "Set GUI isOpen to False to ensure we don't get hanging on next open."
)  # If this value is left as true, on the next launch of the app, it is possible that the window will freeze.
...
xmpp_service_stub.py
Source:xmpp_service_stub.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Stub version of the XMPP API, writes messages to logs."""
import logging
import os
from google.appengine.api import apiproxy_stub
from google.appengine.api import app_identity
from google.appengine.api.xmpp import xmpp_service_pb
from google.appengine.runtime import apiproxy_errors

INVALID_JID_CHARACTERS = ',;()[]'


class XmppServiceStub(apiproxy_stub.APIProxyStub):
    """Python only xmpp service stub.

    This stub does not use an XMPP network. It prints messages to the console
    instead of sending any stanzas.
    """

    THREADSAFE = True

    def __init__(self, log=logging.info, service_name='xmpp'):
        """Initializer.

        Args:
          log: A logger, used for dependency injection.
          service_name: Service name expected for all calls.
        """
        super(XmppServiceStub, self).__init__(service_name)
        self.log = log

    def _Dynamic_GetPresence(self, request, response):
        """Implementation of XmppService::GetPresence.

        Returns online if the first character of the JID comes before 'm' in the
        alphabet, otherwise returns offline.

        Args:
          request: A PresenceRequest.
          response: A PresenceResponse.
        """
        # Called for validation only: raises if the from JID is not acceptable.
        self._GetFrom(request.from_jid())
        self._FillInPresenceResponse(request.jid(), response)

    def _Dynamic_BulkGetPresence(self, request, response):
        """Fill in one presence subresponse per JID in the request.

        Args:
          request: A request exposing from_jid() and jid_list().
          response: A response exposing add_presence_response().
        """
        self._GetFrom(request.from_jid())
        for jid in request.jid_list():
            subresponse = response.add_presence_response()
            self._FillInPresenceResponse(jid, subresponse)

    def _FillInPresenceResponse(self, jid, response):
        """Arbitrarily fill in a presence response or subresponse."""
        # An empty JID has no first character; report it as unavailable rather
        # than raising IndexError (it is also marked invalid just below).
        response.set_is_available(bool(jid) and jid[0] < 'm')
        response.set_valid(self._ValidateJid(jid))
        response.set_presence(1)

    def _Dynamic_SendMessage(self, request, response):
        """Implementation of XmppService::SendMessage.

        Logs the message, then adds one status per recipient JID.

        Args:
          request: An XmppMessageRequest.
          response: An XmppMessageResponse .
        """
        from_jid = self._GetFrom(request.from_jid())
        log_message = [
            'Sending an XMPP Message:',
            ' From:',
            ' ' + from_jid,
            ' Body:',
            ' ' + request.body(),
            ' Type:',
            ' ' + request.type(),
            ' Raw Xml:',
            ' ' + str(request.raw_xml()),
            ' To JIDs:',
        ]
        log_message.extend(' ' + jid for jid in request.jid_list())
        self.log('\n'.join(log_message))
        for jid in request.jid_list():
            if self._ValidateJid(jid):
                response.add_status(xmpp_service_pb.XmppMessageResponse.NO_ERROR)
            else:
                response.add_status(xmpp_service_pb.XmppMessageResponse.INVALID_JID)

    def _Dynamic_SendInvite(self, request, response):
        """Implementation of XmppService::SendInvite.

        Args:
          request: An XmppInviteRequest.
          response: An XmppInviteResponse .
        """
        from_jid = self._GetFrom(request.from_jid())
        # Called for validation only: raises on an invalid recipient JID.
        self._ParseJid(request.jid())
        log_message = [
            'Sending an XMPP Invite:',
            ' From:',
            ' ' + from_jid,
            ' To: ' + request.jid(),
        ]
        self.log('\n'.join(log_message))

    def _Dynamic_SendPresence(self, request, response):
        """Implementation of XmppService::SendPresence.

        Args:
          request: An XmppSendPresenceRequest.
          response: An XmppSendPresenceResponse .
        """
        from_jid = self._GetFrom(request.from_jid())
        log_message = [
            'Sending an XMPP Presence:',
            ' From:',
            ' ' + from_jid,
            ' To: ' + request.jid(),
        ]
        # Optional fields are logged only when set to a truthy value.
        if request.type():
            log_message.append(' Type: ' + request.type())
        if request.show():
            log_message.append(' Show: ' + request.show())
        if request.status():
            log_message.append(' Status: ' + request.status())
        self.log('\n'.join(log_message))

    def _ParseJid(self, jid):
        """Parse the given JID.

        Also tests that the given jid:
          * Contains one and only one @.
          * Has one or zero resources.
          * Has a node.
          * Does not contain any invalid characters.

        Args:
          jid: The JID to validate

        Returns:
          A tuple (node, domain, resource) representing the JID.

        Raises:
          apiproxy_errors.ApplicationError if the requested JID is invalid using the
          criteria listed above.
        """
        if set(jid).intersection(INVALID_JID_CHARACTERS):
            self.log('Invalid JID: characters "%s" not supported. JID: %s',
                     INVALID_JID_CHARACTERS,
                     jid)
            raise apiproxy_errors.ApplicationError(
                xmpp_service_pb.XmppServiceError.INVALID_JID)
        node, domain, resource = ('', '', '')
        at = jid.find('@')
        if at == -1:
            self.log('Invalid JID: No \'@\' character found. JID: %s', jid)
            raise apiproxy_errors.ApplicationError(
                xmpp_service_pb.XmppServiceError.INVALID_JID)
        node = jid[:at]
        if not node:
            self.log('Invalid JID: No node. JID: %s', jid)
            raise apiproxy_errors.ApplicationError(
                xmpp_service_pb.XmppServiceError.INVALID_JID)
        rest = jid[at+1:]
        if rest.find('@') > -1:
            self.log('Invalid JID: Second \'@\' character found. JID: %s',
                     jid)
            raise apiproxy_errors.ApplicationError(
                xmpp_service_pb.XmppServiceError.INVALID_JID)
        slash = rest.find('/')
        if slash == -1:
            domain = rest
            # No resource given: fall back to the default 'bot' resource.
            resource = 'bot'
        else:
            domain = rest[:slash]
            resource = rest[slash+1:]
            if resource.find('/') > -1:
                self.log('Invalid JID: Second \'/\' character found. JID: %s',
                         jid)
                raise apiproxy_errors.ApplicationError(
                    xmpp_service_pb.XmppServiceError.INVALID_JID)
        return node, domain, resource

    def _ValidateJid(self, jid):
        """Validate the given JID using self._ParseJid."""
        try:
            self._ParseJid(jid)
            return True
        except apiproxy_errors.ApplicationError:
            return False

    def _GetFrom(self, requested):
        """Validates that the from JID is valid.

        The JID uses the display-app-id for all apps to simulate a common case
        in production (alias === display-app-id).

        Args:
          requested: The requested from JID.

        Returns:
          string, The from JID.

        Raises:
          apiproxy_errors.ApplicationError if the requested JID is invalid.
        """
        full_appid = os.environ.get('APPLICATION_ID')
        partition, _, display_app_id = (
            app_identity.app_identity._ParseFullAppId(full_appid))
        # None and '' both mean "no from JID requested": use the app default.
        if not requested:
            return display_app_id + '@appspot.com/bot'
        node, domain, resource = self._ParseJid(requested)
        if domain == 'appspot.com' and node == display_app_id:
            return node + '@' + domain + '/' + resource
        elif domain == display_app_id + '.appspotchat.com':
            return node + '@' + domain + '/' + resource
        self.log('Invalid From JID: Must be appid@appspot.com[/resource] or '
                 'node@appid.appspotchat.com[/resource]. JID: %s', requested)
        raise apiproxy_errors.ApplicationError(
            xmpp_service_pb.XmppServiceError.INVALID_JID)

    def _Dynamic_CreateChannel(self, request, response):
        """Implementation of XmppService::CreateChannel.

        Args:
          request: A CreateChannelRequest.
          response: A CreateChannelResponse.
        """
        log_message = [
            'Sending a Create Channel:',
            ' Client ID:',
            ' ' + request.application_key(),
        ]
        if request.duration_minutes():
            log_message.append(' Duration minutes: ' +
                               str(request.duration_minutes()))
        self.log('\n'.join(log_message))

    def _Dynamic_SendChannelMessage(self, request, response):
        """Implementation of XmppService::SendChannelMessage.

        Args:
          request: A SendMessageRequest.
          response: A SendMessageResponse.
        """
        log_message = []
        log_message.append('Sending a Channel Message:')
        log_message.append(' Client ID:')
        log_message.append(' ' + request.application_key())
        log_message.append(' Message:')
        log_message.append(' ' + str(request.message()))
...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!