How to use is_done method in lisa

Best Python code snippet using lisa_python

test_parallel_build.py

Source:test_parallel_build.py Github

copy

Full Screen

...86 self.parallel_tracker.load_fake_deps(deps, deps1)87 # full queue88 def test_full_build(self):89 bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)90 self.assertFalse(bq.is_done())91 self.assertFalse(bq.succeeded())92 self.assertEqual('f', bq.get_valid_package())93 self.assertEqual(0, len(bq.built))94 bq.return_built('f')95 self.assertEqual(1, len(bq.built))96 self.assertFalse(bq.is_done())97 self.assertFalse(bq.succeeded())98 self.assertEqual('e', bq.get_valid_package())99 bq.return_built('e')100 self.assertEqual(2, len(bq.built))101 self.assertFalse(bq.is_done())102 self.assertFalse(bq.succeeded())103 self.assertEqual('d', bq.get_valid_package())104 bq.return_built('d')105 self.assertEqual(3, len(bq.built))106 self.assertFalse(bq.is_done())107 self.assertFalse(bq.succeeded())108 self.assertEqual('c', bq.get_valid_package())109 bq.return_built('c')110 self.assertEqual(4, len(bq.built))111 self.assertFalse(bq.is_done())112 self.assertFalse(bq.succeeded())113 self.assertEqual('b', bq.get_valid_package())114 bq.return_built('b')115 self.assertEqual(5, len(bq.built))116 self.assertFalse(bq.is_done())117 self.assertFalse(bq.succeeded())118 self.assertEqual('a', bq.get_valid_package())119 self.assertFalse(bq.is_done())120 self.assertFalse(bq.succeeded())121 bq.return_built('a')122 self.assertEqual(6, len(bq.built))123 self.assertTrue(bq.is_done())124 self.assertTrue(bq.succeeded())125 # partial build126 def test_partial_build(self):127 bq = parallel_build.BuildQueue(['d', 'e', 'f'], self.serial_tracker)128 self.assertFalse(bq.is_done())129 self.assertFalse(bq.succeeded())130 self.assertEqual('f', bq.get_valid_package())131 self.assertEqual(0, len(bq.built))132 bq.return_built('f')133 self.assertEqual(1, len(bq.built))134 self.assertFalse(bq.is_done())135 self.assertFalse(bq.succeeded())136 self.assertEqual('e', bq.get_valid_package())137 bq.return_built('e')138 self.assertEqual(2, len(bq.built))139 self.assertFalse(bq.is_done())140 self.assertFalse(bq.succeeded())141 self.assertEqual('d', bq.get_valid_package())142 self.assertFalse(bq.is_done())143 self.assertFalse(bq.succeeded())144 bq.return_built('d')145 self.assertEqual(3, len(bq.built))146 self.assertTrue(bq.is_done())147 self.assertTrue(bq.succeeded())148 # abort early149 def test_abort_early(self):150 bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)151 self.assertFalse(bq.is_done())152 self.assertFalse(bq.succeeded())153 self.assertEqual(0, len(bq.built))154 self.assertEqual('f', bq.get_valid_package())155 bq.return_built('f')156 self.assertEqual(1, len(bq.built))157 self.assertFalse(bq.is_done())158 self.assertFalse(bq.succeeded())159 self.assertEqual('e', bq.get_valid_package())160 bq.return_built('e')161 self.assertEqual(2, len(bq.built))162 self.assertFalse(bq.is_done())163 self.assertFalse(bq.succeeded())164 self.assertEqual('d', bq.get_valid_package())165 bq.return_built('d')166 self.assertEqual(3, len(bq.built))167 self.assertFalse(bq.is_done())168 self.assertFalse(bq.succeeded())169 bq.stop()170 self.assertTrue(bq.is_done())171 self.assertFalse(bq.succeeded())172 self.assertEqual(None, bq.get_valid_package())173 # many parallel174 def test_parallel_build(self):175 bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.parallel_tracker)176 self.assertFalse(bq.is_done())177 self.assertFalse(bq.succeeded())178 dependents = ['b', 'c', 'd', 'e', 'f']179 count = 0180 total = 6181 while len(dependents) > 0:182 result = bq.get_valid_package()183 done = len(bq.built)184 pkgs = bq._total_pkgs185 self.assertTrue(result in dependents)186 # print result, done, pkgs187 dependents.remove(result)188 self.assertEqual(count, done)189 self.assertEqual(total, pkgs)190 self.assertFalse(bq.is_done())191 self.assertFalse(bq.succeeded())192 bq.return_built(result)193 count = count + 1194 self.assertFalse(bq.is_done())195 self.assertFalse(bq.succeeded())196 self.assertEqual('a', bq.get_valid_package())197 self.assertFalse(bq.is_done())198 self.assertFalse(bq.succeeded())199 bq.return_built('a')200 self.assertTrue(bq.is_done())201 self.assertTrue(bq.succeeded())...

