Best Python code snippet using autotest_python
process.py
Source:process.py
# noinspection PyUnresolvedReferences
from dsbin.imports import DSRHook, Stats
from time import sleep
from math import pi


def wait_for(predicate, desired_state=True, single_frame=0.016):
    """Block until ``predicate()`` equals ``desired_state``.

    The predicate is polled in a loop with a ``single_frame`` second sleep
    between polls. There is no timeout: the call returns only once the
    predicate reports the desired state.

    :param predicate: zero-argument callable to poll
    :param desired_state: value the predicate result is compared against
    :param single_frame: delay between two polls, in seconds
    :return: ``False`` immediately when ``predicate`` is not callable,
        ``True`` once the desired state has been observed
    """
    if not callable(predicate):
        return False
    while predicate() != desired_state:
        sleep(single_frame)
    return True


class ReadMemoryError(Exception):
    """Raised by DSRProcess getters that are called while not hooked."""

    def __init__(self, message="Failed to read memory"):
        self.message = message
        super().__init__(self.message)


class WriteMemoryError(Exception):
    """Raised when a hook call reports a falsy (failed) result."""

    def __init__(self, message="Failed to write memory"):
        self.message = message
        super().__init__(self.message)


class AsmExecuteError(Exception):
    """Raised when a hook call returns a non-zero status code.

    ``error`` holds the symbolic name of the code when it is listed in
    ``ERR`` and ``"REASON_UNKNOWN"`` otherwise.
    """

    ERR = ({
        0x00000080: "WAIT_ABANDONED",
        0x00000102: "WAIT_TIMEOUT",
        0xFFFFFFFF: "WAIT_FAILED"
    })

    def __init__(self, code, message="Failed to execute assembly"):
        self.message = message
        # Single lookup with a fallback instead of "in keys()" + indexing.
        self.error = AsmExecuteError.ERR.get(code, "REASON_UNKNOWN")
        super().__init__(self.message)

    def __str__(self):
        return "%s (%s)" % (self.message, self.error)


def _ensure_written(ok):
    """Raise WriteMemoryError when a hook call returned a falsy result."""
    if not ok:
        raise WriteMemoryError()


# (Stats member name, hook accessor suffix, whether set_stat also calls
# update_sl for this stat). The order matches the order in which the stats
# are compared, and Stats members are resolved lazily while scanning.
_STAT_FIELDS = (
    ("VIT", "Vitality", True),
    ("ATN", "Attunement", True),
    ("END", "Endurance", True),
    ("STR", "Strength", True),
    ("DEX", "Dexterity", True),
    ("RES", "Resistance", True),
    ("INT", "Intelligence", True),
    ("FTH", "Faith", True),
    ("LVL", "SoulLevel", False),
    ("SLS", "Souls", False),
    ("HUM", "Humanity", False),
)


