Best JavaScript code snippet using cucumber-gherkin
02_threads.py
Source:02_threads.py
# -*- coding: utf-8 -*-
"""Tutorial script: creating, running and stopping threads.

The script walks through several stages, each re-defining ``fishing`` or
``Fisher`` to demonstrate one more aspect of ``threading.Thread``:
plain target functions, returning results, subclassing ``Thread``,
errors inside threads, the GIL (CPU-bound vs. I/O-bound work) and
cooperative stopping.
"""
import random
import time
from collections import defaultdict
from functools import wraps
from threading import Thread

# Possible outcomes of one cast; ``None`` means the worm was eaten.
FISH = (None, 'плотва', 'окунь', 'лещ')


def _cast(name, worm):
    """Announce a cast and burn CPU time to emulate waiting for a bite."""
    print(f'{name}: Червяк № {worm} - Забросил, ждем...', flush=True)
    # Deliberately expensive big-integer power: keeps the thread busy.
    _ = 3 ** (random.randint(50, 70) * 10000)


def _land(name, catch):
    """Pick a random outcome of a cast and count the fish in ``catch``."""
    fish = random.choice(FISH)
    if fish is None:
        print(f'{name}: Тьфу, сожрали червяка...', flush=True)
    else:
        print(f'{name}: Ага, у меня {fish}', flush=True)
        catch[fish] += 1


def _report(name, catch):
    """Print the per-species totals caught by fisher ``name``."""
    print(f'Итого рыбак {name} поймал:')
    for fish, count in catch.items():
        print(f'    {fish} - {count}')


def _send_fishing(*fishers):
    """Start every fisher thread, then wait until all of them finish."""
    print('.' * 20, 'Они пошли на рыбалку')
    for fisher in fishers:
        fisher.start()
    print('.' * 20, 'Ждем пока они вернутся...')
    for fisher in fishers:
        fisher.join()
    print('.' * 20, 'Итак, они вернулись')


# --- How to create and start a thread ---------------------------------------

# Define a function that emulates fishing.
def fishing(name, worms):
    """Use ``worms`` worms one by one and print the resulting catch."""
    catch = defaultdict(int)
    for worm in range(worms):
        _cast(name, worm)
        _land(name, catch)
    _report(name, catch)


fishing(name='Вася', worms=10)

# Now create a second fisher who goes fishing AT THE SAME TIME as the first.
thread = Thread(target=fishing, kwargs=dict(name='Вася', worms=10))
thread.start()
fishing(name='Коля', worms=10)
thread.join()


# When a plain function is handed to a thread, its result can only be
# obtained through a mutable object passed in the parameters:
def fishing(name, worms, catch):
    """Use ``worms`` worms one by one, counting the fish into ``catch``."""
    for worm in range(worms):
        _cast(name, worm)
        _land(name, catch)


vasya_catch = defaultdict(int)
thread = Thread(target=fishing,
                kwargs=dict(name='Вася', worms=10, catch=vasya_catch))
thread.start()
kolya_catch = defaultdict(int)
fishing(name='Коля', worms=10, catch=kolya_catch)
thread.join()
# Each catch is reported under the name of the fisher who produced it.
for name, catch in (('Вася', vasya_catch), ('Коля', kolya_catch)):
    _report(name, catch)


# --- It is more convenient to inherit from the Thread class -----------------

