How to use server_addr method in Playwright Python

Best Python code snippet using playwright-python

client_sim.py

Source:client_sim.py Github

copy

Full Screen

# encoding:utf-8
"""Scripted HTTP client for the endpoints listed in ``CapClient.urlpath``.

Run as a script it prepares one account and executes whichever ``test_*``
scenario is left uncommented at the bottom of the file.
"""
import cap_log
import logging
import requests
import json
import cfg
import random
import os
import functools
import login_pb2

try:  # Python 2 module names
    import cookielib
    import httplib
except ImportError:  # Python 3 renamed both modules
    import http.cookiejar as cookielib
    import http.client as httplib

# HTTP wire-level debugging for requests: 0 = off, set to 1 to turn it on.
httplib.HTTPConnection.debuglevel = 0
logger = logging.getLogger()


def ff(func, *args):
    """Decorate a client method so it first POSTs ``data`` to its endpoint.

    The endpoint is looked up in ``self.urlpath`` under the decorated
    function's name.  ``func`` only runs when the server answers with
    ``code == 200``; any exception (bad JSON, error raised by ``func``) is
    logged and swallowed, so the wrapper then returns ``None``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kv):
        self = args[0]
        url = "%s%s" % (self.server_addr, self.urlpath.get(func.__name__))
        req_data = kv.get('data')
        res = self.session.post(url, data={'data': json.dumps(req_data)})
        logger.debug("%s", res.text)
        try:
            jr = res.json()
            if jr.get('code') == 200:
                return func(*args, **kv)
            logger.warning("%s failed: %s", func.__name__, jr)
        except Exception as e:
            logger.warning("fail to %s %s : %s", func.__name__, e, res.text)
        return None
    return wrapper


class CapClient(object):
    """One account's HTTP session against a server.

    Most public methods map 1:1 to an entry of ``urlpath``: they POST a JSON
    document in the ``data`` form field and log a warning unless the reply
    carries ``code == 200``.
    """

    urlpath = {
        'listserver': '/server/list',
        'register': '/account/register',
        'login': '/account/login',
        'logout': '/account/logout',
        'create_role': '/account/createrole',
        'acctquery': '/account/query',
        'join_arena': '/arena/join',
        'query_rank': '/arena/query',
        'find_opponents': '/arena/search',
        'arena_all': '/arena/general',
        'challenge': '/arena/challenge',
        'challengeresult': '/arena/challengeresult',
        'challenge_history': '/arena/history',
        'gen_name': '/name/generate',
        'robable_items': '/rob/itemlist',
        'find_rob_opponent': '/rob/search',
        'rob_attack': '/rob/attack',
        'rob_commit': '/rob/commit',
        'rob_history': '/rob/history',
        'user_info': '/user/info',
        'user_overview': '/user/overview',
        'mission_prepare': '/mission/prepare',
        'mission_attack': '/mission/attack',
        'mission_commit': '/mission/commit',
        'mission_wipe': '/mission/wipe',
        'history_sections': '/mission/sections',
        'mission_tlrecord': '/mission/tlrecord',
        'buy': '/store/buy',
        'purchase_records': '/store/records',
        'drgbl_list': '/dragonBall/list',
        'drgbl_compose': '/dragonBall/compose',
        'friend_add': '/friend/reqadd',
        'friend_cancel': '/friend/reqcancel',
        'friend_reqop': '/friend/reqop',
        'friend_del': '/friend/del',
        'friend_list': '/friend/list',
        'friend_giveenergy': '/friend/giveenergy',
        'friend_acceptenergy': '/friend/acceptenergy',
        'msgpush_broadcast': '/msgpush/broadcast',
        'msgpush_send': '/msgpush/send',
        'msgpush_query': '/msgpush/query',
        'tech_addexp': '/tech/addexp',
        'tech_list': '/tech/list',
        'mail_list': '/mail/list',
        'mail_accept': '/mail/accept',
        'mail_send': '/mail/send',
        'sysmail_send': '/sys/sendmail',
        'cardraffle_query': '/cardraffle/query',
        'cardraffle_raffle': '/cardraffle/raffle',
        'loginbonus_list': '/loginbonus/list',
        'loginbonus_accept': '/loginbonus/accept',
        'loginbonus_remedy': '/loginbonus/remedy',
        'loginbonus_accuaccept': '/loginbonus/accuaccept',
        'newbie_commit': '/newbie/commit',
        'midas_query': '/midas/query',
        'midas_buy': '/midas/buy',
        'refreshstore_refresh': '/refreshstore/refresh',
        'refreshstore_buy': '/refreshstore/buy',
        'refreshstore_list': '/refreshstore/list',
        'vip_charge': '/vip/charge',
        'guildmission_summon': '/guildmission/summon',
        'guildmission_attack': '/guildmission/attack',
        'guildmission_commit': '/guildmission/commit',
        'guildmission_query': '/guildmission/query',
        'user_setfield': '/user/setfield',
        'campaign_query': '/campaign/query',
        'campaign_accept': '/campaign/accept',
    }

    def __init__(self, acct, password):
        """Create the session and pick up a cookie saved by an earlier run."""
        self.acct = acct  # email
        self.password = password
        self.session = requests.Session()
        self.role_id = None
        self.server_addr = None
        self.servers = {}
        self.status = 0  # 1 online
        self.role = None
        self.rank = None  # filled in by join_arena / challengeresult
        self.opponents = {}
        self.rob_opponents = {}
        self.cookie_filename = "cookie_%s.ck" % (self.acct)
        self.cookies = {}
        self.load_cookie()

    # ------------------------------------------------------------------
    # plumbing
    # ------------------------------------------------------------------
    def save_cookie(self, cookies):
        """Remember ``cookies`` (a cookie jar) and persist them as JSON."""
        dcookie = requests.utils.dict_from_cookiejar(cookies)
        self.cookies = dcookie
        content = json.dumps(dcookie)
        logger.info("save cookie %s to %s", content, self.cookie_filename)
        with open(self.cookie_filename, "w") as fp:
            fp.write(content)

    def load_cookie(self):
        """Load the saved cookie file, if any; a PHPSESSID marks us online."""
        logger.info("loadcookie...")
        if not os.path.exists(self.cookie_filename):
            return
        with open(self.cookie_filename, "r") as fp:
            content = fp.read()
        if content:
            try:
                dcookie = json.loads(content)
            except ValueError as e:
                # A corrupt cookie file only means we have to log in again.
                logger.warning("ignoring unreadable cookie file %s : %s",
                               self.cookie_filename, e)
                return
            if dcookie.get('PHPSESSID'):
                self.status = 1
            self.cookies = dcookie
        logger.info("cookie: %s", self.cookies)

    def post(self, url, *args, **kv):
        """POST with the saved cookies; return the JSON reply or ``{}``."""
        res = self.session.post(url, cookies=self.cookies, *args, **kv)
        logger.debug(res.text)
        try:
            jr = res.json()
        except ValueError as e:
            logger.warning("fail to get %s : %s %s[%s]",
                           url, e, res.status_code, res.text)
            return {}
        # Callers use ``jr.get``; hand them a dict even for odd replies.
        return jr if isinstance(jr, dict) else {}

    def _url(self, name):
        """Return the absolute URL of the ``urlpath`` entry ``name``."""
        return "%s%s" % (self.server_addr, self.urlpath.get(name))

    def _send(self, name, req_data=None):
        """POST ``req_data`` (default ``{}``) to endpoint ``name``; return the reply."""
        payload = {} if req_data is None else req_data
        return self.post(self._url(name), data={'data': json.dumps(payload)})

    def _call(self, name, req_data=None):
        """Like ``_send`` but return ``None`` and log a warning unless code is 200."""
        jr = self._send(name, req_data)
        if jr.get('code') == 200:
            return jr
        logger.warning("failed %s", self._url(name))
        return None

    def _fetch(self, name, req_data, key):
        """Return ``reply[key]`` of a successful call, ``None`` otherwise."""
        jr = self._call(name, req_data)
        return jr.get(key) if jr is not None else None

    # ------------------------------------------------------------------
    # server selection / account
    # ------------------------------------------------------------------
    def list_server(self):
        """Fetch the server list from ``cfg.entry_addr`` into ``self.servers``."""
        url = "%s%s" % (cfg.entry_addr, self.urlpath.get('listserver'))
        res = self.session.get(url)
        try:
            jr = res.json()
            if jr.get('code') == 200:
                for server in jr.get('servers', []):
                    self.servers[int(server.get('id'))] = server.get('addr')
                logger.info("got servers %s", self.servers)
            else:
                logger.warning("list server failed: %s", jr)
        except Exception as e:
            # Best effort: a malformed reply leaves self.servers as it was.
            logger.warning("fail to list server %s : %s", e, res.text)

    def select_server(self, sid):
        """Point the client at server ``sid``, listing servers first if needed."""
        if not self.servers:
            self.list_server()
        self.server_addr = self.servers.get(sid)
        if self.server_addr and not self.server_addr.startswith('http'):
            self.server_addr = "http://%s" % (self.server_addr)
        logger.debug("select server addr: %s", self.server_addr)

    def register(self):
        """Register the account; the reply is not inspected."""
        self._send('register', {
            'email': self.acct,
            'password': self.password,
        })

    def login(self):
        """Log in with a serialized ``login_pb2.msg_login`` and store the cookies.

        On ``code == 200`` the client is marked online and the first entry of
        ``users`` (if any) becomes ``self.role``.
        """
        print("login account %s" % (self.acct))
        url = self._url('login')
        # An earlier JSON body ({'email': ..., 'password': ...}) was replaced
        # by the protobuf message below.
        data = login_pb2.msg_login()
        data.username = self.acct
        data.password = self.password
        res = self.session.post(url, data={'data': data.SerializeToString()})
        logger.debug("%s", res.text)
        jr = res.json()
        logger.info("%s", res.cookies)
        self.save_cookie(res.cookies)
        if jr.get('code', 0) == 200:
            self.status = 1
            users = jr.get('users', [])
            if users:
                self.role = users[0]
                logger.debug('logged in with role %s', self.role.get('id', -1))
            else:
                logger.debug('logged in without role')

    def create_role(self, name):
        """Create a role called ``name``; the reply is not inspected."""
        self._send('create_role', {'name': name})

    # ------------------------------------------------------------------
    # arena
    # ------------------------------------------------------------------
    def join_arena(self):
        """Join the arena and remember the returned rank (-1 if absent)."""
        jr = self._call('join_arena')
        if jr is not None:
            self.rank = jr.get('rank', -1)

    def query_rank(self):
        """Query the arena rank; only failures are reported."""
        self._call('query_rank')

    def find_opponents(self):
        """Store the reply's ``users`` in ``self.opponents``."""
        jr = self._call('find_opponents')
        if jr is not None:
            self.opponents = jr.get('users')

    def arena_all(self):
        """Request every arena section and store ``opponents``."""
        jr = self._call('arena_all', {
            'opponents': 1,
            'rank': 1,
            'best_rank': 1,
            'new_award': 1,
            'top_ranks': 1,
            'deck': 1,
            'energy': 1,
        })
        if jr is not None:
            self.opponents = jr.get('opponents')

    def challenge(self, uid=None):
        """Challenge ``uid`` (or a random known opponent); return the challenge id.

        Returns ``None`` when there is nobody to challenge or the call fails.
        """
        if not uid:
            if not self.opponents:
                logger.warning("no opponents to challenge")
                return None
            user = random.choice(self.opponents)
            uid = user['uid']
        logger.info("challenge user %s", uid)
        return self._fetch('challenge', {
            'opponent': uid,
            'get_deck': 1,
        }, 'challenge_id')

    def challengeresult(self, cid, r):
        """Commit result ``r`` for challenge ``cid`` and store the new rank."""
        logger.debug("challenge %s commited with %s", cid, r)
        jr = self._call('challengeresult', {
            'challenge_id': cid,
            'result': r,
            'info': "nothing to said",
        })
        if jr is not None:
            self.rank = jr.get('rank')

    def challenge_history(self):
        """Request the arena challenge history."""
        self._call('challenge_history')

    def gen_name(self):
        """Request a generated name."""
        self._call('gen_name')

    # ------------------------------------------------------------------
    # rob
    # ------------------------------------------------------------------
    def robable_items(self):
        """Request the list of robable items."""
        self._call('robable_items')

    def find_rob_opponent(self, item=None):
        """Search rob opponents for ``item``."""
        self._call('find_rob_opponent', {
            'item': item,
            'protect_status': 1,
        })

    def rob_attack(self, user=None, item=None):
        """Start a robbery against ``user`` for ``item``; return the rob id.

        Without ``user`` a random key of ``self.rob_opponents`` is used and
        removed from that pool.  Returns ``None`` when no opponent is known or
        the call fails.
        """
        if item is None:
            item = {}  # fresh dict per call instead of a shared default
        if not user:
            if not self.rob_opponents:
                logger.warning("no opponents to rob")
                return None
            user = random.choice(list(self.rob_opponents))
            # The original removed the pick from ``self.opponents`` (the arena
            # list), which cannot be indexed by this key.
            self.rob_opponents.pop(user, None)
        return self._fetch('rob_attack', {
            'opponent': user,
            'item': item,
        }, 'rob_id')

    def rob_commit(self, rob_id, result):
        """Commit ``result`` for robbery ``rob_id``."""
        self._call('rob_commit', {
            'rob_id': rob_id,
            'result': result,
            'info': 'robbery',
        })

    def rob_history(self):
        """Request the robbery history."""
        self._call('rob_history')

    # ------------------------------------------------------------------
    # user
    # ------------------------------------------------------------------
    def user_info(self, uids=None, attrs=None):
        """Request user info, optionally limited to ``uids`` and ``attrs``."""
        req_data = {}
        if uids:
            req_data['users'] = uids
        if attrs:
            req_data['attrs'] = attrs
        self._call('user_info', req_data)

    def user_overview(self, uids=None):
        """Request the overview of ``uids``."""
        self._call('user_overview', {'users': uids})

    def user_setfield(self, key, value):
        """Set user field ``key`` to ``value``."""
        # The original always sent 1 and silently ignored ``value``.
        self._call('user_setfield', {
            'field': key,
            'value': value,
        })

    # ------------------------------------------------------------------
    # missions
    # ------------------------------------------------------------------
    def mission_prepare(self, mtype=0):
        """Prepare a mission of type ``mtype``."""
        self._call('mission_prepare', {'type': mtype})

    def mission_attack(self, section, mtype=0, mercenary=None):
        """Attack ``section``; return the battle id or ``None`` on failure."""
        return self._fetch('mission_attack', {
            'section_id': section,
            'mercenary': mercenary,
            'type': mtype,
        }, 'battle_id')

    def mission_commit(self, battle_id, result, mtype=0, deaths=0):
        """Commit ``result`` for ``battle_id``."""
        self._call('mission_commit', {
            'battle_id': battle_id,
            'result': result,
            'deaths': deaths,
            'type': mtype,
        })

    def mission_wipe(self, section, mtype=0, count=1, free=0):
        """Wipe ``section`` ``count`` times."""
        self._call('mission_wipe', {
            'section_id': section,
            'count': count,
            'free': free,
            'type': mtype,
        })

    def history_sections(self, mtype=0):
        """Request the section history for ``mtype``."""
        self._call('history_sections', {'type': mtype})

    def mission_tlrecord(self):
        """Request the mission ``tlrecord`` endpoint."""
        self._call('mission_tlrecord')

    # ------------------------------------------------------------------
    # store / dragon balls
    # ------------------------------------------------------------------
    def buy(self, commodity_id=1, count=1, store_id=1, ):
        """Buy ``count`` of ``commodity_id`` from ``store_id``."""
        self._call('buy', {
            'store_id': store_id,
            'commodity_id': commodity_id,
            'count': count,
        })

    def purchase_records(self):
        """Request the purchase records."""
        self._call('purchase_records')

    def drgbl_compose(self, ball_id):
        """Compose dragon ball ``ball_id``."""
        self._call('drgbl_compose', {'db_id': ball_id})

    def drgbl_list(self):
        """List dragon ball fragments and balls."""
        self._call('drgbl_list', {
            'fragments': 1,
            'balls': 1,
        })

    # ------------------------------------------------------------------
    # friends / messages
    # ------------------------------------------------------------------
    def friend_add(self, target):
        """Send a friend request to ``target``."""
        self._call('friend_add', {'uid': target})

    def friend_cancel(self, target):
        """Cancel the friend request sent to ``target``."""
        self._call('friend_cancel', {'uid': target})

    def friend_reqop(self, target, approve):
        """Approve or reject (``approve``) the request from ``target``."""
        self._call('friend_reqop', {
            'uid': target,
            'approve': approve,
        })

    def friend_del(self, target):
        """Remove friend ``target``."""
        self._call('friend_del', {'uid': target})

    def friend_giveenergy(self, target):
        """Give energy to friend ``target``."""
        self._call('friend_giveenergy', {'uid': target})

    def friend_acceptenergy(self, target):
        """Accept energy from friend ``target``."""
        self._call('friend_acceptenergy', {'uid': target})

    def friend_list(self):
        """Request the friend list."""
        self._call('friend_list')

    def msgpush_broadcast(self, msg):
        """Broadcast ``msg``."""
        self._call('msgpush_broadcast', {'msg': msg})

    def msgpush_send(self, to, text, type):
        """Send ``text`` of the given ``type`` to ``to``."""
        self._call('msgpush_send', {
            'text': text,
            'to': to,
            'type': type,
        })

    def msgpush_query(self, since):
        """Query pushed messages ``since`` the given marker."""
        self._call('msgpush_query', {'since': since})

    # ------------------------------------------------------------------
    # tech / mail
    # ------------------------------------------------------------------
    def tech_list(self):
        """Request the tech list."""
        self._call('tech_list')

    def tech_addexp(self, tech_id):
        """Add experience to ``tech_id`` with empty cost lists."""
        self._call('tech_addexp', {
            'tech_id': tech_id,
            'cost_cards': [],
            'cost_equips': [],
            'cost_card_sprits': {},
        })

    def mail_list(self):
        """Request the mail list."""
        self._call('mail_list')

    def mail_accept(self, mail_id):
        """Accept mail ``mail_id``."""
        self._call('mail_accept', {'mail_id': mail_id})

    def mail_send(self, text, attachment, targets):
        """Send a mail with ``attachment`` to ``targets``."""
        self._call('mail_send', {
            'mail_text': text,
            'attachment': attachment,
            'targets': targets,
        })

    def sysmail_send(self, text, attachment, targets, from_name):
        """Send a system mail signed ``from_name``."""
        self._call('sysmail_send', {
            'mail_text': text,
            'attachment': attachment,
            'targets': targets,
            'from_name': from_name,
        })

    # ------------------------------------------------------------------
    # raffle / login bonus / newbie / midas / refresh store / vip
    # ------------------------------------------------------------------
    def cardraffle_query(self):
        """Query the card raffle."""
        self._call('cardraffle_query')

    def cardraffle_raffle(self, rtype, raffle_type):
        """Run a card raffle."""
        self._call('cardraffle_raffle', {
            'type': rtype,
            'raffle_type': raffle_type,
        })

    def loginbonus_list(self):
        """List login bonuses."""
        self._call('loginbonus_list')

    def loginbonus_accept(self):
        """Accept the login bonus."""
        self._call('loginbonus_accept')

    def loginbonus_remedy(self, date):
        """Remedy the login bonus of ``date``."""
        self._call('loginbonus_remedy', {'date': date})

    def loginbonus_accuaccept(self, accu):
        """Accept the accumulated login bonus ``accu``."""
        self._call('loginbonus_accuaccept', {'accu_value': accu})

    def newbie_commit(self, step):
        """Commit newbie guide ``step``."""
        self._call('newbie_commit', {'step': step})

    def midas_query(self):
        """Query midas."""
        self._call('midas_query')

    def midas_buy(self):
        """Buy from midas."""
        self._call('midas_buy')

    def refreshstore_refresh(self):
        """Refresh the refresh-store."""
        self._call('refreshstore_refresh')

    def refreshstore_buy(self):
        """Buy commodity 1 from the refresh-store."""
        self._call('refreshstore_buy', {'commodity_id': 1})

    def refreshstore_list(self):
        """List the refresh-store."""
        self._call('refreshstore_list')

    def vip_charge(self, pkg_id):
        """Charge VIP package ``pkg_id``."""
        self._call('vip_charge', {'package_id': pkg_id})

    # ------------------------------------------------------------------
    # guild missions / campaigns
    # ------------------------------------------------------------------
    def guildmission_summon(self, boss_id):
        """Summon ``boss_id``; return the reply's ``boss`` or ``None``."""
        return self._fetch('guildmission_summon', {'boss_id': boss_id}, 'boss')

    def guildmission_query(self):
        """Return the reply's ``bosses`` or ``None`` on failure."""
        return self._fetch('guildmission_query', {}, 'bosses')

    def guildmission_attack(self, boss_uid):
        """Attack ``boss_uid``; return the battle id or ``None``."""
        return self._fetch('guildmission_attack',
                           {'boss_uid': boss_uid}, 'battle_id')

    def guildmission_commit(self, battle_id, damage=30):
        """Commit ``damage`` for ``battle_id``; return ``boss`` or ``None``."""
        return self._fetch('guildmission_commit', {
            'battle_id': battle_id,
            'damage': damage,
        }, 'boss')

    def campaign_query(self):
        """Query campaigns."""
        self._call('campaign_query')

    def campaign_accept(self, campaign_id, award_id):
        """Accept ``award_id`` of ``campaign_id``."""
        self._call('campaign_accept', {
            'campaign_id': campaign_id,
            'award_id': award_id,
        })

    def func(self):
        """Template for new endpoints; '' has no ``urlpath`` entry yet."""
        self._call('')