class DSRProcess:
    """Thin Python facade over a ``DSRHook`` instance.

    Most methods forward their arguments to one hook method. Setters raise
    WriteMemoryError when the hook reports a falsy result; guarded getters
    raise ReadMemoryError when the hook is not hooked.
    """

    def __init__(self, process_name, debug=False):
        self._hook = DSRHook(self, 5000, 5000, process_name)
        self._debug = debug
        self._hook.OnHooked += lambda caller, *e: self._hook.DSRHook_OnHooked()
        # update_version is optional: fall back to a no-op when this object
        # does not define it.
        self._hook.OnHooked += lambda caller, *e: getattr(self, "update_version", lambda: None)()
        self._hook.Start()

    def _require_hooked(self):
        """Raise ReadMemoryError unless the hook reports being hooked."""
        if not self.is_hooked():
            raise ReadMemoryError()

    # ---- hook state -----------------------------------------------------

    def get_version(self):
        return self._hook.Version

    def is_hooked(self):
        return self._hook.Hooked

    def can_read(self):
        return self._hook.Loaded

    # ---- misc game control ----------------------------------------------

    def set_animation_speed(self, value: float):
        _ensure_written(self._hook.SetAnimSpeed(value))

    def menu_kick(self):
        self._hook.MenuKick()

    def display_banner(self, value: int):
        self._hook.DisplayBanner(value)

    def game_restart(self):
        return self._hook.GameRestart()

    # ---- stats ----------------------------------------------------------

    def get_stat(self, stat: Stats):
        """Return the hook's value for ``stat``; ``None`` if unrecognised."""
        for member, suffix, _ in _STAT_FIELDS:
            if stat == getattr(Stats, member):
                return getattr(self._hook, "Get" + suffix)()
        return None

    def update_sl(self, stat: Stats, new_val: int):
        """Shift the soul level by the difference between ``new_val`` and
        the current value of ``stat``; return the hook setter's result."""
        old_val = self.get_stat(stat)
        return self._hook.SetSoulLevel(self._hook.GetSoulLevel() + (new_val - old_val))

    def set_stat(self, stat: Stats, value: int):
        """Write ``value`` to ``stat`` and return the hook's result.

        For stats flagged in ``_STAT_FIELDS`` the soul level is adjusted
        first and the stat is only written when that adjustment returned a
        truthy result. Returns ``None`` for an unrecognised ``stat``.
        """
        for member, suffix, adjusts_level in _STAT_FIELDS:
            if stat == getattr(Stats, member):
                if adjusts_level:
                    return self.update_sl(stat, value) and getattr(self._hook, "Set" + suffix)(value)
                return getattr(self._hook, "Set" + suffix)(value)
        return None

    # ---- event flags ----------------------------------------------------

    def read_event_flag(self, flag_id: int):
        """Return the flag state, or ``None`` when not loaded or not hooked."""
        if not self.can_read() or not self.is_hooked():
            return None
        return self._hook.ReadEventFlag(flag_id)

    def write_event_flag(self, flag_id: int, state: bool):
        _ensure_written(self._hook.WriteEventFlag(flag_id, state))

    def listen_for_flag(self, flag_id, flag_state):
        """Block (via wait_for) until the flag reads as ``flag_state``."""
        return wait_for(lambda: self.read_event_flag(flag_id), desired_state=flag_state)

    # ---- boolean toggles ------------------------------------------------
    # Each method forwards ``enable`` to the named hook setter and raises
    # WriteMemoryError when the hook returns a falsy result.

    def death_cam(self, enable: bool):
        _ensure_written(self._hook.SetDeathCam(enable))

    def set_super_armor(self, enable: bool):
        _ensure_written(self._hook.SetPlayerSuperArmor(enable))

    def set_no_gravity(self, enable: bool):
        _ensure_written(self._hook.SetNoGravity(enable))

    def set_no_dead(self, enable: bool):
        _ensure_written(self._hook.SetPlayerNoDead(enable))

    def set_no_update(self, enable: bool):
        _ensure_written(self._hook.SetNoUpdate(enable))

    def set_no_stamina_consume(self, enable: bool):
        _ensure_written(self._hook.SetPlayerNoStamina(enable))

    def set_no_goods_consume(self, enable: bool):
        _ensure_written(self._hook.SetPlayerNoGoods(enable))

    def set_no_damage(self, enable: bool):
        _ensure_written(self._hook.SetPlayerDisableDamage(enable))

    def set_no_hit(self, enable: bool):
        _ensure_written(self._hook.SetPlayerNoHit(enable))

    def get_player_dead_mode(self):
        # NOTE(review): despite the "get" name this returns nothing and
        # raises WriteMemoryError on a falsy hook result. Kept as-is for
        # callers; confirm whether the value should be returned instead.
        _ensure_written(self._hook.GetPlayerDeadMode())

    def set_player_dead_mode(self, enable: bool):
        _ensure_written(self._hook.SetPlayerDeadMode(enable))

    def set_no_magic_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoMagicQty(enable))

    def set_no_stamina_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoStamina(enable))

    def set_exterminate(self, enable: bool):
        _ensure_written(self._hook.SetPlayerExterminate(enable))

    def set_no_ammo_consume_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoArrow(enable))

    def set_hide(self, enable: bool):
        _ensure_written(self._hook.SetPlayerHide(enable))

    def set_silence(self, enable: bool):
        _ensure_written(self._hook.SetPlayerSilence(enable))

    def set_no_dead_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoDead(enable))

    def set_no_damage_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoDamage(enable))

    def set_no_hit_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoHit(enable))

    def set_no_attack_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoAttack(enable))

    def set_no_move_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoMove(enable))

    def set_no_update_ai_all(self, enable: bool):
        _ensure_written(self._hook.SetAllNoUpdateAI(enable))

    # ---- health / stamina -----------------------------------------------

    def get_hp(self):
        self._require_hooked()
        return self._hook.GetHealth()

    def set_hp(self, value: int):
        # NOTE(review): writes through SetHealthMax while get_hp reads
        # GetHealth - confirm this asymmetry is intended.
        _ensure_written(self._hook.SetHealthMax(value))

    def get_hp_max(self):
        self._require_hooked()
        return self._hook.GetHealthMax()

    def get_stamina(self):
        self._require_hooked()
        return self._hook.GetStaminaMax()

    def set_stamina(self, value: int):
        _ensure_written(self._hook.SetStaminaMax(value))

    # ---- position -------------------------------------------------------

    def get_pos(self):
        """Return ``(x, y, z, angle)``; the hook angle is shifted by pi and
        rescaled so that -pi maps to 0 and pi maps to 360."""
        return (
            self._hook.GetPositionX(),
            self._hook.GetPositionY(),
            self._hook.GetPositionZ(),
            (self._hook.GetPositionAngle() + pi) / (pi * 2) * 360
        )

    def get_pos_stable(self):
        """Same as get_pos, read from the hook's "stable" position getters."""
        return (
            self._hook.GetStablePositionX(),
            self._hook.GetStablePositionY(),
            self._hook.GetStablePositionZ(),
            (self._hook.GetStablePositionAngle() + pi) / (pi * 2) * 360
        )

    def jump_pos(self, x, y, z, a):
        """Warp to ``(x, y, z)``; ``a`` uses the 0-360 scale of get_pos and
        is converted back to the hook's -pi..pi scale."""
        self._hook.PosWarp(x, y, z, a / 360 * 2 * pi - pi)

    # ---- character / session --------------------------------------------

    def set_name(self, name: str):
        _ensure_written(self._hook.SetName(name))

    def set_team_type(self, value: int):
        _ensure_written(self._hook.SetTeamType(value))

    def set_phantom_type(self, value: int):
        _ensure_written(self._hook.SetChrType(value))

    def set_ng_mode(self, value: int):
        _ensure_written(self._hook.SetNewGameType(value))

    def get_last_animation(self):
        """Return the last animation id, or ``-1`` when hooked but not loaded.

        :raises ReadMemoryError: when the hook is not hooked
        """
        self._require_hooked()
        if not self.can_read():
            return -1
        return self._hook.GetLastAnimation()

    def set_bonfire(self, value: int):
        _ensure_written(self._hook.SetLastBonfire(value))

    def bonfire_warp(self):
        """:raises AsmExecuteError: when the hook returns a non-zero code"""
        code = self._hook.BonfireWarp()
        if code != 0:
            raise AsmExecuteError(code)

    def item_get(self, item_category: int, item_id: int, item_count: int):
        """:raises AsmExecuteError: when the hook returns a non-zero code"""
        code = self._hook.GetItem(item_category, item_id, item_count)
        if code != 0:
            raise AsmExecuteError(code)

    # ---- rendering (hook result is not checked) -------------------------

    def override_filter(self, override: bool):
        self._hook.SetFilterOverride(override)

    def set_brightness(self, red: float, green: float, blue: float):
        self._hook.SetBrightness(red, green, blue)

    def set_contrast(self, red: float, green: float, blue: float):
        self._hook.SetContrast(red, green, blue)

    def set_saturation(self, value: float):
        self._hook.SetSaturation(value)

    def set_hue(self, value: float):
        self._hook.SetHue(value)

    def draw_map(self, enable: bool):
        self._hook.SetDrawMap(enable)

    def draw_objects(self, enable: bool):
        self._hook.SetDrawObjects(enable)

    def draw_creatures(self, enable: bool):
        self._hook.SetDrawCharacters(enable)

    def draw_sfx(self, enable: bool):
        self._hook.SetDrawSFX(enable)

    def draw_cutscenes(self, enable: bool):
        self._hook.SetDrawCutscenes(enable)

    # ---- area / debug toggles -------------------------------------------
    # Same contract as the boolean toggles above.

    def disable_all_area_enemies(self, disable: bool):
        _ensure_written(self._hook.DisableAllAreaEnemies(disable))

    def disable_all_area_event(self, disable: bool):
        _ensure_written(self._hook.DisableAllAreaEvent(disable))

    def disable_all_area_map(self, disable: bool):
        _ensure_written(self._hook.DisableAllAreaMap(disable))

    def disable_all_area_obj(self, disable: bool):
        _ensure_written(self._hook.DisableAllAreaObj(disable))

    def enable_all_area_obj(self, enable: bool):
        _ensure_written(self._hook.EnableAllAreaObj(enable))

    def enable_all_area_obj_break(self, enable: bool):
        _ensure_written(self._hook.EnableAllAreaObjBreak(enable))

    def disable_all_area_hi_hit(self, disable: bool):
        _ensure_written(self._hook.DisableAllAreaHiHit(disable))

    def disable_all_area_lo_hit(self, disable: bool):
        # The hook exposes this one as an "enable" call, hence the negation.
        _ensure_written(self._hook.EnableAllAreaLoHit(not disable))

    def disable_all_area_sfx(self, disable: bool):
        _ensure_written(self._hook.DisableAllAreaSfx(disable))

    def disable_all_area_sound(self, disable: bool):
        _ensure_written(self._hook.DisableAllAreaSound(disable))

    def enable_obj_break_record_mode(self, enable: bool):
        _ensure_written(self._hook.EnableObjBreakRecordMode(enable))

    def enable_auto_map_warp_mode(self, enable: bool):
        _ensure_written(self._hook.EnableAutoMapWarpMode(enable))

    def enable_chr_npc_wander_test(self, enable: bool):
        _ensure_written(self._hook.EnableChrNpcWanderTest(enable))

    def enable_dbg_chr_all_dead(self, enable: bool):
        _ensure_written(self._hook.EnableDbgChrAllDead(enable))

    def enable_online_mode(self, enable: bool):
        _ensure_written(self._hook.EnableOnlineMode(enable))

    def enable_enemy_control(self, enable: bool):
        ...  # body not shown: the listing is truncated at this point