Full Screen

Full Screen

binary_tree.py

Source:binary_tree.py Github

copy

Full Screen

1class Stack(object):2 def __init__(self):3 self.items = []4 def __len__(self):5 return self.size()6 7 def size(self):8 return len(self.items)9 def push(self, item):10 self.items.append(item)11 def pop(self): 12 if not self.is_empty():13 return self.items.pop()14 def peek(self):15 if not self.is_empty():16 return self.items[-1]17 def is_empty(self):18 return len(self.items) == 019 def __str__(self):20 s = ""21 for i in range(len(self.items)):22 s += str(self.items[i].value) + "-"23 return s24class Queue(object):25 def __init__(self):26 self.items = []27 def __len__(self):28 return self.size()29 def enqueue(self, item):30 self.items.insert(0, item)31 def dequeue(self):32 if not self.is_empty():33 return self.items.pop()34 def size(self):35 return len(self.items)36 def is_empty(self):37 return len(self.items) == 038 def peek(self):39 if not self.is_empty():40 return self.items[-1].value41class Node(object):42 def __init__(self, value):43 self.value = value44 self.left = None45 self.right = None46class BinaryTree(object):47 def __init__(self, root):48 self.root = Node(root)49 def search(self, find_val, traversal_type):50 if traversal_type == "preorder":51 return self.preorder_search(tree.root, find_val)52 elif traversal_type == "inorder":53 return self.inorder_search(tree.root, find_val)54 elif traversal_type == "postorder":55 return self.postorder_search(tree.root, find_val)56 else:57 print("Traversal type " + str(traversal_type) + " not recognized.")58 return False59 def print_tree(self, traversal_type):60 # Recursive traversals61 if traversal_type == "preorder":62 return self.preorder_print(tree.root, "")63 elif traversal_type == "inorder":64 return self.inorder_print(tree.root, "")65 elif traversal_type == "postorder":66 return self.postorder_print(tree.root, "")67 # Iterative traversals68 elif traversal_type == "levelorder":69 return self.levelorder_print(tree.root)70 elif traversal_type == "inorder_iterative":71 return self.inorder_iterative(tree.root)72 elif traversal_type == "preorder_iterative":73 return self.preorder_iterative(tree.root)74 elif traversal_type == "postorder_iterative":75 return self.postorder_iterative(tree.root)76 else:77 print("Traversal type " + str(traversal_type) + " not recognized.")78 return False79 def levelorder_print(self, start):80 if start is None:81 return82 queue = Queue()83 queue.enqueue(start)84 traversal = ""85 while len(queue) > 0:86 traversal += str(queue.peek()) + "-"87 node = queue.dequeue()88 if node.left:89 queue.enqueue(node.left)90 if node.right:91 queue.enqueue(node.right)92 return traversal93 def preorder_search(self, start, find_val):94 if start:95 if start.value == find_val:96 return True97 else:98 return self.preorder_search(start.left, find_val) or \99 self.preorder_search(start.right, find_val)100 return False101 def preorder_print(self, start, traversal):102 """Root->Left-Right"""103 if start:104 traversal += (str(start.value) + "-")105 traversal = self.preorder_print(start.left, traversal)106 traversal = self.preorder_print(start.right, traversal)107 return traversal108 def inorder_print(self, start, traversal):109 """Left->Root->Right"""110 if start:111 traversal = self.inorder_print(start.left, traversal)112 traversal += (str(start.value) + "-")113 traversal = self.inorder_print(start.right, traversal)114 return traversal115 def postorder_print(self, start, traversal):116 """Left->Right->Root"""117 if start:118 traversal = self.postorder_print(start.left, traversal)119 traversal = self.postorder_print(start.right, traversal)120 traversal += (str(start.value) + "-")121 return traversal122 def preorder_iterative(self, start):123 stack = Stack()124 cur = start125 is_done = False126 traversal = ""127 while not is_done:128 if cur is not None:129 traversal += str(cur.value) + "-"130 stack.push(cur)131 cur = cur.left132 else:133 if len(stack) > 0:134 cur = stack.pop()135 cur = cur.right136 else:137 is_done = True138 return traversal139 def inorder_iterative(self, start):140 s = Stack()141 cur = start142 is_done = False143 traversal = ""144 while not is_done:145 if cur is not None:146 s.push(cur)147 cur = cur.left148 else:149 if len(s) > 0:150 cur = s.pop()151 traversal += str(cur.value) + "-"152 cur = cur.right153 else:154 is_done = True155 return traversal156 def postorder_iterative(self, start):157 s = Stack()158 cur = start159 is_done = False160 traversal = ""161 while not is_done:162 163 if cur is not None:164 s.push(cur)165 cur = cur.left166 else:167 if len(s) > 0:168 cur = s.pop()169 traversal += str(cur.value) + "-"170 cur = cur.right171 else:172 is_done = True173 return traversal174 def height(self, node):175 if node is None:176 return 0177 left = self.height(node.left)178 right = self.height(node.right)179 if left > right:180 h = 1 + left181 else:182 h = 1 + right183 return h184 def lowest_common_ancestor(self, node1, node2):185 stack_path_1 = Stack()186 stack_path_2 = Stack()187 cur = self.root188 is_done = False189 while not is_done:190 if cur is not None:191 if cur.value == node1.value:192 break193 stack_path_1.push(cur)194 cur = cur.left195 else:196 if len(stack_path_1) > 0:197 cur = stack_path_1.pop()198 cur = cur.right199 else:200 is_done = True201 202 cur = self.root203 is_done = False204 while not is_done:205 if cur is not None:206 if cur.value == node2.value:207 break208 stack_path_2.push(cur)209 cur = cur.left210 else:211 if len(stack_path_2) > 0:212 cur = stack_path_2.pop()213 cur = cur.right214 else:215 is_done = True216 print(stack_path_1)217 print(stack_path_2)218 #return traversal219# Set up tree:220tree = BinaryTree(1)221tree.root.left = Node(2)222tree.root.right = Node(3)223tree.root.left.left = Node(4)224tree.root.left.right = Node(5)225tree.root.right.left = Node(6)226tree.root.right.right = Node(7)227tree.root.right.right.right = Node(8)228#tree.root.left.right.left = Node(7)229#tree.root.left.right.right = Node(8)230#tree.root.right.right = Node(6)231#tree.root.right.right.left = Node(9)232#print(tree.height(tree.root))233# Should be True234#print(tree.search(4))235# Should be False:236#print(tree.search(6))237# Test print_tree238print("Preorder Recursive:")239print(tree.print_tree("preorder"))240print("Preorder Iterative:")241print(tree.print_tree("preorder_iterative"))242print("Postorder Recursive")243print(tree.print_tree("postorder"))244#print("Postorder Iterative:")245#print(tree.print_tree("postorder_iterative"))246print("Inorder Recursive:")247print(tree.print_tree("inorder"))248print("Inorder Iterative:")249print(tree.print_tree("inorder_iterative"))250print("Levelorder Iterative:")251print(tree.print_tree("levelorder"))252print("\n")253x = tree.root.left.left254y = tree.root.left.right...