def ensure_online(func, *args):
    """Decorate ``func`` so it receives a logged-in client as first argument.

    ``func`` is only called when the login succeeded (``status == 1``).
    """
    @functools.wraps(func)
    def wrapper(*args, **kv):
        acct = 'chet@capricorn.com'
        c = CapClient(acct, '123456')
        c.select_server(cfg.server_id)
        c.register()
        # The original guard was ``if 1 or not c.status``: always log in.
        c.login()
        if not c.role:
            # create_role needs a name; use the account's local part as the
            # other helpers in this file do.
            c.create_role(acct.split('@')[0])
        if c.status == 1:
            return func(c, *args, **kv)
        return None
    return wrapper


def inject_user():
    """Register, log in and create roles for chet1 .. chet10."""
    for i in range(1, 11):
        name = 'chet%d' % (i)
        acct = '%s@capricorn.com' % (name)
        c = CapClient(acct, '123456')
        c.select_server(cfg.server_id)
        c.register()
        c.login()
        c.create_role(name)
        logger.info("injected user %s", name)


def test_arena(c):
    """Fetch the arena state, challenge a random opponent and commit a win."""
    #c.join_arena()
    #c.query_rank()
    #c.find_opponents()
    c.arena_all()
    cid = c.challenge()
    logger.info("challenge battle %s", cid)
    if cid:
        c.challengeresult(cid, random.randint(1, 1))
    #c.challenge_history()