chain.py
Source:chain.py
import itertools
from typing import Dict

from sqlalchemy import Boolean, Column, PrimaryKeyConstraint, String, Table

from cloudbot import hook
from cloudbot.event import CommandEvent
from cloudbot.util import database
from cloudbot.util.formatting import chunk_str, pluralize_auto

commands = Table(
    "chain_commands",
    database.metadata,
    Column("hook", String),
    Column("allowed", Boolean, default=True),
    PrimaryKeyConstraint("hook", "allowed"),
)

# In-memory mirror of the chain_commands table: formatted hook name -> allowed
allow_cache: Dict[str, bool] = {}


@hook.on_start()
def load_cache(db):
    """Replace the contents of ``allow_cache`` with the rows of the table."""
    # Build the new mapping first so the shared cache is only swapped once
    # the query has been fully consumed.
    new_cache = {
        row["hook"]: row["allowed"] for row in db.execute(commands.select())
    }
    allow_cache.clear()
    allow_cache.update(new_cache)


def format_hook_name(_hook):
    """Return ``"<plugin title>.<function name>"`` for a hook."""
    return _hook.plugin.title + "." + _hook.function.__name__


def get_hook_from_command(bot, hook_name):
    """Resolve a command hook from a name typed by a user.

    A name containing a dot is matched against ``format_hook_name`` of every
    registered command. Otherwise an exact command name is tried first, then
    a prefix match that is accepted only when it is unambiguous.

    :return: the matching hook, or ``None`` when nothing (or more than one
        prefix candidate) matches
    """
    manager = bot.plugin_manager
    if "." in hook_name:
        for _hook in manager.commands.values():
            if format_hook_name(_hook) == hook_name:
                return _hook
        return None

    try:
        return manager.commands[hook_name]
    except LookupError:
        pass

    possible = [
        _hook
        for name, _hook in manager.commands.items()
        if name.startswith(hook_name)
    ]
    return possible[0] if len(possible) == 1 else None


@hook.command(permissions=["botcontrol", "snoonetstaff"])
def chainallow(text, db, notice_doc, bot):
    """{add [hook] [{allow|deny}]|del [hook]} - Manage the allowed list fo comands for the chain command"""
    # The docstring above is the runtime help text and is kept verbatim.
    args = text.split()
    # Both a sub-command and a hook name are required; checking the length
    # up front also covers empty input, which previously raised IndexError.
    if len(args) < 2:
        return notice_doc()

    subcmd = args[0].lower()
    name = args[1]
    extra = args[2:]

    _hook = get_hook_from_command(bot, name)
    if _hook is None:
        return "Unable to find command '{}'".format(name)

    hook_name = format_hook_name(_hook)

    if subcmd == "add":
        values = {"hook": hook_name}
        if extra:
            states = {"allow": True, "deny": False}
            choice = extra[0].lower()
            if choice not in states:
                return notice_doc()
            values["allowed"] = states[choice]

        # Try an update first; insert only when no row exists for the hook.
        res = db.execute(
            commands.update()
            .values(**values)
            .where(commands.c.hook == hook_name)
        )
        updated = res.rowcount != 0
        if not updated:
            db.execute(commands.insert().values(**values))

        db.commit()
        load_cache(db)
        if updated:
            return "Updated state of '{}' in chainallow to allowed={}".format(
                hook_name, allow_cache.get(hook_name)
            )

        if allow_cache.get(hook_name):
            return "Added '{}' as an allowed command".format(hook_name)

        return "Added '{}' as a denied command".format(hook_name)

    if subcmd == "del":
        res = db.execute(commands.delete().where(commands.c.hook == hook_name))
        db.commit()
        load_cache(db)
        return "Deleted {}.".format(pluralize_auto(res.rowcount, "row"))

    return notice_doc()