Full Screen

Full Screen

fabfile.py

Source:fabfile.py Github

copy

Full Screen

1from fabric.api import *2from os import *3env.use_ssh_config = True4env.output_prefix = True5scriptFileName = None6repoList = {}7def get_immediate_subdirectories(a_dir='~/repos/'):8 result = run("find {} -maxdepth 1 -type d".format(a_dir))9 result = result.splitlines()10 del result[0]11 return result12@serial13def reboot():14 with hide('running', 'stdout', 'stderr'):15 is_done = True16 try:17 sudo("shutdown -r 1", shell=False)18 except Exception as e:19 print("{} - {}".format(env.host_string, e))20 is_done = False21 finally:22 return is_done23def apt_update():24 with hide('running', 'stdout', 'stderr'):25 is_done = True26 try:27 pathList = run('ls -a ~')28 if ".noApt" in pathList.stdout:29 print("{} has .noApt - skipping".format(env.host_string))30 return is_done31 print("Updating {} with apt...".format(env.host_string))32 sudo("apt update", shell=False)33 sudo("DEBIAN_FRONTEND=noninteractive apt upgrade -yq", shell=False,34 timeout=120)35 sudo("apt autoremove -yq", shell=False, timeout=120)36 except Exception as e:37 print("{} - {}".format(env.host_string, e))38 is_done = False39 finally:40 return is_done41def yum_update():42 with hide('running', 'stdout', 'stderr'):43 is_done = True44 try:45 print("Updating {} with yum...".format(env.host_string))46 sudo("yum -y update", shell=False)47 except Exception as e:48 print("{} - {}".format(env.host_string, e))49 is_done = False50 finally:51 return is_done52def opkg_update():53 with show('running', 'stdout', 'stderr'):54 is_done = True55 try:56 print("Updating {} with opkg...".format(env.host_string))57 run("opkg update", shell=False)58 packages = run("opkg list-upgradable | awk '{print $1}' | "59 "sed ':M;N;$!bM;s#\\n# #g'", shell=False)60 if packages:61 run("opkg upgrade %s" % packages, shell=False)62 else:63 print("No updated packages on {}".format(env.host_string))64 except Exception as e:65 print("{} - {}".format(env.host_string, e))66 is_done = False67 finally:68 return is_done69def repo_update(repoName="Home Bin", repoDirectory="~/bin/"):70 with hide('running', 'stdout', 'stderr'):71 is_done = True72 try:73 with cd(repoDirectory):74 pathList = run('ls -a')75 if ".noPull" in pathList.stdout:76 print("{} repo {} has .noPull - skipping".format(77 env.host_string, repoName))78 return is_done79 r = run('git pull')80 if 'Already up-to-date' in r.stdout:81 print("{} repo {} already up-to-date".format(82 env.host_string, repoName))83 else:84 print("{} repo {} updated".format(env.host_string,85 repoName))86 if '.postpull.sh' in pathList.stdout:87 print("{} repo {} - updating permissions".format(88 env.host_string, repoName))89 run('./.postpull.sh')90 except Fatal as e:91 print("{} - {}".format(env.host_string, e))92 is_done = False93 finally:94 return is_done95def setup():96 with hide('running', 'stdout', 'stderr'):97 is_done = True98 apt_sudoers = env.user + " * = (root) NOPASSWD:SETENV: /usr/bin/apt,"\99 "/usr/local/bin/apt"100 power_sudoers = env.user + " * = (root) NOPASSWD: /sbin/shutdown"101 try:102 sudo("echo \"{}\" >> /etc/sudoers.d/fabric".format(apt_sudoers))103 sudo("echo \"{}\" >> /etc/sudoers.d/fabric".format(power_sudoers))104 pathList = run('ls -a ~')105 if "repos" not in pathList.stdout:106 print("No repos dir on {} - creating".format(env.host_string))107 run("mkdir ~/repos/")108 if "bin" not in pathList.stdout:109 print("No bin dir on {} - cloning".format(env.host_string))110 run("git clone https://github.com/mbeland/bin.git")111 except Exception as e:112 print("{} - {}".format(env.host_string, e))113 is_done = False114 finally:115 return is_done116@parallel117def updates():118 with hide('running', 'stdout', 'stderr'):119 is_done = True120 try:121 apt_update()122 updateRepos()123 except Exception as e:124 print("{} - {}".format(env.host_string, e))125 is_done = False126 finally:127 return is_done128def updateRepos():129 with hide('running', 'stdout', 'stderr'):130 is_done = True131 reposDir = '~/repos/'132 try:133 repo_update("Home Bin", "~/bin/")134 repo_update("SSH", "~/.ssh")135 repo_update("Home Dir", "~/")136 dirsToUpdate = get_immediate_subdirectories(reposDir)137 for x in dirsToUpdate:138 y = x.split("/")139 repo_update(y[-1], x)140 except Exception as e:141 print("{} - {}".format(env.host_string, e))142 is_done = False143 finally:144 return is_done145def deploy_script():146 with hide('running', 'stdout', 'stderr'):147 is_done = True148 global scriptFileName149 try:150 if scriptFileName:151 scriptFileName = input('Script to execute: ')152 put(scriptFileName, "/tmp/", mirror_local_mode=True)153 remotePath = "/tmp/" + scriptFileName154 run(remotePath)155 run("rm " + remotePath)156 except Exception as e:157 print("{} - {}".format(env.host_string, e))158 is_done = False159 finally:...