def prepare_user(tid, name=False, acct=False):
    """Return a client for account ``acct`` that is logged in and has a role.

    ``name`` defaults to ``chet<tid>`` and ``acct`` to ``<name>@capricorn.com``.
    """
    if not name:
        name = 'chet%d' % (tid)
    if not acct:
        acct = '%s@capricorn.com' % (name)
    c = CapClient(acct, '123456')
    c.select_server(cfg.server_id)
    c.register()
    if not c.status:
        c.login()
    if not c.role:
        logger.info("creating user %s", name)
        c.create_role(name)
    return c


def test_rob(c):
    """Search, attack and commit a robbery, then fetch the history."""
    #c.robable_items()
    c.find_rob_opponent({'item_id': 7, 'sub_id': 29})
    rid = c.rob_attack(385, {'item_id': 7, 'sub_id': 29})
    logger.info("rob battle %s", rid)
    if rid:
        # Only commit a robbery the server actually started.
        c.rob_commit(rid, 1)
    c.rob_history()


def test_mission(c):
    """Prepare, attack and commit a type-2 mission, then fetch tlrecord."""
    #c.user_info((1,167,3))
    #c.user_overview((1,219,3))
    c.mission_prepare(2)
    btl_id = c.mission_attack(1, 2)
    if btl_id:
        # NOTE(review): positional args give mtype=0, deaths=2 although the
        # battle was started with mtype=2 - confirm this is intended.
        c.mission_commit(btl_id, 1, 0, 2)
    #c.mission_wipe(1, 3, free=random.randint(0,0))
    #c.history_sections(2)
    c.mission_tlrecord()


def test_store(c):
    """Buy a few commodities from stores 2, 3 and 4."""
    c.buy(1, 2, 2)
    c.buy(2, 1, 2)
    c.buy(2, 2, 3)
    c.buy(2, 1, 4)
    #c.purchase_records()


def test_drgbl(c):
    """List dragon balls and compose ball 1."""
    c.drgbl_list()
    c.drgbl_compose(1)


def test_friend(c1, c2):
    """Run a friend request / answer / energy exchange between two clients."""
    uid1 = (c1.role or {}).get('id', None)
    uid2 = (c2.role or {}).get('id', None)
    if not uid1 or not uid2:
        logger.warning("user not ready")
        return
    c1.friend_add(uid2)
    if random.randint(0, 1):
        c2.friend_reqop(uid1, 1)
    else:
        c2.friend_reqop(uid1, 0)
    c1.friend_list()
    c2.friend_list()
    #c1.friend_del(uid2)
    c1.friend_giveenergy(uid2)
    c2.friend_acceptenergy(uid1)


def test_msg(c):
    """Send one message and query pushed messages."""
    #c.msgpush_broadcast('hello world')
    c.msgpush_send(516, 'hello world', 3)
    c.msgpush_query(None)


def test_tech(c):
    """List techs and add experience to tech 1."""
    c.tech_list()
    c.tech_addexp(1)


def test_mail(c):
    """List mail, send one system mail and accept mail 4070."""
    c.mail_list()
    c.sysmail_send('greeting\nhello',
                   [{'item_id': 1, 'sub_id': 1, 'count': 10}], [392], 'adm')
    #c.mail_send('hello', [{'item_id':1, 'sub_id':1, 'count':10}], [0]);
    c.mail_accept(4070)


def test_cardraffle(c):
    """Query the card raffle and draw once."""
    c.cardraffle_query()
    c.cardraffle_raffle(2, 2)


def test_loginbonus(c):
    """Exercise every login bonus endpoint once."""
    c.loginbonus_list()
    c.loginbonus_accept()
    c.loginbonus_remedy('2014-05-01')
    c.loginbonus_accuaccept(7)


def test_newguide(c):
    """Commit newbie guide steps 1 through 18."""
    c.user_info()
    for step in range(1, 19):
        c.newbie_commit(step)


def test_midas(c):
    """Query midas and buy once."""
    c.midas_query()
    c.midas_buy()


def test_refresh_store(c):
    """Refresh the refresh-store, buy once and list it."""
    c.refreshstore_refresh()
    c.refreshstore_buy()
    c.refreshstore_list()


def test_charge(c):
    """Charge VIP package 1."""
    c.vip_charge(1)


def test_guildboss(c):
    """Attack guild boss "2", summoning it first when it is missing or dead."""
    bosses = c.guildmission_query() or {}  # None when the query failed
    boss_id = "2"
    boss = bosses.get(boss_id, None)
    if boss is None or len(boss) == 0 or int(boss['hp']) <= 0:
        logger.info("summon new boss")
        boss = c.guildmission_summon(boss_id)
        if not boss:  # covers both a failed call (None) and an empty reply
            logger.warning('fail to summon')
            return
    battle_id = c.guildmission_attack(boss['boss_uid'])
    logger.info("battle_id:%s", battle_id)
    if battle_id:
        boss = c.guildmission_commit(battle_id, 50)
        if boss:
            logger.info("hp:%s", boss['hp'])


def test_campaign(c):
    """Query campaigns and accept award 3 of campaign 1."""
    c.campaign_query()
    c.campaign_accept(1, 3)


if __name__ == "__main__":
    c = prepare_user(0, '5', 'K4')
    #c.user_setfield('ng7', 10)
    #c = prepare_user(26)
    #test_mission(c)
    #test_rob(c)
    #test_arena(c)
    #test_store(c)
    #test_drgbl(c)
    #inject_user()
    #c1 = prepare_user(0, 'ccc', 'c@c.com')
    #test_friend(c, c1)
    #test_msg(c)
    #test_tech(c)
    #test_mail(c)
    test_cardraffle(c)
    #test_loginbonus(c)
    #test_newguide(c)
    #test_midas(c)
    #test_charge(c)
    #test_refresh_store(c)
    #test_guildboss(c)...

Full Screen

Full Screen

commands.py

Source:commands.py Github

copy

Full Screen

1# coding: utf82from __future__ import unicode_literals3try:4 reload # Python 2.75except NameError:6 try:7 from importlib import reload # Python 3.4+8 except ImportError:9 from imp import reload # Python 3.0 - 3.310import random11import time12import TagaBot13from utils import *14from requests import get15from config import config16num_genrator = random.Random()17num_genrator.seed()18color = 219def die(pseudo, message, msg_type, sock, channel):20 if pseudo == config.admin or msg_type == "STDIN":21 print_message("[!] Master say I'm DEAD")22 print_message("Ok master", msg_type, sock, pseudo, channel)23 try:24 sock.send("QUIT :suis les ordres\r\n")25 sock.close()26 except:27 pass28 end_other_thread()29 exit(0)30 else:31 print_message("I don't think so " + pseudo, msg_type, sock, pseudo, channel)32def transfert_message_from_other_place(message, sock):33 global color34 param = message.content.split()35 if len(param) >= 3:36 addr = param[1]37 server_addr = addr.split(":")38 if len(server_addr) == 1:39 server_addr = addr40 port = 666741 elif len(server_addr) == 2:42 port = int(server_addr[1])43 server_addr = server_addr[0]44 else:45 print_message("too much :", message.msg_type, sock, message.pseudo, message.target)46 return47 if check_valid_server(server_addr, param[2], port):48 print_message("sorry you can't choose this channel, I can't agree it will create a loophole!!!",49 message.msg_type,50 sock, message.pseudo, message.target)51 return52 elif len(param) == 3 or (len(param) == 4 and param[3].lower() != "publique" and param[3].lower() != "public"):53 if check_not_already_use(server_addr, param[2], port, message.pseudo):54 print_message("Transferer already exist", message.msg_type, sock, message.pseudo, message.target)55 return56 else:57 if check_not_already_use(server_addr, param[2], port, None):58 print_message("Transferer already exist", message.msg_type, sock, message.pseudo, message.target)59 return60 external_bot_name = "user_" + str(num_genrator.randint(1000, 
1000 * 1000))61 print_message("[!] name of transferer user:" + external_bot_name)62 if len(param) == 3 or (len(param) == 4 and param[3].lower() != "publique" and param[3].lower() != "public"):63 transfer = Transferrer(server_addr, param[2], port, external_bot_name, sock, message.target, message.pseudo,64 couleur=color)65 else:66 transfer = Transferrer(server_addr, param[2], port, external_bot_name, sock, message.target, couleur=color)67 transfer.start()68 timeout_start = time.time() + 1069 while not transfer.started:70 if time.time() > timeout_start:71 transfer.stop()72 print_message("Transfert cannot be start in 10 seconds aborting!", message.msg_type, sock,73 message.pseudo, message.target)74 return75 elif transfer.error is not None:76 transfer.stop()77 print_message("Transfert cannot be start because of error: " + transfer.error, message.msg_type,78 sock, message.pseudo, message.target)79 return80 color += 181 if color > 15:82 color = 283 print_message("[!] Transferring data from " + addr + param[2] + " started")84 transferrer_list.append(transfer)85 print_message("Transfert start", message.msg_type, sock, message.pseudo, message.target)86def check_not_already_use(server_addr, channel, external_port, target=None, list_to_check=transferrer_list):87 for tr in list_to_check:88 if config.debug:89 print_message(90 "[D] {} {} {} {} {} {} {} {}".format(server_addr, channel, external_port, tr.server, tr.channel,91 tr.port,92 tr.pseudo, target))93 print_message("[D] {}".format(tr.pseudo == target))94 if check_valid_server(server_addr, channel, external_port, tr.server, tr.channel,95 tr.port) and tr.pseudo == target:96 return 197def check_valid_server(server_addr, channel, external_port, comp_serv=config.main_server,98 comp_channel=config.main_channel,99 comp_port=config.main_port):100 server_addr = server_addr.lower()101 if server_addr[-1:] == ".":102 server_addr = server_addr[:-1]103 channel = channel.lower()104 try:105 external_addrs = getaddrinfo(server_addr, 
external_port)106 addrs = getaddrinfo(comp_serv, comp_port)107 except:108 print_message("[W] Invalid Address")109 return 1110 if config.debug:111 print_message("[D] ip address of server {}".format(addrs))112 for data in addrs:113 if len(data) >= 5:114 adresse = data[4][0]115 else:116 adresse = ""117 if config.debug:118 print_message(119 "[D] adresse of main serv '{}', adresse of external serv '{}', equal :{}".format(adresse, server_addr,120 adresse == server_addr))121 print_message("[D] channel of main serv '{}', channel of external serv '{}', equal :{}".format(comp_channel,122 channel,123 comp_channel == channel))124 if adresse == server_addr and channel == comp_channel:125 return 1126 for external_data in external_addrs:127 if len(data) >= 5:128 external_adresse = data[4][0]129 else:130 external_adresse = ""131 if config.debug:132 print_message(133 "[D] ip adresse of main serv '{}', adresse of external serv '{}', equal :{}".format(adresse,134 server_addr,135 adresse == external_adresse))136 print_message(137 "[D] channel of main serv '{}', channel of external serv '{}', equal :{}".format(comp_channel,138 channel,139 comp_channel == channel))140 if adresse == external_adresse and channel == comp_channel:141 return 1142 if config.debug:143 print_message(144 "[D] adresse of main serv '{}', adresse of external serv '{}', equal :{}".format(comp_serv, server_addr,145 comp_serv == server_addr))146 print_message(147 "[D] channel of main serv '{}', channel of external serv '{}', equal :{}".format(comp_channel, channel,148 comp_channel == channel))149 if channel == comp_channel and server_addr == comp_serv:150 return 1151def list_transferer(pseudo, message, msg_type, sock, channel):152 print_message("List of transferer:", msg_type, sock, pseudo, channel)153 for tr in transferrer_list:154 print_message("{} on {} in channel {}".format(tr.name, tr.server, tr.channel), msg_type, sock, pseudo, channel)155def suppress_transferrer(pseudo, message, msg_type, sock, channel):156 
param = message.split()157 if len(param) >= 3:158 addr = param[1]159 server_addr = addr.split(":")160 if len(server_addr) == 1:161 server_addr = addr162 port = 6667163 elif len(server_addr) == 2:164 port = int(server_addr[1])165 server_addr = server_addr[0]166 else:167 print_message("too much :", msg_type, sock, pseudo, channel)168 return169 tr_stopped = False170 if len(param) >= 4:171 if param[3][:1] == "#" or param[3].lower() == "public" or param[3].lower() == "publique":172 for tr in transferrer_list:173 if tr.port == port and tr.server == server_addr and tr.channel == param[2] and tr.pseudo is None:174 tr.stop()175 tr_stopped = True176 print_message("[!] transferer " + tr.channel + " stopped")177 transferrer_list.remove(tr)178 print_message("transferer " + tr.channel + " stopped", msg_type, sock, pseudo, channel)179 if not tr_stopped:180 print_message("no transferer like this one", msg_type, sock, pseudo, channel)181 elif param[3].lower() == "private" or param[3].lower() == "priver":182 for tr in transferrer_list:183 if tr.port == port and tr.server == server_addr and tr.channel == param[2] and tr.pseudo == pseudo:184 tr.stop()185 tr_stopped = True186 print_message("[!] transferer " + tr.channel + " stopped")187 transferrer_list.remove(tr)188 print_message("transferer " + tr.channel + " stopped", msg_type, sock, pseudo, channel)189 if not tr_stopped:190 print_message("no transferer like this one", msg_type, sock, pseudo, channel)191 else:192 for tr in transferrer_list:193 if tr.port == port and tr.server == server_addr and tr.channel == param[2] and tr.pseudo == param[194 3]:195 tr.stop()196 tr_stopped = True197 print_message("[!] 
transferer " + tr.channel + " stopped")198 transferrer_list.remove(tr)199 print_message("transferer " + tr.channel + " stopped", msg_type, sock, pseudo, channel)200 if not tr_stopped:201 print_message("no transferer like this one", msg_type, sock, pseudo, channel)202 else:203 for tr in transferrer_list:204 if tr.port == port and tr.server == server_addr and tr.channel == param[2]:205 tr.stop()206 tr_stopped = True207 print_message("[!] transferer " + tr.channel + " stopped")208 transferrer_list.remove(tr)209 print_message("transferer " + tr.channel + " stopped", msg_type, sock, pseudo, channel)210 if not tr_stopped:211 print_message("no transferer like this one", msg_type, sock, pseudo, channel)212def start_rpg(pseudo, message, msg_type, sock, channel):213 param = message.split()214 usage = "!rpg <server|optional> <channel|optional> "215 if len(param) == 1:216 rpg_channel = "#RPG_" + str(num_genrator.randint(1000, 1000 * 1000))217 print_message("Starting RPG Game in channel : " + rpg_channel, msg_type, sock, pseudo, channel)218 rpg_game = rpg.Rpg(config.main_server, "RPG_MASTER" + str(num_genrator.randint(1000, 1000 * 1000)),219 rpg_channel,220 config.main_port)221 rpg_game.start()222 else:223 addr = param[1]224 server_addr = addr.split(":")225 if len(server_addr) == 1:226 server_addr = addr227 port = 6667228 elif len(server_addr) == 2:229 port = int(server_addr[1])230 server_addr = server_addr[0]231 else:232 print_message("too much :", msg_type, sock, pseudo, channel)233 return234 if len(param) == 2:235 rpg_channel = "#RPG_" + str(num_genrator.randint(1000, 1000 * 1000))236 rpg_game = rpg.Rpg(server_addr, "RPG_MASTER" + str(num_genrator.randint(1000, 1000 * 1000)), rpg_channel,237 port)238 rpg_game.start()239 print_message("Starting RPG Game on server" + addr + " in channel : " + rpg_channel, msg_type, sock, pseudo,240 channel)241 else:242 rpg_game = rpg.Rpg(server_addr, "RPG_MASTER" + str(num_genrator.randint(0, 1000 * 1000)), param[2], port)243 
rpg_game.start()244 print_message("Starting RPG Game on server" + addr + " in channel : " + param[2], msg_type, sock, pseudo,245 channel)246 if rpg_game is not None:247 rpg_list.append(rpg_game)248def list_rpg(pseudo, message, msg_type, sock, channel):249 print_message("List of RPG:", msg_type, sock, pseudo, channel)250 for rpg in rpg_list:251 print_message("{} on {} in channel {}".format(rpg.name, rpg.server, rpg.channel), msg_type, sock, pseudo,252 channel)253def stop_rpg(pseudo, message, msg_type, sock, channel):254 usage = "!kill_rpg <server|require> <channel|require>"255 param = message.split()256 if len(param) == 3:257 addr = param[1]258 server_addr = addr.split(":")259 if len(server_addr) == 1:260 server_addr = addr261 port = 6667262 elif len(server_addr) == 2:263 port = int(server_addr[1])264 server_addr = server_addr[0]265 else:266 print_message("too much :", msg_type, sock, pseudo, channel)267 return268 tr_stopped = False269 for rpg_game in rpg_list:270 if rpg_game.port == port and rpg_game.server == server_addr and rpg_game.channel == param[2]:271 rpg_game.stop()272 tr_stopped = True273 print_message("[!] 
RPG " + rpg_game.channel + " stopped")274 print_message("transferer " + rpg_game.channel + " stopped", msg_type, sock, pseudo, channel)275 if not tr_stopped:276 print_message("no RPG like this one", msg_type, sock, pseudo, channel)277def rop_start(pseudo, message, msg_type, sock, channel):278 rop.RopThread(pseudo, message, msg_type, sock, channel).start()279def migration(pseudo, message, msg_type, sock, channel):280 param = message.split()281 if len(param) == 2:282 sock.send("JOIN " + param[1] + "\r\n")283 sock.send("PART " + config.main_channel + "\r\n")284 config.main_channel = param[1]285 print_message("Migration done", msg_type, sock, pseudo, channel)286def reload_bot(pseudo, message, msg_type, sock, channel):287 param = message.split()288 if len(param) == 2:289 if param[1] == "all":290 print_message("starting the reload", msg_type, sock, pseudo, channel)291 reload(rop)292 reload(rpg)293 reload(TagaBot)294 print_message("reload finished", msg_type, sock, pseudo, channel)295def start_bot(pseudo, message, msg_type, sock, channel):296 param = message.split()297 if len(param) >= 4:298 addr = param[1]299 server_addr = addr.split(":")300 if len(server_addr) == 1:301 server_addr = addr302 port = 6667303 elif len(server_addr) == 2:304 port = int(server_addr[1])305 server_addr = server_addr[0]306 else:307 print_message("too much :", msg_type, sock, pseudo, channel)308 return309 for bot in bot_list:310 if check_valid_server(server_addr, param[2], port, bot.server, bot.channel, bot.port):311 print_message("sorry you can't choose this channel, There is already one Unit of myself present in it",312 msg_type,313 sock, pseudo, channel)314 return315 bot_name = param[3]316 bot = TagaBot.Bot(bot_name=bot_name, server=server_addr, channel=param[2], port=port)317 bot.start()318 timeout_start = time.time() + 30319 while not bot.started:320 if time.time() > timeout_start:321 bot.stop()322 print_message("Bot cannot be start in 30 seconds aborting!", msg_type, sock, pseudo, 
channel)323 return -1324 elif bot.error is not None:325 bot.stop()326 print_message("Bot cannot be start because of error: " + bot.error, msg_type,327 sock, pseudo, channel)328 return -2329 bot_list.append(bot)330 print_message("Bot started", msg_type, sock, pseudo, channel)331 return bot332def list_bot(pseudo, message, msg_type, sock, channel):333 print_message("List of BOT:", msg_type, sock, pseudo, channel)334 for bot in bot_list:335 print_message("{} on {} in channel {}".format(bot.name, bot.server, bot.channel), msg_type, sock, pseudo,336 channel)337def stop_bot(pseudo, message, msg_type, sock, channel):338 usage = "!kill_bot <server|require> <channel|require>"339 param = message.split()340 if len(param) == 3:341 addr = param[1]342 server_addr = addr.split(":")343 if len(server_addr) == 1:344 server_addr = addr345 port = 6667346 elif len(server_addr) == 2:347 port = int(server_addr[1])348 server_addr = server_addr[0]349 else:350 print_message("too much :", msg_type, sock, pseudo, channel)351 return352 tr_stopped = False353 for bot in bot_list:354 if bot.port == port and bot.server == server_addr and bot.channel == param[2]:355 if bot.sock != sock:356 bot.stop()357 tr_stopped = True358 bot_list.remove(bot)359 print_message("[!] 
Bot " + bot.channel + " stopped")360 print_message("Bot " + bot.channel + " stopped", msg_type, sock, pseudo, channel)361 else:362 print_message("Eum NO YOU CAN'T KILL ME", msg_type, sock, pseudo, channel)363 return364 if not tr_stopped:365 print_message("no bot like this one", msg_type, sock, pseudo, channel)366def last_time_seen(pseudo, message, msg_type, sock, channel):367 param = message.split()368 if len(param) > 1:369 for i in range(1, len(param)):370 username = param[i]371 if username != "":372 u = USERLIST.get_user(username)373 if u != -1:374 if not u.actif:375 ret = "{} has been seen the last time on server {} in channel {} at: {}".format(username,376 u.server,377 u.channel,378 u.lastSeen)379 else:380 ret = "{} has been actif the last time on server {} in channel {} at: {}".format(username,381 u.server,382 u.channel,383 u.lastSeen)384 else:385 ret = "{} has never been seen".format(username)386 print_message(ret, msg_type, sock, pseudo, channel)387def apero_status(pseudo, message, msg_type, sock, channel):388 r = get(u"http://estcequecestbientotlapero.fr/")389 msg = parse_html_balise(u"h2", r.text)390 apero = convert_html_to_uni(parse_html_balise(u"<font size=5>", msg))391 print_message(apero, msg_type, sock, pseudo, channel)392 if "font size=3" in msg:393 conseil = convert_html_to_uni(parse_html_balise(u"<font size=3>", msg))394 print_message(conseil, msg_type, sock, pseudo, channel)395def user_list(pseudo, message, msg_type, sock, channel):396 for line in unicode(USERLIST).split("\r\n"):397 print_message(line, msg_type, sock, pseudo, channel)398 time.sleep(1)399 return 1400def spy_channel(pseudo, message, msg_type, sock, channel):401 param = message.split()402 if len(param) >= 3:403 addr = param[1]404 server_addr = addr.split(":")405 if len(server_addr) == 1:406 server_addr = addr407 port = 6667408 elif len(server_addr) == 2:409 port = int(server_addr[1])410 server_addr = server_addr[0]411 else:412 print_message("too much :", msg_type, sock, pseudo, 
channel)413 return414 if check_valid_server(server_addr, param[2], port):415 print_message("sorry you can't choose this channel, I am already doing the job of spy",416 msg_type,417 sock, pseudo, channel)418 return419 else:420 if check_not_already_use(server_addr, param[2], port, None, list_to_check=eyes):421 print_message("Spy already exist", msg_type, sock, pseudo, channel)422 return423 external_bot_name = "user_" + str(num_genrator.randint(1000, 1000 * 1000))424 print_message("[!] name of Spy user:" + external_bot_name)425 spy = IRC(server_addr, param[2], port, external_bot_name)426 spy.start()427 timeout_start = time.time() + 10428 while not spy.started:429 if time.time() > timeout_start:430 spy.stop()431 print_message("Spy cannot be start in 10 seconds aborting!", msg_type, sock, pseudo, channel)432 return433 elif spy.error is not None:434 spy.stop()435 print_message("Spy cannot be start because of error: " + spy.error, msg_type,436 sock, pseudo, channel)437 return438 print_message("[!] 
spying data from " + addr + param[2] + " started")439 eyes.append(spy)440 print_message("Spy start", msg_type, sock, pseudo, channel)441def list_spy(pseudo, message, msg_type, sock, channel):442 print_message("List of SPY:", msg_type, sock, pseudo, channel)443 for spy in eyes:444 print_message("{} on {} in channel {}".format(spy.name, spy.server, spy.channel), msg_type, sock, pseudo,445 channel)446def stop_spy(pseudo, message, msg_type, sock, channel):447 usage = "!kill_spy <server|require> <channel|require>"448 param = message.split()449 if len(param) == 3:450 addr = param[1]451 server_addr = addr.split(":")452 if len(server_addr) == 1:453 server_addr = addr454 port = 6667455 elif len(server_addr) == 2:456 port = int(server_addr[1])457 server_addr = server_addr[0]458 else:459 print_message("too much :", msg_type, sock, pseudo, channel)460 return461 tr_stopped = False462 for spy in eyes:463 if spy.port == port and spy.server == server_addr and spy.channel == param[2]:464 spy.stop()465 tr_stopped = True466 eyes.remove(spy)467 print_message("[!] Spy " + spy.channel + " stopped")468 print_message("Spy " + spy.channel + " stopped", msg_type, sock, pseudo, channel)469 if not tr_stopped:470 print_message("no spy like this one", msg_type, sock, pseudo, channel)...

Full Screen

Full Screen

test_osc.py

Source:test_osc.py Github

copy

Full Screen

1import asyncio2import pytest3from utils import get_random_values4@pytest.mark.asyncio5async def test_nodes():6 from vidhubcontrol.interfaces.osc import OscNode7 class Listener(object):8 def __init__(self):9 self.messages_received = {}10 self.tree_messages_received = {}11 def on_message_received(self, node, client_address, *messages):12 if node.osc_address not in self.messages_received:13 self.messages_received[node.osc_address] = []14 self.messages_received[node.osc_address].extend([m for m in messages])15 def on_tree_message_received(self, node, client_address, *messages):16 if node.osc_address not in self.tree_messages_received:17 self.tree_messages_received[node.osc_address] = []18 self.tree_messages_received[node.osc_address].extend([m for m in messages])19 root = OscNode('root')20 assert root.osc_address == '/root'21 branchA = root.add_child('branchA', children={'leaf1':{}, 'leaf2':{}})22 assert branchA.parent == root23 assert branchA.osc_address == '/root/branchA'24 leafA1 = root.find('branchA/leaf1')25 leafA2 = root.find('branchA/leaf2')26 leafB1 = root.add_child('branchB/leaf1')27 branchB = leafB1.parent28 leafB2 = branchB.add_child('leaf2')29 assert leafB1.parent == leafB2.parent == branchB30 assert branchB.parent == root31 assert branchB.osc_address == '/root/branchB'32 assert leafB1.osc_address == '/root/branchB/leaf1'33 assert root.find('branchB/leaf1') == leafB134 assert root.find('branchB/leaf2') == leafB235 # root.add_child('branchC')36 # root.find('branchC').add_child('leaf1')37 # root.find('branchC').add_child('leaf2')38 leafC1 = root.add_child('branchC/leaf1')39 leafC2 = root.add_child('branchC/leaf2')40 print(repr(leafC1))41 print(repr(leafC2))42 all_nodes = {n.osc_address:n for n in root.walk()}43 print(all_nodes.keys())44 expected = {45 '/root',46 '/root/branchA',47 '/root/branchA/leaf1',48 '/root/branchA/leaf2',49 '/root/branchB',50 '/root/branchB/leaf1',51 '/root/branchB/leaf2',52 '/root/branchC',53 '/root/branchC/leaf1',54 
'/root/branchC/leaf2',55 }56 assert set(all_nodes.keys()) == expected57 listener = Listener()58 for node in all_nodes.values():59 node.bind(on_message_received=listener.on_message_received)60 root.bind(on_tree_message_received=listener.on_tree_message_received)61 coros = set()62 for node in all_nodes.values():63 coros.add(node.on_osc_dispatcher_message(node.osc_address, None, *['foo', 'bar']))64 await asyncio.gather(*coros)65 await asyncio.sleep(.01)66 assert set(listener.messages_received.keys()) == set(all_nodes.keys())67 assert set(listener.tree_messages_received.keys()) == set(all_nodes.keys())68 for key in all_nodes.keys():69 assert listener.messages_received[key] == ['foo', 'bar']70 assert listener.tree_messages_received[key] == ['foo', 'bar']71@pytest.mark.asyncio72async def test_pubsub_nodes(missing_netifaces, unused_udp_port_factory):73 from pydispatch import Dispatcher, Property74 from vidhubcontrol.interfaces.osc import OscNode, PubSubOscNode, OSCUDPServer, OscDispatcher75 server_port, client_port = unused_udp_port_factory(), unused_udp_port_factory()76 class Publisher(Dispatcher):77 value = Property()78 other_value = Property()79 def __init__(self, root_node, osc_address):80 self.osc_address = osc_address81 self.random_values = get_random_values(8)82 self.msg_queue = asyncio.Queue()83 if root_node.osc_address == osc_address:84 self.osc_node = root_node85 else:86 self.osc_node = root_node.add_child(osc_address)87 self.subscribe_node = self.osc_node.add_child('_subscribe')88 self.query_node = self.osc_node.add_child('_query')89 self.list_node = self.osc_node.add_child('_list')90 for node in [self.osc_node, self.subscribe_node, self.query_node, self.list_node]:91 node.bind(on_message_received=self.on_client_node_message)92 async def wait_for_response(self):93 msg = await asyncio.wait_for(self.msg_queue.get(), timeout=5)94 self.msg_queue.task_done()95 return msg96 async def subscribe(self, server_addr):97 await 
self.subscribe_node.send_message(server_addr)98 msg = await self.wait_for_response()99 return msg100 async def query(self, server_addr, recursive=False):101 if recursive:102 await self.query_node.send_message(server_addr, 'recursive')103 else:104 await self.query_node.send_message(server_addr)105 msg = await self.wait_for_response()106 return msg107 def on_client_node_message(self, node, client_address, *messages):108 print('on_client_node_message: ', node, messages)109 self.msg_queue.put_nowait({110 'node':node,111 'client_address':client_address,112 'messages':messages,113 })114 node_addresses = [115 '/root',116 '/root/branchA',117 '/root/branchA/leaf1',118 '/root/branchA/leaf2',119 '/root/branchB',120 '/root/branchB/leaf1',121 '/root/branchB/leaf2',122 '/root/branchC',123 '/root/branchC/leaf1',124 '/root/branchC/leaf2',125 ]126 listeners = {}127 publish_root = PubSubOscNode('root')128 subscribe_root = OscNode('root')129 listener = Publisher(subscribe_root, subscribe_root.osc_address)130 listeners[listener.osc_node.osc_address] = listener131 for addr in node_addresses:132 if addr == '/root':133 pub_node = publish_root134 else:135 _addr = addr.lstrip('/root/')136 listener = Publisher(subscribe_root, _addr)137 listeners[listener.osc_node.osc_address] = listener138 pub_node = publish_root.add_child(_addr, cls=PubSubOscNode, published_property=(listener, 'value'))139 assert listener.osc_node.osc_address == addr140 assert set(node_addresses) == set(listeners.keys())141 pub_addrs = set((n.osc_address for n in publish_root.walk()))142 sub_addrs = set((n.osc_address for n in subscribe_root.walk()))143 assert pub_addrs == sub_addrs144 server_addr = ('127.0.0.1', server_port)145 server_dispatcher = OscDispatcher()146 publish_root.osc_dispatcher = server_dispatcher147 server = OSCUDPServer(server_addr, server_dispatcher)148 client_addr = ('127.0.0.1', client_port)149 client_dispatcher = OscDispatcher()150 subscribe_root.osc_dispatcher = client_dispatcher151 client = 
OSCUDPServer(client_addr, client_dispatcher)152 await server.start()153 await client.start()154 # Subscribe and test for property changes and query responses155 for listener in listeners.values():156 if listener.osc_node is subscribe_root:157 continue158 msg = await listener.subscribe(server_addr)159 assert msg['node'] is listener.subscribe_node160 for v in listener.random_values:161 listener.value = v162 msg = await listener.wait_for_response()163 assert msg['node'] is listener.osc_node164 assert len(msg['messages']) == 1165 if isinstance(v, float):166 assert v == pytest.approx(msg['messages'][0])167 else:168 assert v == msg['messages'][0]169 msg = await listener.query(server_addr)170 assert msg['node'] is listener.osc_node171 assert len(msg['messages']) == 1172 if isinstance(v, float):173 assert listener.value == pytest.approx(msg['messages'][0])174 else:175 assert listener.value == msg['messages'][0]176 listener = listeners['/root']177 # published_property has not yet been set on the root node178 # so there should be no responses179 with pytest.raises(NotImplementedError):180 publish_root.get_query_response()181 await listener.query_node.send_message(server_addr)182 await asyncio.sleep(1)183 assert listener.msg_queue.empty()184 # Set the property value to something other than None to avoid185 # errors in python-osc186 listener.value = 'a'187 # Now test binding post-init188 publish_root.published_property = (listener, 'value')189 msg = await listener.query(server_addr)190 assert msg['node'] is listener.osc_node191 assert len(msg['messages']) == 1192 assert msg['messages'][0] == listener.value == 'a'193 msg = await listener.subscribe(server_addr)194 assert msg['node'] is listener.subscribe_node195 listener.value = 'foo'196 msg = await listener.wait_for_response()197 assert msg['node'] is listener.osc_node198 assert len(msg['messages']) == 1199 assert msg['messages'][0] == listener.value == 'foo'200 msg = await listener.query(server_addr, recursive=True)201 assert 
msg['node'] is listener.osc_node202 assert len(msg['messages']) == 1203 assert msg['messages'][0] == listener.value204 for _listener in listeners.values():205 if _listener is listener:206 continue207 msg = await _listener.wait_for_response()208 assert msg['node'] is _listener.osc_node209 assert len(msg['messages']) == 1210 if isinstance(_listener.value, float):211 assert msg['messages'][0] == pytest.approx(_listener.value)212 else:213 assert msg['messages'][0] == _listener.value214 # Test property re-binding215 for _listener in listeners.values():216 current_value = _listener.value217 if _listener.osc_node is subscribe_root:218 pub_node = publish_root219 else:220 pub_node = publish_root.find(_listener.osc_address.lstrip('/root/'))221 pub_node.published_property = (_listener, 'other_value')222 _listener.value = 'foobar'223 _listener.other_value = 'baz'224 msg = await _listener.wait_for_response()225 assert len(msg['messages']) == 1226 assert msg['messages'][0] == _listener.other_value == 'baz'227 pub_node.published_property = (_listener, 'value')228 _listener.value = current_value229 msg = await _listener.wait_for_response()230 assert len(msg['messages']) == 1231 if isinstance(current_value, float):232 assert pytest.approx(msg['messages'][0]) == _listener.value == current_value233 else:234 assert msg['messages'][0] == _listener.value == current_value235 assert _listener.msg_queue.empty()236 # Unbind published_property and test for recursive empty responses237 for _listener in listeners.values():238 if _listener.osc_node is subscribe_root:239 pub_node = publish_root240 else:241 pub_node = publish_root.find(_listener.osc_address.lstrip('/root/'))242 pub_node.published_property = None243 await listener.query_node.send_message(server_addr, 'recursive')244 await asyncio.sleep(1)245 for _listener in listeners.values():246 assert _listener.msg_queue.empty()247 # Test list (recursive and non-recursive)248 await listener.list_node.send_message(server_addr, False)249 msg = 
await listener.wait_for_response()250 assert msg['node'] is listener.list_node251 expected = ['branchA', 'branchB', 'branchC']252 assert set(msg['messages']) == set(expected)253 await listener.list_node.send_message(server_addr, 'recursive')254 msg = await listener.wait_for_response()255 assert msg['node'] is listener.list_node256 expected = [addr.lstrip('/root/') for addr in node_addresses if addr != '/root']257 assert set(msg['messages']) == set(expected)258 await client.stop()259 await server.stop()260@pytest.mark.asyncio261async def test_interface(missing_netifaces, unused_udp_port_factory):262 from vidhubcontrol.interfaces.osc import OscNode, OscInterface, OSCUDPServer, OscDispatcher263 from vidhubcontrol.backends import DummyBackend264 server_port, client_port = unused_udp_port_factory(), unused_udp_port_factory()265 class NodeResponse(object):266 def __init__(self, node=None):267 self.msg_queue = asyncio.Queue()268 self.node = node269 @property270 def node(self):271 return getattr(self, '_node', None)272 @node.setter273 def node(self, node):274 if self.node is not None:275 self.node.unbind(self)276 self._node = node277 print(node)278 if node is not None:279 node.bind(on_message_received=self.on_message_received)280 async def subscribe_to_node(self, node, server_addr):281 self.node = None282 subscribe_node = node.add_child('_subscribe')283 self.node = subscribe_node284 await subscribe_node.send_message(server_addr)285 await self.wait_for_response()286 self.node = node287 async def unsubscribe(self, server_addr):288 subscribe_node = self.node.find('_subscribe')289 self.node = subscribe_node290 await subscribe_node.send_message(server_addr, False)291 await self.wait_for_response()292 self.node = None293 async def wait_for_response(self):294 msg = await asyncio.wait_for(self.msg_queue.get(), timeout=5)295 self.msg_queue.task_done()296 return msg297 def on_message_received(self, node, client_address, *messages):298 print('on_message_received: ', node, 
client_address, messages)299 self.msg_queue.put_nowait({300 'node':node,301 'client_address':client_address,302 'messages':messages,303 })304 interface = OscInterface(hostport=server_port)305 vidhub = DummyBackend(device_name='dummy-name')306 await vidhub.connect()307 await interface.add_vidhub(vidhub)308 await interface.start()309 client_node = OscNode('vidhubcontrol')310 client_dispatcher = OscDispatcher()311 client_node.osc_dispatcher = client_dispatcher312 client_addr = (str(interface.hostiface.ip), client_port)313 client = OSCUDPServer(client_addr, client_dispatcher)314 await client.start()315 server_addr = interface.server._server_address316 assert interface.root_node.find('vidhubs/by-id/dummy') is not None317 assert interface.root_node.find('vidhubs/by-name/dummy-name') is not None318 by_id_response = NodeResponse()319 n = client_node.add_child('vidhubs/by-id')320 await by_id_response.subscribe_to_node(n, server_addr)321 n = n.add_child('_query')322 await n.send_message(server_addr)323 msg = await by_id_response.wait_for_response()324 assert vidhub.device_id in msg['messages']325 by_name_response = NodeResponse()326 await by_name_response.subscribe_to_node(client_node.add_child('vidhubs/by-name'), server_addr)327 cnode = client_node.add_child('vidhubs/by-id/dummy/labels/output/_list')328 node_response = NodeResponse(cnode)329 await cnode.send_message(server_addr)330 msg = await node_response.wait_for_response()331 assert set(msg['messages']) == set((str(i) for i in range(vidhub.num_outputs)))332 for i, lbl in enumerate(vidhub.output_labels):333 addr = 'vidhubs/by-id/dummy/labels/output/{}'.format(i)334 assert interface.root_node.find(addr) is not None335 cnode = client_node.add_child(addr)336 await node_response.subscribe_to_node(cnode, server_addr)337 await cnode.send_message(server_addr, 'FOO OUT {}'.format(i))338 msg = await node_response.wait_for_response()339 assert msg['messages'][0] == 'FOO OUT {}'.format(i)340 assert 
interface.root_node.find('vidhubs/by-name/dummy-name/labels/output/{}'.format(i)) is not None341 await node_response.unsubscribe(server_addr)342 for i, lbl in enumerate(vidhub.input_labels):343 addr = 'vidhubs/by-id/dummy/labels/input/{}'.format(i)344 assert interface.root_node.find(addr) is not None345 cnode = client_node.add_child(addr)346 await node_response.subscribe_to_node(cnode, server_addr)347 await cnode.send_message(server_addr, 'FOO IN {}'.format(i))348 msg = await node_response.wait_for_response()349 assert msg['messages'][0] == 'FOO IN {}'.format(i)350 assert interface.root_node.find('vidhubs/by-name/dummy-name/labels/input/{}'.format(i)) is not None351 await node_response.unsubscribe(server_addr)352 crosspoint_node = client_node.add_child('vidhubs/by-id/dummy/crosspoints')353 crosspoint_response = NodeResponse()354 await crosspoint_response.subscribe_to_node(crosspoint_node, server_addr)355 await crosspoint_node.add_child('_query').send_message(server_addr)356 msg = await crosspoint_response.wait_for_response()357 assert list(msg['messages']) == vidhub.crosspoints[:]358 for out_idx, in_idx in enumerate(vidhub.crosspoints):359 addr = 'vidhubs/by-id/dummy/crosspoints/{}'.format(out_idx)360 assert interface.root_node.find(addr) is not None361 cnode = client_node.add_child(addr)362 await node_response.subscribe_to_node(cnode, server_addr)363 await cnode.send_message(server_addr, 2)364 msg = await node_response.wait_for_response()365 assert msg['node'].osc_address == cnode.osc_address366 assert msg['messages'][0] == 2367 assert interface.root_node.find('vidhubs/by-name/dummy-name/crosspoints/{}'.format(i)) is not None368 await node_response.unsubscribe(server_addr)369 msg = await crosspoint_response.wait_for_response()370 assert crosspoint_response.msg_queue.empty()371 assert list(msg['messages']) == vidhub.crosspoints[:]372 for i, lbl in enumerate(vidhub.output_labels):373 assert lbl == 'FOO OUT {}'.format(i)374 for i, lbl in 
enumerate(vidhub.input_labels):375 assert lbl == 'FOO IN {}'.format(i)376 for xpt in vidhub.crosspoints:377 assert xpt == 2378 expected = vidhub.crosspoints[:]379 await crosspoint_node.send_message(server_addr)380 msg = await crosspoint_response.wait_for_response()381 assert msg['node'].osc_address == crosspoint_node.osc_address382 assert list(msg['messages']) == expected383 expected = set(str(i) for i in range(vidhub.num_outputs))384 crosspoint_list_node = crosspoint_node.add_child('_list')385 node_response.node = crosspoint_list_node386 await crosspoint_list_node.send_message(server_addr)387 msg = await node_response.wait_for_response()388 assert msg['node'].osc_address == crosspoint_list_node.osc_address389 assert set(msg['messages']) == expected390 # Test presets391 preset_node = client_node.add_child('vidhubs/by-id/dummy/presets')392 preset_node.add_child('recall')393 preset_node.add_child('store')394 class PresetAwait(object):395 def __init__(self, event_name):396 self.event = asyncio.Event()397 self.args = None398 self.kwargs = None399 vidhub.bind(**{event_name:self.on_vidhub_event})400 def on_vidhub_event(self, *args, **kwargs):401 self.args = args402 self.kwargs = kwargs403 self.event.set()404 async def wait(self):405 await asyncio.wait_for(self.event.wait(), timeout=5)406 self.event.clear()407 return self.args, self.kwargs408 waiter = PresetAwait('on_preset_stored')409 await node_response.subscribe_to_node(crosspoint_node, server_addr)410 preset_response = NodeResponse()411 for i in range(vidhub.num_inputs):412 xpts = [i]*vidhub.num_outputs413 await crosspoint_node.send_message(server_addr, *xpts)414 while True:415 msg = await node_response.wait_for_response()416 assert msg['node'].osc_address == crosspoint_node.osc_address417 if list(msg['messages']) == xpts:418 break419 await asyncio.sleep(0)420 msg = await crosspoint_response.wait_for_response()421 assert list(msg['messages']) == vidhub.crosspoints422 assert vidhub.crosspoints == xpts423 name = 
'preset_{}'.format(i)424 await preset_node.find('store').send_message(server_addr, i, name)425 await waiter.wait()426 assert vidhub.presets[i].name == name427 assert vidhub.presets[i].crosspoints == {i:v for i, v in enumerate(xpts)}428 assert len(vidhub.presets) == vidhub.num_inputs429 expected = set((str(p.index) for p in vidhub.presets))430 preset_response.node = preset_node431 await preset_node.send_message(server_addr)432 msg = await preset_response.wait_for_response()433 assert msg['node'].osc_address == preset_node.osc_address434 assert set(msg['messages']) == expected435 preset_list_node = preset_node.add_child('_list')436 preset_response.node = preset_list_node437 await preset_list_node.send_message(server_addr)438 msg = await preset_response.wait_for_response()439 assert msg['node'].osc_address == preset_list_node.osc_address440 expected |= set(['recall', 'store'])441 assert set(msg['messages']) == expected442 for preset in vidhub.presets:443 await preset_response.subscribe_to_node(444 preset_node.add_child('/'.join([str(preset.index), 'active'])),445 server_addr,446 )447 assert not preset.active448 n = preset_node.find('recall')449 await n.send_message(server_addr, preset.index)450 msg = await preset_response.wait_for_response()451 assert msg['node'].osc_address == preset_response.node.osc_address452 assert msg['messages'][0] == True453 assert preset.active454 vidhub.device_name = 'dummy-foo'455 msg = await by_name_response.wait_for_response()456 assert 'dummy-foo' in msg['messages']457 vidhub2 = DummyBackend(device_id='dummy2', device_name='dummy-name-2')458 await vidhub2.connect()459 await interface.add_vidhub(vidhub2)460 msg = await by_id_response.wait_for_response()461 assert set(msg['messages']) == set([vidhub.device_id, vidhub2.device_id])462 msg = await by_name_response.wait_for_response()463 assert set(msg['messages']) == set([vidhub.device_name, vidhub2.device_name])464 await client.stop()465 await interface.stop()466@pytest.mark.asyncio467async 
def test_interface_config(tempconfig, missing_netifaces):468 from vidhubcontrol.config import Config469 from vidhubcontrol.backends import DummyBackend470 from vidhubcontrol.interfaces.osc import OscInterface471 config = Config.load(str(tempconfig))472 interface = OscInterface(config=config)473 await config.start()474 await interface.start()475 vidhub = await DummyBackend.create_async(device_id='foo')476 await config.add_vidhub(vidhub)477 async def wait_for_foo():478 while 'foo' not in interface.vidhubs:479 await asyncio.sleep(.1)480 await asyncio.wait_for(wait_for_foo(), timeout=5)481 vidhub_node = interface.root_node.find('vidhubs/by-id/foo')482 for i in range(vidhub.num_outputs):483 assert vidhub_node.find('crosspoints/{}'.format(i)) is not None484 assert vidhub_node.find('labels/output/{}'.format(i)) is not None485 for i in range(vidhub.num_inputs):486 assert vidhub_node.find('labels/input/{}'.format(i)) is not None487 await interface.stop()...

Full Screen

Full Screen

websocket_test.py

Source:websocket_test.py Github

copy

Full Screen

"""Tests for eventlet's WebSocket WSGI support (hixie-75 / hixie-76 drafts).

Each test talks to a small demo WSGI app (``wsapp``) over a raw socket so the
exact handshake bytes and frame bytes can be asserted.
"""
import errno
import socket
import sys

import eventlet
from eventlet import event
from eventlet import greenio
from eventlet.green import httplib
import six
from eventlet.websocket import WebSocket, WebSocketWSGI

import tests
from tests import mock
import tests.wsgi_test


# Example handshake values from draft-hixie-thewebsocketprotocol-76.  The
# number of spaces inside each key is significant: the server divides the
# digits in the key by the space count, so collapsing the double spaces breaks
# the challenge response (8jKS'y:G*Co,Wxa-) that the tests expect.
_KEY1 = '4 @1  46546xW%0l 1 5'
_KEY2 = '12998 5 Y3 1  .P00'
_KEY3 = '^n:ds[4U'


# demo app
def handle(ws):
    """Demo WebSocket handler; behaviour is selected by the request path."""
    if ws.path == '/echo':
        # Echo every message back until wait() reports the end with None.
        while True:
            m = ws.wait()
            if m is None:
                break
            ws.send(m)
    elif ws.path == '/range':
        for i in range(10):
            ws.send("msg %d" % i)
            eventlet.sleep(0.01)
    elif ws.path == '/error':
        # some random socket error that we shouldn't normally get
        raise socket.error(errno.ENOTSOCK)
    else:
        ws.close()


wsapp = WebSocketWSGI(handle)


def _handshake_75(server_addr, path):
    """Return the bytes of a complete hixie-75 upgrade request for *path*."""
    lines = [
        "GET %s HTTP/1.1" % path,
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % server_addr,
        "Origin: http://%s:%s" % server_addr,
        "WebSocket-Protocol: ws",
    ]
    return six.b('\r\n'.join(lines) + '\r\n\r\n')


def _handshake_76(server_addr, path):
    """Return the bytes of a complete hixie-76 upgrade request for *path*.

    The eight key3 bytes follow the blank line that ends the headers.
    """
    lines = [
        "GET %s HTTP/1.1" % path,
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % server_addr,
        "Origin: http://%s:%s" % server_addr,
        "Sec-WebSocket-Protocol: ws",
        "Sec-WebSocket-Key1: " + _KEY1,
        "Sec-WebSocket-Key2: " + _KEY2,
    ]
    return six.b('\r\n'.join(lines) + '\r\n\r\n' + _KEY3)


class TestWebSocket(tests.wsgi_test._TestBase):
    """Handshake and framing tests against ``wsapp`` over plain TCP.

    ``server_addr``, ``spawn_server`` and ``killer`` come from the base class,
    which is defined outside this listing.
    """

    TEST_TIMEOUT = 5

    def set_site(self):
        self.site = wsapp

    def _spawn_with_error_detector(self):
        """Wrap the current site so app exceptions are recorded, then respawn.

        Returns ``(error_detected, done_with_request)``: a one-element list
        whose item becomes True if the wrapped site raised, and an Event that
        is sent once the request has finished either way.
        """
        error_detected = [False]
        done_with_request = event.Event()
        site = self.site

        def error_detector(environ, start_response):
            try:
                try:
                    return site(environ, start_response)
                except BaseException:
                    # Record any failure, then let it propagate unchanged.
                    error_detected[0] = True
                    raise
            finally:
                done_with_request.send(True)

        self.site = error_detector
        self.spawn_server()
        return error_detected, done_with_request

    def test_incorrect_headers(self):
        """A plain GET without upgrade headers is answered with 400."""
        http = httplib.HTTPConnection(*self.server_addr)
        http.request("GET", "/echo")
        response = http.getresponse()
        assert response.status == 400

    def test_incomplete_headers_75(self):
        """A hixie-75 request missing the Connection header gets 400."""
        headers = dict(kv.split(': ') for kv in [
            "Upgrade: WebSocket",
            # NOTE: intentionally no connection header
            "Host: %s:%s" % self.server_addr,
            "Origin: http://%s:%s" % self.server_addr,
            "WebSocket-Protocol: ws",
        ])
        http = httplib.HTTPConnection(*self.server_addr)
        http.request("GET", "/echo", headers=headers)
        resp = http.getresponse()

        self.assertEqual(resp.status, 400)
        self.assertEqual(resp.getheader('connection'), 'close')
        self.assertEqual(resp.read(), b'')

    def test_incomplete_headers_76(self):
        """A hixie-76 request missing Connection, or missing Key2, gets 400."""
        # First test: Missing Connection:
        headers = dict(kv.split(': ') for kv in [
            "Upgrade: WebSocket",
            # NOTE: intentionally no connection header
            "Host: %s:%s" % self.server_addr,
            "Origin: http://%s:%s" % self.server_addr,
            "Sec-WebSocket-Protocol: ws",
        ])
        http = httplib.HTTPConnection(*self.server_addr)
        http.request("GET", "/echo", headers=headers)
        resp = http.getresponse()

        self.assertEqual(resp.status, 400)
        self.assertEqual(resp.getheader('connection'), 'close')
        self.assertEqual(resp.read(), b'')

        # Now, miss off key2
        headers = dict(kv.split(': ') for kv in [
            "Upgrade: WebSocket",
            "Connection: Upgrade",
            "Host: %s:%s" % self.server_addr,
            "Origin: http://%s:%s" % self.server_addr,
            "Sec-WebSocket-Protocol: ws",
            "Sec-WebSocket-Key1: " + _KEY1,
            # NOTE: Intentionally no Key2 header
        ])
        http = httplib.HTTPConnection(*self.server_addr)
        http.request("GET", "/echo", headers=headers)
        resp = http.getresponse()

        self.assertEqual(resp.status, 400)
        self.assertEqual(resp.getheader('connection'), 'close')
        self.assertEqual(resp.read(), b'')

    def test_correct_upgrade_request_75(self):
        """A valid hixie-75 request receives the hixie-75 handshake reply."""
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_75(self.server_addr, '/echo'))
        result = sock.recv(1024)
        # The server responds the correct Websocket handshake
        self.assertEqual(result, six.b('\r\n'.join([
            'HTTP/1.1 101 Web Socket Protocol Handshake',
            'Upgrade: WebSocket',
            'Connection: Upgrade',
            'WebSocket-Origin: http://%s:%s' % self.server_addr,
            'WebSocket-Location: ws://%s:%s/echo\r\n\r\n' % self.server_addr,
        ])))

    def test_correct_upgrade_request_76(self):
        """A valid hixie-76 request receives the reply plus challenge bytes."""
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/echo'))
        result = sock.recv(1024)
        # The server responds the correct Websocket handshake
        self.assertEqual(result, six.b('\r\n'.join([
            'HTTP/1.1 101 WebSocket Protocol Handshake',
            'Upgrade: WebSocket',
            'Connection: Upgrade',
            'Sec-WebSocket-Origin: http://%s:%s' % self.server_addr,
            'Sec-WebSocket-Protocol: ws',
            'Sec-WebSocket-Location: ws://%s:%s/echo\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.server_addr,
        ])))

    def test_query_string(self):
        # verify that the query string comes out the other side unscathed
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/echo?query_string'))
        result = sock.recv(1024)
        self.assertEqual(result, six.b('\r\n'.join([
            'HTTP/1.1 101 WebSocket Protocol Handshake',
            'Upgrade: WebSocket',
            'Connection: Upgrade',
            'Sec-WebSocket-Origin: http://%s:%s' % self.server_addr,
            'Sec-WebSocket-Protocol: ws',
            'Sec-WebSocket-Location: '
            'ws://%s:%s/echo?query_string\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.server_addr,
        ])))

    def test_empty_query_string(self):
        # verify that a single trailing ? doesn't get nuked
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/echo?'))
        result = sock.recv(1024)
        self.assertEqual(result, six.b('\r\n'.join([
            'HTTP/1.1 101 WebSocket Protocol Handshake',
            'Upgrade: WebSocket',
            'Connection: Upgrade',
            'Sec-WebSocket-Origin: http://%s:%s' % self.server_addr,
            'Sec-WebSocket-Protocol: ws',
            'Sec-WebSocket-Location: ws://%s:%s/echo?\r\n\r\n8jKS\'y:G*Co,Wxa-' % self.server_addr,
        ])))

    def test_sending_messages_to_websocket_75(self):
        """Frames sent to /echo come back intact, even when split in two."""
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_75(self.server_addr, '/echo'))
        sock.recv(1024)  # discard the handshake reply
        sock.sendall(b'\x00hello\xFF')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00hello\xff')
        # One frame delivered in two writes must be reassembled.
        sock.sendall(b'\x00start')
        eventlet.sleep(0.001)
        sock.sendall(b' end\xff')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00start end\xff')
        sock.shutdown(socket.SHUT_RDWR)
        sock.close()
        eventlet.sleep(0.01)

    def test_sending_messages_to_websocket_76(self):
        """Same as the hixie-75 echo test, using the hixie-76 handshake."""
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/echo'))
        sock.recv(1024)  # discard the handshake reply
        sock.sendall(b'\x00hello\xFF')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00hello\xff')
        # One frame delivered in two writes must be reassembled.
        sock.sendall(b'\x00start')
        eventlet.sleep(0.001)
        sock.sendall(b' end\xff')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00start end\xff')
        sock.shutdown(socket.SHUT_RDWR)
        sock.close()
        eventlet.sleep(0.01)

    def test_getting_messages_from_websocket_75(self):
        """/range pushes 'msg 0' .. 'msg 9' to a hixie-75 client."""
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_75(self.server_addr, '/range'))
        resp = sock.recv(1024)
        headers, result = resp.split(b'\r\n\r\n')
        msgs = [result.strip(b'\x00\xff')]
        for _ in range(10):
            msgs.append(sock.recv(20).strip(b'\x00\xff'))
        # Last item in msgs is an empty string
        self.assertEqual(msgs[:-1], [six.b('msg %d' % i) for i in range(10)])

    def test_getting_messages_from_websocket_76(self):
        """/range pushes 'msg 0' .. 'msg 9' to a hixie-76 client."""
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/range'))
        resp = sock.recv(1024)
        headers, result = resp.split(b'\r\n\r\n')
        # Skip the 16 challenge-response bytes that follow the headers.
        msgs = [result[16:].strip(b'\x00\xff')]
        for _ in range(10):
            msgs.append(sock.recv(20).strip(b'\x00\xff'))
        # Last item in msgs is an empty string
        self.assertEqual(msgs[:-1], [six.b('msg %d' % i) for i in range(10)])

    def test_breaking_the_connection_75(self):
        """Closing the socket mid-stream must not raise inside the app."""
        error_detected, done_with_request = self._spawn_with_error_detector()
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_75(self.server_addr, '/range'))
        sock.recv(1024)  # get the headers
        sock.close()  # close while the app is running
        done_with_request.wait()
        assert not error_detected[0]

    def test_breaking_the_connection_76(self):
        """Closing the socket mid-stream must not raise inside the app."""
        error_detected, done_with_request = self._spawn_with_error_detector()
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/range'))
        sock.recv(1024)  # get the headers
        sock.close()  # close while the app is running
        done_with_request.wait()
        assert not error_detected[0]

    def test_client_closing_connection_76(self):
        """A client close frame ends the request without an app error."""
        error_detected, done_with_request = self._spawn_with_error_detector()
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/echo'))
        sock.recv(1024)  # get the headers
        sock.sendall(b'\xff\x00')  # "Close the connection" packet.
        done_with_request.wait()
        assert not error_detected[0]

    def test_client_invalid_packet_76(self):
        """A malformed frame from the client surfaces as an app error."""
        error_detected, done_with_request = self._spawn_with_error_detector()
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/echo'))
        sock.recv(1024)  # get the headers
        sock.sendall(b'\xef\x00')  # Weird packet.
        done_with_request.wait()
        assert error_detected[0]

    def test_server_closing_connect_76(self):
        """For an unknown path the handler closes, sending the close frame."""
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/'))
        resp = sock.recv(1024)
        headers, result = resp.split(b'\r\n\r\n')
        # The remote server should have immediately closed the connection.
        self.assertEqual(result[16:], b'\xff\x00')

    def test_app_socket_errors_75(self):
        """A socket.error raised by the handler propagates out of the site."""
        error_detected, done_with_request = self._spawn_with_error_detector()
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_75(self.server_addr, '/error'))
        sock.recv(1024)
        done_with_request.wait()
        assert error_detected[0]

    def test_app_socket_errors_76(self):
        """A socket.error raised by the handler propagates out of the site."""
        error_detected, done_with_request = self._spawn_with_error_detector()
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/error'))
        sock.recv(1024)
        done_with_request.wait()
        assert error_detected[0]

    def test_close_idle(self):
        """Killing the server lets the pool drain while a client sits idle."""
        pool = eventlet.GreenPool()
        # use log=stderr when test runner can capture it
        self.spawn_server(custom_pool=pool, log=sys.stdout)
        sock = eventlet.connect(self.server_addr)
        sock.sendall(_handshake_76(self.server_addr, '/echo'))
        sock.recv(1024)
        sock.sendall(b'\x00hello\xff')
        result = sock.recv(1024)
        # Was ``assert result, b'...'``, which only checked truthiness.
        self.assertEqual(result, b'\x00hello\xff')
        self.killer.kill(KeyboardInterrupt)
        with eventlet.Timeout(1):
            pool.waitall()