def parse_chain(text, bot):
    """Split ``text`` on ``|`` into ``[command, hook, args]`` triples.

    ``hook`` is ``None`` for a command that cannot be resolved.
    """
    cmds = []
    for part in text.split("|"):
        cmd, _, args = part.strip().partition(" ")
        cmds.append([cmd, get_hook_from_command(bot, cmd), args or ""])
    return cmds


def is_hook_allowed(_hook):
    """Return whether the hook is marked as allowed in ``allow_cache``."""
    return bool(allow_cache.get(format_hook_name(_hook)))


def wrap_event(_hook, event, cmd, args):
    """Build a CommandEvent for ``_hook`` on top of ``event``."""
    return CommandEvent(
        base_event=event,
        text=args.strip(),
        triggered_command=cmd,
        hook=_hook,
        cmd_prefix="",
    )


@hook.command()
async def chain(text, bot, event):
    """<cmd1> [args...] | <cmd2> [args...] | ... - Runs commands in a chain, piping the output from previous commands
    to tne next"""
    # The docstring above is the runtime help text and is kept verbatim.
    cmds = parse_chain(text, bot)

    # Validate the whole chain before running anything.
    for name, _hook, _ in cmds:
        if _hook is None:
            return "Unable to find command '{}'".format(name)

        if not is_hook_allowed(_hook):
            event.notice(
                "'{}' may not be used in command piping".format(
                    format_hook_name(_hook)
                )
            )
            return

        if _hook.permissions:
            allowed = await event.check_permissions(_hook.permissions)
            if not allowed:
                event.notice(
                    "Sorry, you are not allowed to use '{}'.".format(
                        format_hook_name(_hook)
                    )
                )
                return

    buffer = ""
    out_func = None
    _target = None

    def _capture(sender, output, target):
        # Append output to the pipe buffer; the first capture decides which
        # real sender (and target) the final result is emitted through.
        nonlocal buffer, out_func, _target
        buffer += (" " if buffer else "") + output
        if out_func is None:
            out_func = sender
            _target = target

    def message(msg, target=None):
        _capture(event.message, msg, target)

    def reply(*messages, target=None):
        _capture(event.reply, " ".join(messages), target)

    def action(msg, target=None):
        _capture(event.action, msg, target)

    for cmd, _hook, args in cmds:
        # Output of the previous command becomes trailing input of this one.
        args += (" " if args else "") + buffer
        buffer = ""
        cmd_event = wrap_event(_hook, event, cmd, args)
        cmd_event.message = message
        cmd_event.reply = reply
        cmd_event.action = action
        if _hook.auto_help and not cmd_event.text and _hook.doc is not None:
            cmd_event.notice_doc()
            return "Invalid syntax."

        ok, res = await bot.plugin_manager.internal_launch(_hook, cmd_event)
        if not ok:
            return "Error occurred."

        if res:
            if out_func is None:
                out_func = event.reply

            buffer += (" " if buffer else "") + res

    if buffer:
        if out_func is None:
            out_func = event.reply

        out_func(buffer, target=_target)