Full Screen

Full Screen

train_hand.py

Source:train_hand.py Github

copy

Full Screen

1#coding=utf-82import rospy3from std_msgs.msg import Int164import time5from geometry_msgs.msg import Twist #导入需要的ros的package 6import numpy as np7is_done=Int16()8is_done.data=09#定义发送数据的函数:只发一次(可以考虑发很多次,最终要看效果如何)10def data_publish(u):11 #初始化node12 13 #初始化publisher14 pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) # topic和消息类型需要和肖岸星商量确定一下 15 16 #初始化twist17 twist = Twist()18 # 给twist赋值19 twist.linear.x = u[0] #线速度20 twist.linear.y = 0.021 twist.linear.z = 0.022 twist.angular.x = 0.0 #角速度23 twist.angular.y = 0.024 twist.angular.z = u[1]25 #调试代码26 # print("twist")27 # print(twist)28 #发送twist29 pub.publish(twist) #将数据发送出去30#发送twist:一直发指令,直到一次运动结束之后就不发指令31def callback_2(data): 32 #接受数据33 is_done.data= data.data34 #print(finish)35 #判断状态36 # if (finish.data == 1):#完成转动37 # is_done = 138 # # print("callback_2",is_done)39 # else:#没完成转动40 # is_done = 0 41 # print("moving")42if __name__ == '__main__':43 #初始化44 rospy.init_node('train_hand', anonymous=True)45 rospy.Subscriber("is_done", Int16, callback_2)46 start_flag = 1 #开始指令47 twist = Twist() #变量声明48 u = np.zeros(2) #控制量声明49 50 time.sleep(1)51 print(is_done.data)52 #控制说明53 print("前进:w 左拐:a 右拐:d 后退:s")54 time.sleep(0.1)55 #开始控制56 while start_flag == 1: #一直循环接受数据57 control_key = raw_input("raw_input:") #获得键盘输入58 if control_key == "w": #前进59 #显示控制量60 print("control is w")61 #设置频率62 63 #没收到结束信息一直发送控制量64 if(is_done.data==1):65 #赋值线速度66 u[0] = 167 u[1] = 068 69 #发布线速度消息70 data_publish(u)71 72 #以10HZ频率发送数据73 74 75 elif control_key == "a": #左转76 #显示控制量77 print("control is a")78 #设置频率79 rate = rospy.Rate(10) # 10hz80 #没收到结束信息一直发送控制量81 if(is_done.data==1):82 #赋值线速度83 u[0] = 084 u[1] = -185 86 #发布线速度消息87 data_publish(u)88 89 #如果接收到结束信息,is_done设为1,结束循环90 91 #以10HZ频率发送数据92 rate.sleep()93 elif control_key == "d": #右转94 #显示控制量95 print("control is d")96 #设置频率97 rate = rospy.Rate(10) # 10hz98 if(is_done.data==1):99 #赋值线速度100 u[0] = 0101 u[1] = 1102 103 #发布线速度消息104 data_publish(u)105 106 #如果接收到结束信息,is_done设为1,结束循环107 108 #以10HZ频率发送数据109 rate.sleep()110 elif control_key == "s": #后退111 #显示控制量112 print("control is s")113 #设置频率114 rate = rospy.Rate(10) # 10hz115 if(is_done.data==1):116 #赋值线速度117 u[0] = -1118 u[1] = 0119 120 #发布线速度消息121 data_publish(u)122 123 #如果接收到结束信息,is_done设为1,结束循环124 125 #以10HZ频率发送数据126 rate.sleep()127 128<<<<<<< HEAD129 elif control_key == "q": #六足步态130 #显示控制量131 print("control is s")132 #设置频率133 rate = rospy.Rate(10) # 10hz134 if(is_done.data==1):135 #赋值线速度136 u[0] = 2137 u[1] = 0138 139 #发布线速度消息140 data_publish(u)141 142 #如果接收到结束信息,is_done设为1,结束循环143 144 #以10HZ频率发送数据145 rate.sleep()146=======147>>>>>>> 6ca41d4ff84544147f430048270163452df0609e148 #将is_down复位149 print("is_done已重置")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful