Best Python code snippet using ATX
test_app.py
Source:test_app.py
...36 group_id = app.run_py_app_async(app_name, False, "")37 assert False38 except XcalarApiStatusException as e:39 assert e.status == StatusT.StatusUdfModuleLoadFailed40 assert not app.is_app_alive(group_id)41def test_local_run_error_app(app):42 app_name = 'localError'43 app_src = 'def main(inBlob): raise ValueError("failed")'44 app.set_py_app(app_name, app_src)45 outStr, errStr = app.run_py_app(app_name, False, "")46 assert '[[""]]' == outStr47 assert "ValueError: failed" in errStr48def test_is_alive_check_in_error_scenario(app):49 app_name = 'localError'50 app_src = "def main(inBlob): import time; time.sleep(2); var = 1/0; time.sleep(100000)"51 app.set_py_app(app_name, app_src)52 is_global = True53 for _ in range(4):54 print("is_global = {}".format(is_global))55 group_id = app.run_py_app_async(app_name, is_global, "")56 # Allow some time for the backend to notice that the app has failed57 for _ in range(10):58 if not app.is_app_alive(group_id):59 break60 time.sleep(1)61 assert not app.is_app_alive(group_id)62 is_global = not is_global63def test_is_alive_check_after_app_kill(app):64 app_name = 'appKill'65 app_src = 'def main(inBlob): import time; time.sleep(2000000000)'66 app.set_py_app(app_name, app_src)67 is_global = True68 #69 # We can't kill grpc xpu processes.70 #71 grpc_pids = []72 for line in os.popen("grep GrpcServer " + xce_logdir + "/xpu.log"):73 start = line.find("Pid")74 if (start != -1):75 start += 476 end = line.find(":", start)77 grpc_pids.append(int(line[start:end]))78 print("grpc_pids=", str(grpc_pids))79 for _ in range(4):80 print("is_global = {}".format(is_global))81 group_id = app.run_py_app_async(app_name, is_global, "")82 assert app.is_app_alive(group_id)83 for pid in os.popen("pgrep childnode"):84 if (not int(pid) in grpc_pids):85 with XcUtil.ignored(Exception):86 os.kill(int(pid), signal.SIGKILL)87 # Allow some time for backend to notice that the app has been killed88 for _ in range(10):89 try:90 if not app.is_app_alive(group_id):91 break92 except Exception as e:93 print("Exception:" + str(e))94 time.sleep(10000)95 time.sleep(1)96 assert not app.is_app_alive(group_id)97 is_global = not is_global98def test_is_alive_after_clean_exit(app):99 sleep_time = 5100 app_name = 'app'101 app_src = 'def main(inBlob): import time; time.sleep({})'.format(102 sleep_time)103 app.set_py_app(app_name, app_src)104 is_global = True105 for _ in range(2):106 print("is_global = {}".format(is_global))107 group_id = app.run_py_app_async(app_name, is_global, "")108 assert app.is_app_alive(group_id)109 for i in range(10):110 if not app.is_app_alive(group_id):111 break112 time.sleep(1)113 assert not app.is_app_alive(group_id)114 is_global = not is_global115def test_app_cancel(app):116 app_name = 'appCancel'117 # Validate cancel reaps out the App state and allows refreshing the118 # App source code.119 for ii in range(0, 4):120 if ii % 2 == 0:121 app_src = 'def main(inBlob): import time; time.sleep(1000000000)'122 else:123 app_src = 'def main(inBlob): import time; time.sleep(2000000000)'124 app.set_py_app(app_name, app_src)125 group_id = app.run_py_app_async(app_name, False, "")126 for _ in range(10):127 assert app.is_app_alive(group_id)128 time.sleep(1)129 app.cancel(group_id)130 assert not app.is_app_alive(group_id)131# Witness to ENG-6824. We want to construct a message that just barely132# tips over to 2 pages133def test_xpu_comm(app):134 app_name = "test_xpu_comm"135 app_src = """136import pickle137from xcalar.container.cluster import get_running_cluster138import xcalar.container.parent as xce139from xcalar.compute.services.Stats_xcrpc import Stats140import xcalar.compute.localtypes.Stats_pb2141def main(inBlob):142 xceClient = xce.Client()143 xdbPageSize = xceClient.get_xdb_page_size()144 cluster = get_running_cluster()...
base.py
Source:base.py
...54 if self.exists(text=retry_text):55 self.logger.info('被å°ç¦ï¼ç¨ååéè¯')56 # time.sleep(60 * 2)57 sys.exit()58 if self.is_app_alive():59 # print(self.get_source())60 self.close_popup()61 if self.is_need_break():62 break63 action(*args, **kwargs)64 if activity and self.get_current_activity() != activity:65 self.logger.info(66 f'{self.device_serial}: {action_name} è·³åºäºé¡µé¢; current_activity: {self.get_current_activity()}'67 )68 break69 # time.sleep(random.randint(1, 5)/10)70 self.logger.info(f'{action_name} action done!')71 else:72 self.close_app()73 self.click_if_exist(text="确认")74 time.sleep(2)75 self.app_start()76 # TODO ç¶æè®°å¿åæ¢å¤77 self.logger.info(f'{self.device_serial}: {action_name} ç¶æå¼å¸¸å¯¼è´åºç¨éå¯')78 def swipe(self, *args):79 self.device.swipe(*args)80 def swipe_right(self, t=0.1, delay=1):81 self.device.swipe(0.1, 0.5, 0.9, 0.5, t)82 time.sleep(delay)83 def swipe_left(self, t=0.1, delay=1):84 self.device.swipe(0.9, 0.5, 0.1, 0.5, t)85 time.sleep(delay)86 def swipe_up(self, t=0.02, delay=0.5):87 self.device.swipe(self.width * 0.5, self.height * 0.8, self.width * 0.5, self.height * 0.3, t)88 time.sleep(delay)89 def swipe_down(self, t=0.02, delay=0.5):90 self.device.swipe(self.width * 0.5, self.height * 0.1, self.width * 0.5, self.height * 0.8, t)91 time.sleep(delay)92 def scroll(self):93 self.device(scrollable=True).scroll(steps=10)94 def fling(self):95 self.device(scrollable=True).fling()96 # self.device.wait_activity()97 def exists(self, **kwargs):98 return self.device(**kwargs).exists99 def click_if_exist(self, **kwargs):100 d = self.device(**kwargs)101 if d.exists:102 try:103 d.click(timeout=2)104 except Exception as e:105 self.logger.info(e)106 def close_popup(self):107 for text in self.popup_list:108 try:109 self.click_if_exist(text=text)110 except Exception:111 self.logger.info(f'close_popup: {text}')112 def is_need_break(self):113 for text in self.break_list:114 try:115 if self.exists(text=text):116 print(text)117 return True118 except Exception:119 self.logger.info(f'need_break: {text}')120 return False121 def is_app_alive(self):122 if self.get_current_package() != self.pkg_name:123 self.logger.info('åºç¨å·²ç»éåº')124 return False125 else:126 return True127 def close_app(self):128 if self.session:129 self.session.close()130 else:...
kafka_app_runner.py
Source:kafka_app_runner.py
...9 key = get_key(topic)10 if len(kvstore.list(key)) == 0:11 return None12 return kvstore.lookup(key)13def is_app_alive(client, topic):14 group_id = get_group_id(client.global_kvstore(), topic)15 if group_id is None:16 return False17 app = App(client)18 return app.is_app_alive(group_id)19def get_pidfile_path(topic):20 return f'{tempfile.gettempdir()}/kafka_{topic}_listener_app.pid'21def runKafkaApp(appName, args, client, async_mode=False):22 print(f'Running {appName} with {args}, async_mode: {async_mode}')23 print(f"Topic: {args['configuration']['kafka_config']['topic']}")24 args['async_mode'] = async_mode25 topic = args['configuration']['kafka_config']['topic']26 app = App(client)27 app_code = os.path.dirname(Path(__file__)) + '/kafka_app.py'28 code = ''29 with open(app_code) as fp:30 code = '\n'.join(fp.readlines())31 app.set_py_app(appName, code)32 if async_mode:33 key = args["alias"] if "alias" in args else topic34 if is_app_alive(client, key):35 raise Exception('App is already running.')36 group_id = app.run_py_app_async(appName, False, json.dumps(args))37 kvstore = client.global_kvstore()38 kvstore.add_or_replace(get_key(key), group_id, True)39 return group_id40 else:41 (outStr, errStr) = app.run_py_app(appName, False, json.dumps(args))42 return (outStr, errStr)43def runKafkaAppFromShell(appName,44 configuration,45 session_name,46 xcalar_username,47 xcalar_client,48 num_runs,49 profile=False,50 async_mode=False,51 purge_interval_in_seconds=None, #30052 purge_cycle_in_seconds=None, #3053 alias=None):54 # configuration = {'session_name': session_name}55 # with open(kafka_properties_path) as cp:56 # configuration['kafka_config'] = json.load(cp)57 args = {58 'configuration': configuration,59 'profile': profile,60 'num_runs': num_runs,61 'username': xcalar_username,62 'async_mode': async_mode,63 'purge_interval_in_seconds': purge_interval_in_seconds if purge_interval_in_seconds else configuration['kafka_config'].get('purge_interval_in_seconds',300),64 'purge_cycle_in_seconds': purge_cycle_in_seconds if purge_cycle_in_seconds else configuration['kafka_config'].get('purge_cycle_in_seconds',300)65 }66 if alias is not None:67 args['alias'] = alias68 return runKafkaApp(appName, args, xcalar_client, async_mode=async_mode)69def stopKafkaApp(client, topic):70 if is_app_alive(client, topic):71 app = App(client)72 app.cancel(get_group_id(client.global_kvstore(), topic))73 code = '''74import os75import json76def main(inBlob):77 args = json.loads(inBlob)78 os.remove(args["pidfile_path"])79'''80 app.set_py_app('kafka_app_cleanup', code)81 (outStr, errStr) = app.run_py_app(82 'kafka_app_cleanup', False,83 json.dumps({84 'pidfile_path': get_pidfile_path(topic)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!