@hook.command(autohelp=False)
def chainlist(bot, event):
    """- Returns the list of commands allowed in 'chain'"""
    hooks = [
        get_hook_from_command(bot, name)
        for name, allowed in allow_cache.items()
        if allowed
    ]
    s = ", ".join(
        sorted(
            itertools.chain.from_iterable(
                h.aliases for h in hooks if h is not None
            )
        )
    )
    for part in chunk_str(s):
        ...  # loop body not shown: the listing is truncated at this point
test_base_hooks.py
Source:test_base_hooks.py
import pytest
import time
import threading

from barython.panel import Panel
from barython.screen import Screen
from barython.widgets.base import Widget
from barython.hooks import HooksPool, _Hook


class TestHook(_Hook):
    """Minimal hook whose ``run`` just notifies its callbacks."""

    # The name matches pytest's default "Test*" class pattern; opt out of
    # collection so this helper is not mistaken for a test class.
    __test__ = False

    def run(self, *args, **kwargs):
        return self.notify()


def test_base_hook_notify(mocker):
    """
    Test _Hook.notify
    """
    stub = mocker.stub()
    hook = _Hook(callbacks={stub, })
    hook.notify()
    stub.assert_called_once_with()


def test_base_hook_notify_without_callback(mocker):
    """notify() on a hook created without callbacks must not raise."""
    hook = _Hook()
    hook.notify()


def test_base_hooks_pool_subscribe(mocker):
    """subscribe() stores a hook of the requested class with the callback."""
    callback = mocker.stub()
    hp = HooksPool()
    hp.subscribe(callback, _Hook)
    assert isinstance(hp.hooks[_Hook][0], _Hook)
    assert hp.hooks[_Hook][0].callbacks == {callback, }


def test_base_hooks_pool_subscribe_listen(mocker):
    """A started pool runs the subscribed hook, which calls the callback."""
    callback = mocker.stub()
    hp = HooksPool(listen=True)
    hp.subscribe(callback, TestHook)
    assert isinstance(hp.hooks[TestHook][0], TestHook)
    try:
        threading.Thread(target=hp.start).start()
        # Give the background thread a moment to run the hook.
        time.sleep(0.1)
        callback.assert_called_once_with()
    finally:
        hp.stop()


def test_base_hook_copy(mocker):
    """copy() yields equal but distinct callbacks and the same daemon flag."""
    callback0 = mocker.stub()
    h = _Hook(callbacks={callback0})
    h_cpy = h.copy()
    assert h.callbacks == h_cpy.callbacks
    assert h.callbacks is not h_cpy.callbacks
    assert h.daemon == h_cpy.daemon


def test_base_hooks_pool_merge(mocker):
    """merge() combines the callbacks of hooks of the same class."""
    callback0, callback1 = mocker.stub(), mocker.stub()
    hp0, hp1 = HooksPool(), HooksPool()
    hp0.subscribe(callback0, _Hook)
    hp1.subscribe(callback1, _Hook)
    hp0.merge(hp1)
    assert hp0.hooks[_Hook][0].callbacks == {callback0, callback1}


def test_base_hooks_with_panel(mocker):
    callback0, callback1 = mocker.stub(), mocker.stub()
    p = Panel(keep_unplugged_screens=True)
    s = Screen()
    w0, w1 = Widget(), Widget()
    w0.hooks.subscribe(callback0, _Hook)
    w1.hooks.subscribe(callback1, _Hook)
    s.add_widget("l", w0, w1)
    p.add_screen(s)
    # ... the rest of this test is not shown: the listing is truncated here
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!