class TestWebSocketSSL(tests.wsgi_test._TestBase):
    """Echo test over an SSL-wrapped listener."""

    def set_site(self):
        self.site = wsapp

    @tests.skip_if_no_ssl
    def test_ssl_sending_messages(self):
        """Over SSL the location header uses wss:// and echo still works."""
        s = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                              certfile=tests.certificate_file,
                              keyfile=tests.private_key_file,
                              server_side=True)
        self.spawn_server(sock=s)
        sock = eventlet.wrap_ssl(eventlet.connect(self.server_addr))
        sock.sendall(_handshake_76(self.server_addr, '/echo'))
        # Keep reading until the blank line that ends the headers arrives.
        first_resp = b''
        while b'\r\n\r\n' not in first_resp:
            first_resp += sock.recv()
            print('resp now:')
            print(first_resp)
        # make sure it sets the wss: protocol on the location header
        loc_line = [x for x in first_resp.split(b"\r\n")
                    if x.lower().startswith(b'sec-websocket-location')][0]
        expect_wss = ('wss://%s:%s' % self.server_addr).encode()
        assert expect_wss in loc_line, "Expecting wss protocol in location: %s" % loc_line

        sock.sendall(b'\x00hello\xFF')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00hello\xff')
        sock.sendall(b'\x00start')
        eventlet.sleep(0.001)
        sock.sendall(b' end\xff')
        result = sock.recv(1024)
        self.assertEqual(result, b'\x00start end\xff')
        greenio.shutdown_safe(sock)
        sock.close()
        eventlet.sleep(0.01)