class Fisher(Thread):
    """Fisher thread that prints its own catch when it is done."""

    def __init__(self, name, worms, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.worms = worms

    def run(self):
        catch = defaultdict(int)
        for worm in range(self.worms):
            _cast(self.name, worm)
            _land(self.name, catch)
        _report(self.name, catch)


vasya = Fisher(name='Вася', worms=10)
kolya = Fisher(name='Коля', worms=10)
_send_fishing(vasya, kolya)


# --- If the result is needed, simply keep it in an attribute ----------------

class Fisher(Thread):
    """Fisher thread that exposes its catch as ``self.catch``."""

    def __init__(self, name, worms, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.worms = worms
        self.catch = defaultdict(int)

    def run(self):
        self.catch = defaultdict(int)
        for worm in range(self.worms):
            _cast(self.name, worm)
            _land(self.name, self.catch)


vasya = Fisher(name='Вася', worms=10)
kolya = Fisher(name='Коля', worms=10)
_send_fishing(vasya, kolya)
for fisher in (vasya, kolya):
    _report(fisher.name, fisher.catch)


# --- What happens if there is an error inside a thread ----------------------

class Fisher(Thread):
    """Fisher thread in which Kolya's rod may break (unhandled error)."""

    def __init__(self, name, worms, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.worms = worms
        self.catch = defaultdict(int)

    def run(self):
        self.catch = defaultdict(int)
        for worm in range(self.worms):
            _cast(self.name, worm)
            dice = random.randint(1, 5)
            if self.name == 'Коля' and dice == 1:
                # Intentionally not caught here: this stage demonstrates
                # what an exception escaping ``run`` looks like.
                raise ValueError(f'{self.name}: Блин, у меня сломалась '
                                 f'удочка на {worm} червяке :(')
            _land(self.name, self.catch)


vasya = Fisher(name='Вася', worms=10)
kolya = Fisher(name='Коля', worms=10)
_send_fishing(vasya, kolya)
for fisher in (vasya, kolya):
    _report(fisher.name, fisher.catch)


# --- Errors have to be handled inside the thread itself ---------------------

class Fisher(Thread):
    """Fisher thread that reports its own errors instead of crashing."""

    def __init__(self, name, worms, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.worms = worms
        self.catch = defaultdict(int)

    def run(self):
        # Top-level boundary of the thread: report any failure and finish.
        try:
            self._fishing()
        except Exception as exc:
            print(exc)

    def _fishing(self):
        self.catch = defaultdict(int)
        for worm in range(self.worms):
            _cast(self.name, worm)
            dice = random.randint(1, 5)
            if self.name == 'Коля' and dice == 1:
                raise ValueError(f'{self.name}: Блин, у меня сломалась '
                                 f'удочка на {worm} червяке :(')
            _land(self.name, self.catch)


vasya = Fisher(name='Вася', worms=10)
kolya = Fisher(name='Коля', worms=10)
_send_fishing(vasya, kolya)
for fisher in (vasya, kolya):
    _report(fisher.name, fisher.catch)


# --- The GIL -----------------------------------------------------------------
# Everything looks fine so far, but Python has the harsh GIL - Global
# Interpreter Lock - https://goo.gl/MTokAe
# The GIL is a mechanism that blocks the other threads while one of them
# is executing.
#
# The author's explanation: Python itself is a process in the operating
# system, and the OS may suspend it; with this double sleeping/waking,
# access to the same memory regions becomes a problem, so the CPython
# developers (and CPython is the implementation used here) introduced the GIL.
#
# As a result threads give no performance gain for so-called CPU-bound
# algorithms (those that need processor time and do no input/output).

class Fisher(Thread):
    """Silent CPU-bound fisher used for timing measurements."""

    def __init__(self, name, worms, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.worms = worms
        self.catch = defaultdict(int)

    def run(self):
        self.catch = defaultdict(int)
        for worm in range(self.worms):
            _ = worm ** 10000  # fixed amount of CPU work per "bite wait"
            fish = random.choice(FISH)
            if fish is not None:
                self.catch[fish] += 1


def time_track(func):
    """Decorator: print how long each call of ``func`` took, in seconds."""
    @wraps(func)
    def surrogate(*args, **kwargs):
        # perf_counter is monotonic, so the interval cannot go negative.
        started_at = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = round(time.perf_counter() - started_at, 6)
        print(f'Функция {func.__name__} работала {elapsed} секунд(ы)')
        return result
    return surrogate


@time_track
def run_in_one_thread(fishers):
    """Run every fisher sequentially in the calling thread."""
    for fisher in fishers:
        fisher.run()


@time_track
def run_in_threads(fishers):
    """Run every fisher in its own thread and wait for all of them."""
    for fisher in fishers:
        fisher.start()
    for fisher in fishers:
        fisher.join()


humans = ['Васек', 'Колян', 'Петрович', 'Хмурый', 'Клава', ]
fishers = [Fisher(name=name, worms=100) for name in humans]
run_in_one_thread(fishers)
run_in_threads(fishers)


# The good news is that Python releases the GIL before system calls.
# Reading from a file, for example, may take a long time and does not need
# the GIL at all, so other threads get a chance to work.
# Profit: wherever there is input/output - reading from disk, exchanging
# data over the network, etc. More good news: any program has to exchange
# data with the outside world :)

class Fisher(Thread):
    """Silent I/O-bound fisher (sleeps instead of computing)."""

    def __init__(self, name, worms, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.worms = worms
        self.catch = defaultdict(int)

    def run(self):
        self.catch = defaultdict(int)
        for worm in range(self.worms):
            time.sleep(0.01)  # stands in for a blocking system call
            fish = random.choice(FISH)
            if fish is not None:
                self.catch[fish] += 1


humans = ['Васек', 'Колян', 'Петрович', 'Хмурый', 'Клава', ]
fishers = [Fisher(name=name, worms=100) for name in humans]
run_in_one_thread(fishers)
run_in_threads(fishers)

# For an explanation of the GIL the author recommends the Moscow Python
# Meetup talk by Grigory Petrov, "GIL in Python: why it is needed and how
# to live with it": http://www.moscowpython.ru/meetup/14/gil-and-python-why/

###
# A thread cannot be terminated from the outside with Python's standard
# tools - and rightly so. Two main problems with "killing" a thread:
#  - the thread may be actively working with data at that moment, and a
#    forced termination would break the data's integrity;
#  - the thread may have spawned other threads - terminate those too?
# What to do? Add an exit flag:

class Fisher(Thread):
    """Fisher thread that can be asked to stop via ``need_stop``."""

    def __init__(self, name, worms, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.worms = worms
        self.catch = defaultdict(int)
        # Checked on every loop iteration: is it time to finish?
        self.need_stop = False

    def run(self):
        self.catch = defaultdict(int)
        for worm in range(self.worms):
            _cast(self.name, worm)
            _land(self.name, self.catch)
            if self.need_stop:
                print(f'{self.name}: Ой, жена ужинать зовет! '
                      f'Сматываем удочки...', flush=True)
                break


vasya = Fisher(name='Вася', worms=100)
vasya.start()
time.sleep(1)
if vasya.is_alive():  # is_alive() tells whether the thread is still running
    vasya.need_stop = True
# Waiting is mandatory: the thread may need some time to finish its work.
vasya.join()

# To sum up: in multithreaded programming our linear code stops being
# linear (SYNCHRONOUS). Previously the order of execution was certain, but
# with threads (processes) code may run ASYNCHRONOUSLY: there is no
# guarantee that this block of code is followed by that one...
test_virt_drivers.py
Source:test_virt_drivers.py
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
#    Copyright 2010 X7 LLC
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import base64
import functools
import netaddr
import sys
import traceback

from engine import exception
from engine import flags
from engine import image
from engine import log as logging
from engine import test
from engine.tests import utils as test_utils

libvirt = None

FLAGS = flags.FLAGS
LOG = logging.getLogger('engine.tests.test_virt_drivers')


def catch_notimplementederror(f):
    """Decorator to simplify catching drivers raising NotImplementedError

    If a particular call makes a driver raise NotImplementedError, we
    log it so that we can extract this information afterwards to
    automatically generate a hypervisor/feature support matrix."""
    @functools.wraps(f)
    def wrapped_func(self, *args, **kwargs):
        try:
            return f(self, *args, **kwargs)
        except NotImplementedError:
            # Last traceback entry is the frame that raised; index 2 of
            # the entry is the function name.
            frame = traceback.extract_tb(sys.exc_info()[2])[-1]
            LOG.error('%(driver)s does not implement %(method)s' % {
                      'driver': type(self.connection),
                      'method': frame[2]})
    return wrapped_func


class _VirtDriverTestCase(test.TestCase):
    """Driver-agnostic tests; subclasses set ``self.driver_module``
    before calling this class's ``setUp``."""

    def setUp(self):
        super(_VirtDriverTestCase, self).setUp()
        self.connection = self.driver_module.get_connection('')
        self.ctxt = test_utils.get_test_admin_context()
        self.image_service = image.get_default_image_service()

    def _get_running_instance(self):
        """Spawn a test instance; return ``(instance_ref, network_info)``."""
        instance_ref = test_utils.get_test_instance()
        network_info = test_utils.get_test_network_info()
        image_info = test_utils.get_test_image_info(None, instance_ref)
        self.connection.spawn(self.ctxt, instance=instance_ref,
                              image_meta=image_info,
                              network_info=network_info)
        return instance_ref, network_info

    @catch_notimplementederror
    def test_init_host(self):
        self.connection.init_host('myhostname')

    @catch_notimplementederror
    def test_list_instances(self):
        self.connection.list_instances()

    @catch_notimplementederror
    def test_list_instances_detail(self):
        self.connection.list_instances_detail()

    @catch_notimplementederror
    def test_spawn(self):
        instance_ref, network_info = self._get_running_instance()
        domains = self.connection.list_instances()
        self.assertIn(instance_ref['name'], domains)

        domains_details = self.connection.list_instances_detail()
        self.assertIn(instance_ref['name'], [i.name for i in domains_details])

    @catch_notimplementederror
    def test_snapshot_not_running(self):
        instance_ref = test_utils.get_test_instance()
        img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})
        self.assertRaises(exception.InstanceNotRunning,
                          self.connection.snapshot,
                          self.ctxt, instance_ref, img_ref['id'])

    @catch_notimplementederror
    def test_snapshot_running(self):
        img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})
        instance_ref, network_info = self._get_running_instance()
        self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'])

    @catch_notimplementederror
    def test_reboot(self):
        reboot_type = "SOFT"
        instance_ref, network_info = self._get_running_instance()
        self.connection.reboot(instance_ref, network_info, reboot_type)

    @catch_notimplementederror
    def test_get_host_ip_addr(self):
        host_ip = self.connection.get_host_ip_addr()

        # Will raise an exception if it's not a valid IP at all
        ip = netaddr.IPAddress(host_ip)

        # For now, assume IPv4.
        self.assertEqual(ip.version, 4)

    @catch_notimplementederror
    def test_resize_running(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.resize(instance_ref, 7)

    @catch_notimplementederror
    def test_set_admin_password(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.set_admin_password(instance_ref, 'p4ssw0rd')

    @catch_notimplementederror
    def test_inject_file(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.inject_file(instance_ref,
                                    base64.b64encode('/testfile'),
                                    base64.b64encode('testcontents'))

    @catch_notimplementederror
    def test_agent_update(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.agent_update(instance_ref, 'http://www.x7.org/',
                                     'd41d8cd98f00b204e9800998ecf8427e')

    @catch_notimplementederror
    def test_rescue(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.rescue(self.ctxt, instance_ref, network_info, None)

    @catch_notimplementederror
    def test_unrescue_unrescued_instance(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.unrescue(instance_ref, network_info)

    @catch_notimplementederror
    def test_unrescue_rescued_instance(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.rescue(self.ctxt, instance_ref, network_info, None)
        self.connection.unrescue(instance_ref, network_info)

    @catch_notimplementederror
    def test_poll_rebooting_instances(self):
        self.connection.poll_rebooting_instances(10)

    @catch_notimplementederror
    def test_poll_rescued_instances(self):
        self.connection.poll_rescued_instances(10)

    @catch_notimplementederror
    def test_poll_unconfirmed_resizes(self):
        self.connection.poll_unconfirmed_resizes(10)

    @catch_notimplementederror
    def test_migrate_disk_and_power_off(self):
        instance_ref, network_info = self._get_running_instance()
        instance_type_ref = test_utils.get_test_instance_type()
        self.connection.migrate_disk_and_power_off(
            self.ctxt, instance_ref, 'dest_host', instance_type_ref)

    @catch_notimplementederror
    def test_pause(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.pause(instance_ref)

    @catch_notimplementederror
    def test_unpause_unpaused_instance(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.unpause(instance_ref)

    @catch_notimplementederror
    def test_unpause_paused_instance(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.pause(instance_ref)
        self.connection.unpause(instance_ref)

    @catch_notimplementederror
    def test_suspend(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.suspend(instance_ref)

    @catch_notimplementederror
    def test_resume_unsuspended_instance(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.resume(instance_ref)

    @catch_notimplementederror
    def test_resume_suspended_instance(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.suspend(instance_ref)
        self.connection.resume(instance_ref)

    @catch_notimplementederror
    def test_destroy_instance_nonexistant(self):
        fake_instance = {'id': 42, 'name': 'I just made this up!'}
        network_info = test_utils.get_test_network_info()
        self.connection.destroy(fake_instance, network_info)

    @catch_notimplementederror
    def test_destroy_instance(self):
        instance_ref, network_info = self._get_running_instance()
        self.assertIn(instance_ref['name'],
                      self.connection.list_instances())
        self.connection.destroy(instance_ref, network_info)
        self.assertNotIn(instance_ref['name'],
                         self.connection.list_instances())

    @catch_notimplementederror
    def test_attach_detach_volume(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.attach_volume({'driver_volume_type': 'fake'},
                                      instance_ref['name'],
                                      '/mnt/engine/something')
        self.connection.detach_volume({'driver_volume_type': 'fake'},
                                      instance_ref['name'],
                                      '/mnt/engine/something')

    @catch_notimplementederror
    def test_get_info(self):
        instance_ref, network_info = self._get_running_instance()
        info = self.connection.get_info(instance_ref['name'])
        self.assertIn('state', info)
        self.assertIn('max_mem', info)
        self.assertIn('mem', info)
        self.assertIn('num_cpu', info)
        self.assertIn('cpu_time', info)

    @catch_notimplementederror
    def test_get_info_for_unknown_instance(self):
        self.assertRaises(exception.NotFound,
                          self.connection.get_info, 'I just made this name up')

    @catch_notimplementederror
    def test_get_diagnostics(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.get_diagnostics(instance_ref['name'])

    @catch_notimplementederror
    def test_list_disks(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.list_disks(instance_ref['name'])

    @catch_notimplementederror
    def test_list_interfaces(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.list_interfaces(instance_ref['name'])

    @catch_notimplementederror
    def test_block_stats(self):
        instance_ref, network_info = self._get_running_instance()
        stats = self.connection.block_stats(instance_ref['name'], 'someid')
        self.assertEqual(len(stats), 5)

    @catch_notimplementederror
    def test_interface_stats(self):
        instance_ref, network_info = self._get_running_instance()
        stats = self.connection.interface_stats(instance_ref['name'], 'someid')
        self.assertEqual(len(stats), 8)

    @catch_notimplementederror
    def test_get_console_output(self):
        instance_ref, network_info = self._get_running_instance()
        console_output = self.connection.get_console_output(instance_ref)
        self.assertIsInstance(console_output, basestring)

    @catch_notimplementederror
    def test_get_ajax_console(self):
        instance_ref, network_info = self._get_running_instance()
        ajax_console = self.connection.get_ajax_console(instance_ref)
        self.assertIn('token', ajax_console)
        self.assertIn('host', ajax_console)
        self.assertIn('port', ajax_console)

    @catch_notimplementederror
    def test_get_vnc_console(self):
        instance_ref, network_info = self._get_running_instance()
        vnc_console = self.connection.get_vnc_console(instance_ref)
        self.assertIn('token', vnc_console)
        self.assertIn('host', vnc_console)
        self.assertIn('port', vnc_console)

    @catch_notimplementederror
    def test_get_console_pool_info(self):
        instance_ref, network_info = self._get_running_instance()
        console_pool = self.connection.get_console_pool_info(instance_ref)
        self.assertIn('address', console_pool)
        self.assertIn('username', console_pool)
        self.assertIn('password', console_pool)

    @catch_notimplementederror
    def test_refresh_security_group_rules(self):
        # FIXME: Create security group and add the instance to it
        instance_ref, network_info = self._get_running_instance()
        self.connection.refresh_security_group_rules(1)

    @catch_notimplementederror
    def test_refresh_security_group_members(self):
        # FIXME: Create security group and add the instance to it
        instance_ref, network_info = self._get_running_instance()
        self.connection.refresh_security_group_members(1)

    @catch_notimplementederror
    def test_refresh_provider_fw_rules(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.refresh_provider_fw_rules()

    @catch_notimplementederror
    def test_update_available_resource(self):
        self.compute = self.start_service('compute', host='dummy')
        self.connection.update_available_resource(self.ctxt, 'dummy')

    @catch_notimplementederror
    def test_compare_cpu(self):
        cpu_info = '''{ "topology": {
                               "sockets": 1,
                               "cores": 2,
                               "threads": 1 },
                        "features": [
                            "xtpr",
                            "tm2",
                            "est",
                            "vmx",
                            "ds_cpl",
                            "monitor",
                            "pbe",
                            "tm",
                            "ht",
                            "ss",
                            "acpi",
                            "ds",
                            "vme"],
                        "arch": "x86_64",
                        "model": "Penryn",
                        "vendor": "Intel" }'''

        self.connection.compare_cpu(cpu_info)

    @catch_notimplementederror
    def test_ensure_filtering_for_instance(self):
        instance_ref = test_utils.get_test_instance()
        network_info = test_utils.get_test_network_info()
        self.connection.ensure_filtering_rules_for_instance(instance_ref,
                                                            network_info)

    @catch_notimplementederror
    def test_unfilter_instance(self):
        instance_ref = test_utils.get_test_instance()
        network_info = test_utils.get_test_network_info()
        self.connection.unfilter_instance(instance_ref, network_info)

    @catch_notimplementederror
    def test_live_migration(self):
        instance_ref, network_info = self._get_running_instance()
        self.connection.live_migration(self.ctxt, instance_ref, 'otherhost',
                                       None, None)

    @catch_notimplementederror
    def _check_host_status_fields(self, host_status):
        """Assert that ``host_status`` carries the disk and memory keys."""
        self.assertIn('disk_total', host_status)
        self.assertIn('disk_used', host_status)
        self.assertIn('host_memory_total', host_status)
        self.assertIn('host_memory_free', host_status)

    @catch_notimplementederror
    def test_update_host_status(self):
        host_status = self.connection.update_host_status()
        self._check_host_status_fields(host_status)

    @catch_notimplementederror
    def test_get_host_stats(self):
        host_status = self.connection.get_host_stats()
        self._check_host_status_fields(host_status)

    @catch_notimplementederror
    def test_set_host_enabled(self):
        self.connection.set_host_enabled('a useless argument?', True)

    @catch_notimplementederror
    def test_host_power_action_reboot(self):
        self.connection.host_power_action('a useless argument?', 'reboot')

    @catch_notimplementederror
    def test_host_power_action_shutdown(self):
        self.connection.host_power_action('a useless argument?', 'shutdown')

    @catch_notimplementederror
    def test_host_power_action_startup(self):
        self.connection.host_power_action('a useless argument?', 'startup')


class AbstractDriverTestCase(_VirtDriverTestCase):
    """Runs the shared tests against the base ``ComputeDriver``."""

    def setUp(self):
        import engine.virt.driver

        self.driver_module = engine.virt.driver

        def get_driver_connection(_):
            return engine.virt.driver.ComputeDriver()

        # NOTE(review): this replaces get_connection on the module itself
        # and nothing visible here restores it - confirm that is intended.
        self.driver_module.get_connection = get_driver_connection
        super(AbstractDriverTestCase, self).setUp()


class FakeConnectionTestCase(_VirtDriverTestCase):
    """Runs the shared tests against ``engine.virt.fake``."""

    def setUp(self):
        import engine.virt.fake
        self.driver_module = engine.virt.fake
        super(FakeConnectionTestCase, self).setUp()


class LibvirtConnTestCase(_VirtDriverTestCase):
    """Runs the shared tests against the libvirt driver with fakelibvirt."""

    def setUp(self):
        # Put fakelibvirt in place, remembering any real module (or None).
        self.saved_libvirt = sys.modules.get('libvirt')

        import fakelibvirt
        import fake_libvirt_utils

        sys.modules['libvirt'] = fakelibvirt

        import engine.virt.libvirt.connection
        import engine.virt.libvirt.firewall

        engine.virt.libvirt.connection.libvirt = fakelibvirt
        engine.virt.libvirt.connection.libvirt_utils = fake_libvirt_utils
        engine.virt.libvirt.firewall.libvirt = fakelibvirt

        # Point _VirtDriverTestCase at the right module
        self.driver_module = engine.virt.libvirt.connection
        super(LibvirtConnTestCase, self).setUp()
        FLAGS.rescue_image_id = "2"
        FLAGS.rescue_kernel_id = "3"
        FLAGS.rescue_ramdisk_id = None

    def tearDown(self):
        # Tear the base fixture down; the original called setUp() here,
        # which re-ran the fixture instead of cleaning it up.
        super(LibvirtConnTestCase, self).tearDown()

        # Restore libvirt
        import engine.virt.libvirt.connection
        import engine.virt.libvirt.firewall
        if self.saved_libvirt:
            sys.modules['libvirt'] = self.saved_libvirt
            engine.virt.libvirt.connection.libvirt = self.saved_libvirt
            # NOTE(review): libvirt_utils receives the libvirt module, as in
            # the original - looks suspicious, confirm against upstream.
            engine.virt.libvirt.connection.libvirt_utils = self.saved_libvirt
            # NOTE(review): the excerpt is truncated at this point; any
            # further restore statements are not visible and not recreated.
common_catch_and_log.py
Source:common_catch_and_log.py
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for logger."""

__author__ = 'Mike Gainer (mgainer@google.com)'

import datetime
import unittest

import appengine_config
from common import catch_and_log


class CatchAndLogTests(unittest.TestCase):
    """Checks the entries recorded by ``catch_and_log.CatchAndLog``.

    Each test logs through the object under test, mirrors what it expects
    with ``_expect`` and finally compares both lists in
    ``_assert_logs_match``.
    """

    def setUp(self):
        # The tests toggle the module-level PRODUCTION_MODE flag; restore the
        # previous value afterwards so the change cannot leak into other
        # test modules sharing the process.
        self.addCleanup(
            setattr, appengine_config, 'PRODUCTION_MODE',
            getattr(appengine_config, 'PRODUCTION_MODE', False))
        appengine_config.PRODUCTION_MODE = False
        self._catch_and_log = catch_and_log.CatchAndLog()
        self._expected = []

    def test_simple(self):
        """A single critical message is recorded as-is."""
        complaint = 'No cheese, Gromit!'
        self._catch_and_log.critical(complaint)
        self._expect(complaint, catch_and_log._CRITICAL)
        self._assert_logs_match()

    def test_multiple(self):
        """Several critical messages are recorded in call order."""
        complaints = [
            'Failed to find cheese in pantry',
            'Failed to install cheese to mousetrap',
            'Failed to arm trap',
            'Failed to catch mouse'
            ]
        for complaint in complaints:
            self._catch_and_log.critical(complaint)
            self._expect(complaint, catch_and_log._CRITICAL)
        self._assert_logs_match()

    def test_multiple_levels(self):
        """critical/warn/warning/info each record their own level."""
        complaint = 'No cheese, Gromit!'
        self._catch_and_log.critical(complaint)
        self._expect(complaint, catch_and_log._CRITICAL)
        complaint = 'Moon out of range!'
        self._catch_and_log.warn(complaint)
        self._expect(complaint, catch_and_log._WARNING)
        complaint = 'Parking brake engaged!'
        self._catch_and_log.warning(complaint)
        self._expect(complaint, catch_and_log._WARNING)
        complaint = 'Five mice spectating'
        self._catch_and_log.info(complaint)
        self._expect(complaint, catch_and_log._INFO)
        complaint = 'Red light blinking'
        self._catch_and_log.info(complaint)
        self._expect(complaint, catch_and_log._INFO)
        complaint = 'Low blinker fluid'
        self._catch_and_log.warning(complaint)
        self._expect(complaint, catch_and_log._WARNING)
        complaint = 'Insufficient fuel for landing!'
        self._catch_and_log.critical(complaint)
        self._expect(complaint, catch_and_log._CRITICAL)
        self._assert_logs_match()

    def test_exception_suppressed(self):
        """consume_exceptions() swallows the error and logs it as critical."""
        topic = 'Entering Orbit'
        complaint = 'Perigee below surface of Moon!'
        with self._catch_and_log.consume_exceptions(topic):
            raise ValueError(complaint)
        # NOTE(review): the expected text hard-codes the line number of the
        # ``raise`` above as it appears in the original file; it has to be
        # kept in sync whenever lines are added or removed before it.
        self._expect(
            '%s: ValueError: %s at ' % (topic, complaint) +
            'File "/tests/unit/common_catch_and_log.py", line 86, in '
            'test_exception_suppressed\n raise ValueError(complaint)\n',
            catch_and_log._CRITICAL)
        self._assert_logs_match()

    def test_exception_propagates(self):
        """propagate_exceptions() logs the error and lets it escape."""
        topic = 'Entering Orbit'
        complaint = 'Perigee below surface of Moon!'
        with self.assertRaises(ValueError):
            with self._catch_and_log.propagate_exceptions(topic):
                raise ValueError(complaint)
        # NOTE(review): same hard-coded line number caveat as in
        # test_exception_suppressed.
        self._expect(
            '%s: ValueError: %s at ' % (topic, complaint) +
            'File "/tests/unit/common_catch_and_log.py", line 100, in '
            'test_exception_propagates\n raise ValueError(complaint)\n',
            catch_and_log._CRITICAL)
        self._assert_logs_match()

    def test_traceback_info_suppressed_in_production(self):
        """With PRODUCTION_MODE on, the entry carries no file/line details."""
        appengine_config.PRODUCTION_MODE = True
        topic = 'Entering Orbit'
        complaint = 'Perigee below surface of Moon!'
        with self._catch_and_log.consume_exceptions(topic):
            raise ValueError(complaint)
        self._expect('%s: ValueError: %s' % (topic, complaint),
                     catch_and_log._CRITICAL)
        self._assert_logs_match()

    def _expect(self, message, level):
        """Append one expected entry, stamped with the current time."""
        self._expected.append({
            'message': message,
            'level': level,
            'timestamp': datetime.datetime.now().strftime(
                catch_and_log._LOG_DATE_FORMAT)})

    def _assert_logs_match(self):
        """Compare the expected entries with what CatchAndLog recorded."""
        if len(self._expected) != len(self._catch_and_log.get()):
            self.fail('Expected %d entries, but have %d' %
                      (len(self._expected), len(self._catch_and_log.get())))
        for expected, actual in zip(self._expected, self._catch_and_log.get()):
            # assertEqual instead of the deprecated assertEquals alias,
            # which no longer exists in Python 3.12+.
            self.assertEqual(expected['level'], actual['level'])
            self.assertEqual(expected['message'], actual['message'])
            expected_time = datetime.datetime.strptime(
                expected['timestamp'], catch_and_log._LOG_DATE_FORMAT)
            actual_time = datetime.datetime.strptime(
                actual['timestamp'], catch_and_log._LOG_DATE_FORMAT)
            # The excerpt is cut off here, in the middle of the call that
            # compares the two timestamps:
            #     self.assertAlmostEqual(...
__init__.py
Source:__init__.py
# coding=utf-8
from __future__ import unicode_literals
from .. import Provider as CompanyProvider


class Provider(CompanyProvider):
    """Company provider producing French company names, catch phrases and
    SIREN/SIRET numbers."""

    formats = (
        '{{last_name}} {{company_suffix}}',
        '{{last_name}} {{last_name}} {{company_suffix}}',
        '{{last_name}}',
        '{{last_name}}',
    )

    catch_phrase_formats = (
        '{{catch_phrase_noun}} {{catch_phrase_verb}} {{catch_phrase_attribute}}', )

    nouns = (
        'la sécurité',
        'le plaisir',
        'le confort',
        'la simplicité',
        "l'assurance",
        "l'art",
        'le pouvoir',
        'le droit',
        'la possibilité',
        "l'avantage",
        'la liberté')

    verbs = (
        'de rouler',
        "d'avancer",
        "d'évoluer",
        'de changer',
        "d'innover",
        'de louer',
        "d'atteindre vos buts",
        'de concrétiser vos projets')

    attributes = (
        'de manière efficace',
        'plus rapidement',
        'plus facilement',
        'plus simplement',
        'en toute tranquilité',
        'avant-tout',
        'autrement',
        'naturellement',
        'à la pointe',
        'sans soucis',
        "à l'état pur",
        'à sa source',
        'de manière sûre',
        'en toute sécurité')

    company_suffixes = ('SA', 'S.A.', 'SARL', 'S.A.R.L.', 'S.A.S.', 'et Fils')

    siren_format = "### ### ###"

    def catch_phrase_noun(self):
        """
        Returns a random catch phrase noun.
        """
        return self.random_element(self.nouns)

    def catch_phrase_attribute(self):
        """
        Returns a random catch phrase attribute.
        """
        return self.random_element(self.attributes)

    def catch_phrase_verb(self):
        """
        Returns a random catch phrase verb.
        """
        return self.random_element(self.verbs)

    def catch_phrase(self):
        """
        Returns a capitalised catch phrase built from one of
        ``catch_phrase_formats``; candidates are regenerated until
        ``_is_catch_phrase_valid`` accepts one.

        :example 'integrate extensible convergence'
        """
        while True:
            pattern = self.random_element(self.catch_phrase_formats)
            catch_phrase = self.generator.parse(pattern)
            catch_phrase = catch_phrase[0].upper() + catch_phrase[1:]
            if self._is_catch_phrase_valid(catch_phrase):
                return catch_phrase

    # An array containing string which should not appear twice in a catch phrase
    words_which_should_not_appear_twice = ('sécurité', 'simpl')

    def _is_catch_phrase_valid(self, catch_phrase):
        """
        Validates a french catch phrase.

        :param catch_phrase: The catch phrase to validate.
        :return: False when one of ``words_which_should_not_appear_twice``
            occurs more than once in the phrase, True otherwise.
        """
        for word in self.words_which_should_not_appear_twice:
            # Compare the first and the last occurrence: they only differ
            # when the word really appears at least twice.  (Searching for
            # the *next* occurrence instead yields -1 for a single hit,
            # which wrongly rejected every phrase containing the word once.)
            begin_pos = catch_phrase.find(word)
            end_pos = catch_phrase.rfind(word)
            if begin_pos != -1 and begin_pos != end_pos:
                return False
        return True

    def siren(self):
        """
        Generates a siren number (9 digits).
        """
        return self.numerify(self.siren_format)

    def siret(self, max_sequential_digits=2):
        """
        Generates a siret number (14 digits).
        It is in fact the result of the concatenation of a siren number (9 digits),
        a sequential number (4 digits) and a control number (1 digit) concatenation.
        If $max_sequential_digits is invalid, it is set to 2.

        :param max_sequential_digits The maximum number of digits for the sequential number (> 0 && <= 4).
        """
        if max_sequential_digits > 4 or max_sequential_digits <= 0:
            max_sequential_digits = 2
        sequential_number = str(self.random_number(
            max_sequential_digits)).zfill(4)
        # The excerpt is cut off here; the rest of the method is not shown.
        ...
test_helpers.py
Source:test_helpers.py
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import sys
import unittest
import os

# Extend sys.path *before* importing from ``lib``: appending afterwards (as
# the code used to do) cannot help the import below resolve.
dirname = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(dirname, '..'))

from lib.helpers import *
from mock import call, MagicMock


class RetryFunc():
    """Callable target that counts invocations and records their argument."""

    def __init__(self):
        self.ran = 0
        self.calls = []
        self.err = UserWarning

    def succeed(self, count):
        """Record the call and return normally."""
        self.ran += 1
        self.calls.append(count)

    def fail(self, count):
        """Record the call, then raise ``self.err``."""
        self.ran += 1
        self.calls.append(count)
        raise self.err


class TestRetryFunc(unittest.TestCase):
    """Tests for ``retry_func`` (star-imported from ``lib.helpers``)."""

    def setUp(self):
        self.retry_func = RetryFunc()
        self.catch_func = RetryFunc()
        # Records the order in which the helper methods below are reached.
        self.example_method = MagicMock()
        self.throw_exception = False

    def run_logic(self):
        """Body under retry: raises UserWarning once if throw_exception is set."""
        self.example_method('run_logic1')
        if self.throw_exception:
            self.throw_exception = False
            raise UserWarning('error')
        self.example_method('run_logic2')

    def handler_logic(self):
        self.example_method('handler_logic')

    def exception_caught(self):
        """Catch handler that itself runs a nested retry_func call."""
        self.example_method('exception_handler1')
        retry_func(
            lambda ran: self.handler_logic(),
            catch_func=lambda ran: self.exception_caught(),
            catch=UserWarning, retries=3
        )
        self.example_method('exception_handler2')

    def test_passes_retry_count(self):
        self.assertRaises(
            self.retry_func.err,
            retry_func,
            self.retry_func.fail,
            catch=UserWarning, retries=3
        )
        self.assertEqual(self.retry_func.calls, [0, 1, 2, 3])

    def test_retries_on_fail(self):
        self.assertRaises(
            self.retry_func.err,
            retry_func,
            self.retry_func.fail,
            catch=UserWarning, retries=3
        )
        self.assertEqual(self.retry_func.ran, 4)

    def test_run_catch_func_on_fail(self):
        self.assertRaises(
            self.retry_func.err,
            retry_func,
            self.retry_func.fail,
            catch_func=self.catch_func.succeed,
            catch=UserWarning, retries=3
        )
        self.assertEqual(self.catch_func.ran, 4)

    def test_no_retry_on_success(self):
        retry_func(
            self.retry_func.succeed,
            catch=UserWarning, retries=3
        )
        self.assertEqual(self.retry_func.ran, 1)

    def test_no_run_catch_func_on_success(self):
        retry_func(
            self.retry_func.succeed,
            catch_func=self.catch_func.succeed,
            catch=UserWarning, retries=3
        )
        self.assertEqual(self.catch_func.ran, 0)

    def test_runs_logic_and_handler_in_proper_order(self):
        self.throw_exception = True
        retry_func(
            lambda ran: self.run_logic(),
            catch_func=lambda ran: self.exception_caught(),
            catch=UserWarning, retries=1
        )
        expected = [call('run_logic1'),
                    call('exception_handler1'),
                    call('handler_logic'),
                    call('exception_handler2'),
                    call('run_logic1'),
                    call('run_logic2')]
        self.assertEqual(expected, self.example_method.mock_calls)


if __name__ == '__main__':
    # The excerpt is cut off here; the body of this guard is not shown.
    ...
Motion.py
Source:Motion.py
from direct.fsm import StateData
from toontown.toonbase import ToontownGlobals
from direct.directnotify import DirectNotifyGlobal
from direct.fsm import ClassicFSM, State
from direct.fsm import State
import TTEmote
from otp.avatar import Emote


class Motion(StateData.StateData):
    """State data object wrapping a ClassicFSM of motion states for one toon.

    Every state has an ``enter*``/``exit*`` pair; most of them only emit a
    debug notification.  The walk and run states additionally disable the
    toon's body emotes on entry and release them on exit.
    """

    notify = DirectNotifyGlobal.directNotify.newCategory('Motion')

    def __init__(self, toon):
        """Build the FSM for ``toon`` and enter its initial ('off') state."""
        self.lt = toon
        self.doneEvent = 'motionDone'
        StateData.StateData.__init__(self, self.doneEvent)
        # Both the initial and the final state are 'off'.
        self.fsm = ClassicFSM.ClassicFSM('Motion', [State.State('off', self.enterOff, self.exitOff),
         State.State('neutral', self.enterNeutral, self.exitNeutral),
         State.State('walk', self.enterWalk, self.exitWalk),
         State.State('run', self.enterRun, self.exitRun),
         State.State('sad-neutral', self.enterSadNeutral, self.exitSadNeutral),
         State.State('sad-walk', self.enterSadWalk, self.exitSadWalk),
         State.State('catch-neutral', self.enterCatchNeutral, self.exitCatchNeutral),
         State.State('catch-run', self.enterCatchRun, self.exitCatchRun),
         State.State('catch-eatneutral', self.enterCatchEatNeutral, self.exitCatchEatNeutral),
         State.State('catch-eatnrun', self.enterCatchEatNRun, self.exitCatchEatNRun)], 'off', 'off')
        self.fsm.enterInitialState()

    def delete(self):
        """Drop the reference to the FSM."""
        del self.fsm

    def load(self):
        """No-op."""
        pass

    def unload(self):
        """No-op."""
        pass

    def enter(self):
        self.notify.debug('enter')

    def exit(self):
        """Ask the FSM to move to its final state."""
        self.fsm.requestFinalState()

    # --- per-state hooks; ``rate`` is accepted but not used by any of them ---

    def enterOff(self, rate = 0):
        self.notify.debug('enterOff')

    def exitOff(self):
        pass

    def enterNeutral(self, rate = 0):
        self.notify.debug('enterNeutral')

    def exitNeutral(self):
        self.notify.debug('exitNeutral')

    def enterWalk(self, rate = 0):
        self.notify.debug('enterWalk')
        # Body emotes are disabled while walking; exitWalk releases them.
        Emote.globalEmote.disableBody(self.lt, 'enterWalk')

    def exitWalk(self):
        self.notify.debug('exitWalk')
        Emote.globalEmote.releaseBody(self.lt, 'exitWalk')

    def enterRun(self, rate = 0):
        self.notify.debug('enterRun')
        # Body emotes are disabled while running; exitRun releases them.
        Emote.globalEmote.disableBody(self.lt, 'enterRun')

    def exitRun(self):
        self.notify.debug('exitRun')
        Emote.globalEmote.releaseBody(self.lt, 'exitRun')

    def enterSadNeutral(self, rate = 0):
        self.notify.debug('enterSadNeutral')

    def exitSadNeutral(self):
        self.notify.debug('exitSadNeutral')

    def enterSadWalk(self, rate = 0):
        self.notify.debug('enterSadWalk')

    def exitSadWalk(self):
        pass

    def enterCatchNeutral(self, rate = 0):
        self.notify.debug('enterCatchNeutral')

    def exitCatchNeutral(self):
        self.notify.debug('exitCatchNeutral')

    def enterCatchRun(self, rate = 0):
        self.notify.debug('enterCatchRun')

    def exitCatchRun(self):
        self.notify.debug('exitCatchRun')

    def enterCatchEatNeutral(self, rate = 0):
        self.notify.debug('enterCatchEatNeutral')

    def exitCatchEatNeutral(self):
        self.notify.debug('exitCatchEatNeutral')

    def enterCatchEatNRun(self, rate = 0):
        self.notify.debug('enterCatchEatNRun')

    def exitCatchEatNRun(self):
        self.notify.debug('exitCatchEatNRun')

    def setState(self, anim, rate):
        toon = self.lt
        if toon.playingAnim != anim:
            # The excerpt is cut off here; the rest of the method is not shown.
            ...
runner.py
Source:runner.py
#!/usr/bin/python -tt
#
# Copyright (c) 2011 Intel, Inc.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation; version 2 of the License
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc., 59
# Temple Place - Suite 330, Boston, MA 02111-1307, USA.

import os
import subprocess

from mic import msger


def runtool(cmdln_or_args, catch=1):
    """Run an external command and collect its output.

    :param cmdln_or_args: either an argument list (run without a shell) or
        a command-line string (run with ``shell=True``).
    :param catch: which streams to capture:
        0 - none (run quietly),
        1 - only STDOUT,
        2 - only STDERR,
        3 - both, with STDERR merged into STDOUT.
    :return: ``(returncode, output)``; ``output`` is an empty string when
        nothing was captured.  For an invalid ``catch`` value ``None`` is
        returned, so callers that unpack the result fail immediately.
    :raises OSError: if the command cannot be started.  A missing
        executable is reported through ``msger.error`` first.
    """
    import errno

    if catch not in (0, 1, 2, 3):
        # invalid catch selection, will cause exception, that's good
        return None

    if isinstance(cmdln_or_args, list):
        cmd = cmdln_or_args[0]
        shell = False
    else:
        import shlex
        cmd = shlex.split(cmdln_or_args)[0]
        shell = True

    if catch != 3:
        # Streams that are not captured are sent to the null device.
        dev_null = os.open(os.devnull, os.O_WRONLY)

    if catch == 0:
        sout = dev_null
        serr = dev_null
    elif catch == 1:
        sout = subprocess.PIPE
        serr = dev_null
    elif catch == 2:
        sout = dev_null
        serr = subprocess.PIPE
    else:
        sout = subprocess.PIPE
        serr = subprocess.STDOUT

    try:
        p = subprocess.Popen(cmdln_or_args, stdout=sout,
                             stderr=serr, shell=shell)
        (sout, serr) = p.communicate()
        # Combine stdout and stderr, dropping streams that were not captured.
        # Pipes deliver bytes on Python 3; decode those so the join works
        # there too, while Python 2 ``str`` output is passed through as-is.
        out = ''.join(
            chunk if isinstance(chunk, str) else chunk.decode('utf-8', 'replace')
            for chunk in (sout, serr) if chunk)
    except OSError as e:
        if e.errno == errno.ENOENT:
            # [Errno 2] No such file or directory
            msger.error('Cannot run command: %s, lost dependency?' % cmd)
        # Always re-raise: falling through to the return below would fail
        # on the unbound ``p``/``out`` if msger.error() ever returns.
        raise
    finally:
        if catch != 3:
            os.close(dev_null)

    return (p.returncode, out)


def show(cmdln_or_args):
    """Run a command, report it and its output via ``msger.verbose``.

    :return: the command's return code.
    """
    rc, out = runtool(cmdln_or_args, catch=3)

    if isinstance(cmdln_or_args, list):
        cmd = ' '.join(cmdln_or_args)
    else:
        cmd = cmdln_or_args

    pieces = ['running command: "%s"' % cmd]
    if out:
        out = out.strip()
    if out:
        pieces.append(', with output::')
        pieces.append('\n +----------------')
        pieces.extend('\n | %s' % line for line in out.splitlines())
        pieces.append('\n +----------------')

    msger.verbose(''.join(pieces))
    return rc


def outs(cmdln_or_args, catch=1):
    """Run a command and return its captured output, stripped."""
    return runtool(cmdln_or_args, catch)[1].strip()


def quiet(cmdln_or_args):
    # The excerpt is cut off here; the body of this function is not shown.
    ...
location.py
Source:location.py
#_*_ coding=utf-8 _*_
from selenium import webdriver
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.action_chains import ActionChains
from poseidon.base import CommonBase as cb

# Simple wrappers for locating a single element or a group of elements.
# Every locator is wrapped in ``cb.com_try_catch``.
# NOTE(review): what the decorator returns when the lookup fails is not
# visible here - confirm before relying on the result being non-None.


# --- Locate a single element ---

@cb.com_try_catch
def findId(driver, id):
    """Return the element found by ``find_element_by_id``."""
    return driver.find_element_by_id(id)


@cb.com_try_catch
def findName(driver, name):
    """Return the element found by ``find_element_by_name``."""
    return driver.find_element_by_name(name)


@cb.com_try_catch
def findClassName(driver, name):
    """Return the element found by ``find_element_by_class_name``."""
    return driver.find_element_by_class_name(name)


@cb.com_try_catch
def findTagName(driver, name):
    """Return the element found by ``find_element_by_tag_name``."""
    return driver.find_element_by_tag_name(name)


@cb.com_try_catch
def findLinkText(driver, text):
    """Return the element found by ``find_element_by_link_text``."""
    return driver.find_element_by_link_text(text)


@cb.com_try_catch
def findPLinkText(driver, text):
    """Return the element found by ``find_element_by_partial_link_text``."""
    return driver.find_element_by_partial_link_text(text)


@cb.com_try_catch
def findXpath(driver, xpath):
    """Return the element found by ``find_element_by_xpath``."""
    return driver.find_element_by_xpath(xpath)


@cb.com_try_catch
def findCss(driver, css):
    """Return the element found by ``find_element_by_css_selector``."""
    return driver.find_element_by_css_selector(css)


# --- Locate a group of elements ---

@cb.com_try_catch
def findsId(driver, id):
    """Return the elements found by ``find_elements_by_id``."""
    return driver.find_elements_by_id(id)


@cb.com_try_catch
def findsName(driver, name):
    """Return the elements found by ``find_elements_by_name``."""
    return driver.find_elements_by_name(name)


@cb.com_try_catch
def findsClassName(driver, name):
    """Return the elements found by ``find_elements_by_class_name``."""
    return driver.find_elements_by_class_name(name)


@cb.com_try_catch
def findsTagName(driver, name):
    """Return the elements found by ``find_elements_by_tag_name``."""
    return driver.find_elements_by_tag_name(name)


@cb.com_try_catch
def findsLinkText(driver, text):
    """Return the elements found by ``find_elements_by_link_text``."""
    return driver.find_elements_by_link_text(text)


@cb.com_try_catch
def findsPLinkText(driver, text):
    """Return the elements found by ``find_elements_by_partial_link_text``."""
    return driver.find_elements_by_partial_link_text(text)


@cb.com_try_catch
def findsXpath(driver, xpath):
    """Return the elements found by ``find_elements_by_xpath``."""
    return driver.find_elements_by_xpath(xpath)


@cb.com_try_catch
def findsCss(driver, css):
    """Return the elements found by ``find_elements_by_css_selector``."""
    return driver.find_elements_by_css_selector(css)


def findDropdown(driver, xpath, index=0, tag_name="li"):
    """Pick one entry of a dropdown.

    :param driver: the driver (or element) to search from.
    :param xpath: XPath locating the dropdown list itself.
    :param index: entry to pick.  For a ``<select>`` it is handed to
        ``Select.select_by_index``; otherwise it is used as the position
        in the relative XPath ``tag_name[index]``.
        NOTE(review): XPath positions start at 1, so the default of 0
        looks like it cannot match in the non-select branch - confirm.
    :param tag_name: tag of the entries when the dropdown is not a
        ``<select>``.
    :return: None
    """
    element = findXpath(driver, xpath)
    if element.tag_name.lower() != "select":
        findXpath(element, tag_name + "[" + str(index) + "]").click()
    else:
        # ``element`` already is the <select>; wrap it directly instead of
        # passing it to findsXpath as if it were an XPath string.
        Select(element).select_by_index(index)


def clikUnClickableElement(driver, xpath):
    f = findXpath(driver, xpath)
    # The excerpt is cut off here; the rest of the function is not shown.
    ...
Using AI Code Generation
1const { Given, When, Then } = require('cucumber');2const assert = require('assert');3const { Builder, By, Key, until } = require('selenium-webdriver');4const { expect } = require('chai');5const { Given, When, Then } = require('cucumber');6const { defineSupportCode } = require('cucumber');7const { Builder, By, Key, until } = require('selenium-webdriver');8const { expect } = require('chai');9const { Given, When, Then } = require('cucumber');10const { defineSupportCode } = require('cucumber');11const { Builder, By, Key, until } = require('selenium-webdriver');12const { expect } = require('chai');13const { Given, When, Then } = require('cucumber');14const { defineSupportCode } = require('cucumber');15const { Builder, By, Key, until } = require('selenium-webdriver');16const { expect } = require('chai');17defineSupportCode(function({ Given, When, Then }) {18 Given('I am on the Google search page', function () {19 });20 When('I search for {stringInDoubleQuotes}', function (searchString) {21 return this.driver.findElement(By.name('q')).sendKeys(searchString, Key.RETURN);22 });23 Then('the page title should start with {stringInDoubleQuotes}', function (expectedTitle) {24 return this.driver.getTitle().then(function (title) {25 expect(title).to.equal(expectedTitle);26 });27 });28});29defineSupportCode(function({ Given, When, Then }) {30 Given('I am on the Google search page', function () {31 });32 When('I search for {stringInDoubleQuotes}', function (searchString) {33 return this.driver.findElement(By.name('q')).sendKeys(searchString, Key.RETURN);34 });35 Then('the page title should start with {stringInDoubleQuotes}', function (expectedTitle) {36 return this.driver.getTitle().then(function (title) {37 expect(title).to.equal(expectedTitle);38 });39 });40});41defineSupportCode(function({ Given, When, Then }) {42 Given('I am on the Google search page', function () {43 });
Using AI Code Generation
1var {defineSupportCode} = require('cucumber');2var {setDefaultTimeout} = require('cucumber');3setDefaultTimeout(60 * 1000);4defineSupportCode(function({Given, When, Then}) {5 Given(/^I go to the "([^"]*)" page$/, function (arg1, callback) {6 callback(null, 'pending');7 });8 When(/^I click on "([^"]*)"$/, function (arg1, callback) {9 callback(null, 'pending');10 });11 Then(/^I should see the "([^"]*)" page$/, function (arg1, callback) {12 callback(null, 'pending');13 });14});15public void acceptAlert() {16 Alert alert = driver.switchTo().alert();17 alert.accept();18}19public void dismissAlert() {20 Alert alert = driver.switchTo().alert();21 alert.dismiss();22}23public String getAlertText() {24 Alert alert = driver.switchTo().alert();25 return alert.getText();26}27public void sendKeysToAlert(String keys) {28 Alert alert = driver.switchTo().alert();29 alert.sendKeys(keys);30}31Before(function() {32 this.acceptAlert();33});34var {
Using AI Code Generation
1const { Given, When, Then, After, Before } = require('cucumber');2const { expect } = require('chai');3const { Builder, By, Key, until } = require('selenium-webdriver');4const chrome = require('selenium-webdriver/chrome');5const path = require('chromedriver').path;6const options = new chrome.Options();7options.addArguments('headless');8options.addArguments('disable-gpu');9var driver = new Builder()10 .forBrowser('chrome')11 .setChromeOptions(options)12 .build();13Given('I am on the Google search page', async function () {14});15When('I search for {string}', async function (string) {16 await driver.findElement(By.name('q')).sendKeys(string, Key.RETURN);17});18Then('the page title should start with {string}', async function (string) {19 await driver.wait(until.titleIs(string), 1000);20});21After(async function () {22 await driver.quit();23});24Before(async function () {25 await driver.manage().window().setRect({ width: 1366, height: 768 });26});27{28 "scripts": {29 },30 "dependencies": {31 }32}33module.exports = {34 default: `--format-options '{"snippetInterface": "synchronous"}'`35};36const { Given, When, Then, After, Before } = require('cucumber');37const { expect } = require('chai');38const { Builder, By, Key, until } = require('selenium-webdriver
Using AI Code Generation
1var {defineSupportCode} = require('cucumber');2defineSupportCode(function({Given, When, Then}) {3 Given(/^I have entered (\d+) into the calculator$/, function (arg1) {4 return 'pending';5 });6 When(/^I press add$/, function () {7 return 'pending';8 });9 Then(/^the result should be (\d+) on the screen$/, function (arg1) {10 return 'pending';11 });12});13var {defineSupportCode} = require('cucumber');14defineSupportCode(function({Given, When, Then}) {15 Given(/^I have entered (\d+) into the calculator$/, function (arg1) {16 return 'pending';17 });18 When(/^I press add$/, function () {19 return 'pending';20 });21 Then(/^the result should be (\d+) on the screen$/, function (arg1) {22 return 'pending';23 });24});25var {defineSupportCode} = require('cucumber');26defineSupportCode(function({Given, When, Then}) {27 Given(/^I have entered (\d+) into the calculator$/, function (arg1) {28 return 'pending';29 });30 When(/^I press add$/, function () {31 return 'pending';32 });33 Then(/^the result should be (\d+) on the screen$/, function (arg1
Using AI Code Generation
1import { Given, When, Then } from 'cucumber';2import { expect } from 'chai';3import { browser, element, by } from 'protractor';4Given('I am on the Google page', async () => {5});6When('I type {string} in the search field', async (string) => {7 await element(by.name('q')).sendKeys(string);8});9Then('I should see {string} in the title', async (string) => {10 await expect(browser.getTitle()).to.eventually.equal(string);11});12Then('I should see {string} in the results', async (string) => {13 await expect(element(by.css('#result-stats')).getText()).to.eventually.contain(string);14});15import { Given, When, Then } from 'cucumber';16import { expect } from 'chai';17import { browser, element, by } from 'protractor';18Given('I am on the Google page', async () => {19});20When('I type {string} in the search field', async (string) => {21 await element(by.name('q')).sendKeys(string);22});23Then('I should see {string} in the title', async (string) => {24 await expect(browser.getTitle()).to.eventually.equal(string);25});26Then('I should see {string} in the results', async (string) => {27 await expect(element(by.css('#result-stats')).getText()).to.eventually.contain(string);28});
Using AI Code Generation
1import { Given, When, Then } from 'cucumber';2import { expect } from 'chai';3Given('I am on the homepage', function() {4 browser.pause(2000);5});6When('I enter a search item', function() {7 browser.setValue('#lst-ib', 'Cucumber');8 browser.pause(2000);9});10Then('I should see the search results', function() {11 browser.click('#tsbb');12 browser.pause(2000);13 expect(browser.getTitle()).to.equal('Cucumber - Google Search');14});15exports.config = {16 capabilities: [{17 }],18 cucumberOpts: {
LambdaTest offers a detailed Cucumber testing tutorial, explaining its features, importance, best practices, and more to help you get started with running your automation testing scripts.
Here are the detailed Cucumber testing chapters to help you get started:
Get 100 minutes of automation testing FREE!!