class TestWebSocketObject(tests.LimitedTestCase):
    """Unit tests for ``WebSocket`` driven through a mock socket."""

    def setUp(self):
        self.mock_socket = s = mock.Mock()
        self.environ = env = dict(HTTP_ORIGIN='http://localhost', HTTP_WEBSOCKET_PROTOCOL='ws',
                                  PATH_INFO='test')

        self.test_ws = WebSocket(s, env)

        super(TestWebSocketObject, self).setUp()

    def test_recieve(self):
        """wait() returns the decoded frame, then None once recv is empty."""
        ws = self.test_ws
        ws.socket.recv.return_value = b'\x00hello\xFF'
        self.assertEqual(ws.wait(), 'hello')
        self.assertEqual(ws._buf, b'')
        self.assertEqual(len(ws._msgs), 0)
        ws.socket.recv.return_value = b''
        self.assertEqual(ws.wait(), None)
        self.assertEqual(ws._buf, b'')
        self.assertEqual(len(ws._msgs), 0)

    def test_send_to_ws(self):
        """send() frames text and non-text payloads between 0x00 and 0xFF."""
        ws = self.test_ws
        ws.send(u'hello')
        # ``called_with`` is not a Mock assertion (it is an auto-created,
        # always-truthy attribute); ``assert_called_with`` really checks.
        ws.socket.sendall.assert_called_with(b"\x00hello\xFF")
        ws.send(10)
        ws.socket.sendall.assert_called_with(b"\x0010\xFF")

    def test_close_ws(self):
        ws = self.test_ws
        ws.close()
        # ... (the listing is truncated at this point on the source page)

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.

Chapters:

  1. What is Playwright: Playwright is comparatively new but has quickly gained popularity. Get to know the history of Playwright along with some interesting facts about it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of helpful features such as the Playwright Test Generator and Inspector, Playwright Reporter, and the Playwright auto-waiting mechanism. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating it with other elements. Learn how to perform component testing with the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright: Playwright can interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and with different types of dropdowns, such as single-select and multi-select. Get hands-on with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in parallel on the LambdaTest test cloud. Get a step-by-step guide to running your Playwright tests on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, and .NET. However, Python end-to-end testing with Playwright has various advantages because of Python's versatility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch video tutorials on Playwright testing from experts and get a step-by-step, in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful