Best Python code snippet using lisa_python
test_parallel_build.py
Source:test_parallel_build.py
...86 self.parallel_tracker.load_fake_deps(deps, deps1)87 # full queue88 def test_full_build(self):89 bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)90 self.assertFalse(bq.is_done())91 self.assertFalse(bq.succeeded())92 self.assertEqual('f', bq.get_valid_package())93 self.assertEqual(0, len(bq.built))94 bq.return_built('f')95 self.assertEqual(1, len(bq.built))96 self.assertFalse(bq.is_done())97 self.assertFalse(bq.succeeded())98 self.assertEqual('e', bq.get_valid_package())99 bq.return_built('e')100 self.assertEqual(2, len(bq.built))101 self.assertFalse(bq.is_done())102 self.assertFalse(bq.succeeded())103 self.assertEqual('d', bq.get_valid_package())104 bq.return_built('d')105 self.assertEqual(3, len(bq.built))106 self.assertFalse(bq.is_done())107 self.assertFalse(bq.succeeded())108 self.assertEqual('c', bq.get_valid_package())109 bq.return_built('c')110 self.assertEqual(4, len(bq.built))111 self.assertFalse(bq.is_done())112 self.assertFalse(bq.succeeded())113 self.assertEqual('b', bq.get_valid_package())114 bq.return_built('b')115 self.assertEqual(5, len(bq.built))116 self.assertFalse(bq.is_done())117 self.assertFalse(bq.succeeded())118 self.assertEqual('a', bq.get_valid_package())119 self.assertFalse(bq.is_done())120 self.assertFalse(bq.succeeded())121 bq.return_built('a')122 self.assertEqual(6, len(bq.built))123 self.assertTrue(bq.is_done())124 self.assertTrue(bq.succeeded())125 # partial build126 def test_partial_build(self):127 bq = parallel_build.BuildQueue(['d', 'e', 'f'], self.serial_tracker)128 self.assertFalse(bq.is_done())129 self.assertFalse(bq.succeeded())130 self.assertEqual('f', bq.get_valid_package())131 self.assertEqual(0, len(bq.built))132 bq.return_built('f')133 self.assertEqual(1, len(bq.built))134 self.assertFalse(bq.is_done())135 self.assertFalse(bq.succeeded())136 self.assertEqual('e', bq.get_valid_package())137 bq.return_built('e')138 self.assertEqual(2, len(bq.built))139 self.assertFalse(bq.is_done())140 self.assertFalse(bq.succeeded())141 self.assertEqual('d', bq.get_valid_package())142 self.assertFalse(bq.is_done())143 self.assertFalse(bq.succeeded())144 bq.return_built('d')145 self.assertEqual(3, len(bq.built))146 self.assertTrue(bq.is_done())147 self.assertTrue(bq.succeeded())148 # abort early149 def test_abort_early(self):150 bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)151 self.assertFalse(bq.is_done())152 self.assertFalse(bq.succeeded())153 self.assertEqual(0, len(bq.built))154 self.assertEqual('f', bq.get_valid_package())155 bq.return_built('f')156 self.assertEqual(1, len(bq.built))157 self.assertFalse(bq.is_done())158 self.assertFalse(bq.succeeded())159 self.assertEqual('e', bq.get_valid_package())160 bq.return_built('e')161 self.assertEqual(2, len(bq.built))162 self.assertFalse(bq.is_done())163 self.assertFalse(bq.succeeded())164 self.assertEqual('d', bq.get_valid_package())165 bq.return_built('d')166 self.assertEqual(3, len(bq.built))167 self.assertFalse(bq.is_done())168 self.assertFalse(bq.succeeded())169 bq.stop()170 self.assertTrue(bq.is_done())171 self.assertFalse(bq.succeeded())172 self.assertEqual(None, bq.get_valid_package())173 # many parallel174 def test_parallel_build(self):175 bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.parallel_tracker)176 self.assertFalse(bq.is_done())177 self.assertFalse(bq.succeeded())178 dependents = ['b', 'c', 'd', 'e', 'f']179 count = 0180 total = 6181 while len(dependents) > 0:182 result = bq.get_valid_package()183 done = len(bq.built)184 pkgs = bq._total_pkgs185 self.assertTrue(result in dependents)186 # print result, done, pkgs187 dependents.remove(result)188 self.assertEqual(count, done)189 self.assertEqual(total, pkgs)190 self.assertFalse(bq.is_done())191 self.assertFalse(bq.succeeded())192 bq.return_built(result)193 count = count + 1194 self.assertFalse(bq.is_done())195 self.assertFalse(bq.succeeded())196 self.assertEqual('a', bq.get_valid_package())197 self.assertFalse(bq.is_done())198 self.assertFalse(bq.succeeded())199 bq.return_built('a')200 self.assertTrue(bq.is_done())201 self.assertTrue(bq.succeeded())...
binary_tree.py
Source:binary_tree.py
1class Stack(object):2 def __init__(self):3 self.items = []4 def __len__(self):5 return self.size()6 7 def size(self):8 return len(self.items)9 def push(self, item):10 self.items.append(item)11 def pop(self): 12 if not self.is_empty():13 return self.items.pop()14 def peek(self):15 if not self.is_empty():16 return self.items[-1]17 def is_empty(self):18 return len(self.items) == 019 def __str__(self):20 s = ""21 for i in range(len(self.items)):22 s += str(self.items[i].value) + "-"23 return s24class Queue(object):25 def __init__(self):26 self.items = []27 def __len__(self):28 return self.size()29 def enqueue(self, item):30 self.items.insert(0, item)31 def dequeue(self):32 if not self.is_empty():33 return self.items.pop()34 def size(self):35 return len(self.items)36 def is_empty(self):37 return len(self.items) == 038 def peek(self):39 if not self.is_empty():40 return self.items[-1].value41class Node(object):42 def __init__(self, value):43 self.value = value44 self.left = None45 self.right = None46class BinaryTree(object):47 def __init__(self, root):48 self.root = Node(root)49 def search(self, find_val, traversal_type):50 if traversal_type == "preorder":51 return self.preorder_search(tree.root, find_val)52 elif traversal_type == "inorder":53 return self.inorder_search(tree.root, find_val)54 elif traversal_type == "postorder":55 return self.postorder_search(tree.root, find_val)56 else:57 print("Traversal type " + str(traversal_type) + " not recognized.")58 return False59 def print_tree(self, traversal_type):60 # Recursive traversals61 if traversal_type == "preorder":62 return self.preorder_print(tree.root, "")63 elif traversal_type == "inorder":64 return self.inorder_print(tree.root, "")65 elif traversal_type == "postorder":66 return self.postorder_print(tree.root, "")67 # Iterative traversals68 elif traversal_type == "levelorder":69 return self.levelorder_print(tree.root)70 elif traversal_type == "inorder_iterative":71 return self.inorder_iterative(tree.root)72 elif traversal_type == "preorder_iterative":73 return self.preorder_iterative(tree.root)74 elif traversal_type == "postorder_iterative":75 return self.postorder_iterative(tree.root)76 else:77 print("Traversal type " + str(traversal_type) + " not recognized.")78 return False79 def levelorder_print(self, start):80 if start is None:81 return82 queue = Queue()83 queue.enqueue(start)84 traversal = ""85 while len(queue) > 0:86 traversal += str(queue.peek()) + "-"87 node = queue.dequeue()88 if node.left:89 queue.enqueue(node.left)90 if node.right:91 queue.enqueue(node.right)92 return traversal93 def preorder_search(self, start, find_val):94 if start:95 if start.value == find_val:96 return True97 else:98 return self.preorder_search(start.left, find_val) or \99 self.preorder_search(start.right, find_val)100 return False101 def preorder_print(self, start, traversal):102 """Root->Left-Right"""103 if start:104 traversal += (str(start.value) + "-")105 traversal = self.preorder_print(start.left, traversal)106 traversal = self.preorder_print(start.right, traversal)107 return traversal108 def inorder_print(self, start, traversal):109 """Left->Root->Right"""110 if start:111 traversal = self.inorder_print(start.left, traversal)112 traversal += (str(start.value) + "-")113 traversal = self.inorder_print(start.right, traversal)114 return traversal115 def postorder_print(self, start, traversal):116 """Left->Right->Root"""117 if start:118 traversal = self.postorder_print(start.left, traversal)119 traversal = self.postorder_print(start.right, traversal)120 traversal += (str(start.value) + "-")121 return traversal122 def preorder_iterative(self, start):123 stack = Stack()124 cur = start125 is_done = False126 traversal = ""127 while not is_done:128 if cur is not None:129 traversal += str(cur.value) + "-"130 stack.push(cur)131 cur = cur.left132 else:133 if len(stack) > 0:134 cur = stack.pop()135 cur = cur.right136 else:137 is_done = True138 return traversal139 def inorder_iterative(self, start):140 s = Stack()141 cur = start142 is_done = False143 traversal = ""144 while not is_done:145 if cur is not None:146 s.push(cur)147 cur = cur.left148 else:149 if len(s) > 0:150 cur = s.pop()151 traversal += str(cur.value) + "-"152 cur = cur.right153 else:154 is_done = True155 return traversal156 def postorder_iterative(self, start):157 s = Stack()158 cur = start159 is_done = False160 traversal = ""161 while not is_done:162 163 if cur is not None:164 s.push(cur)165 cur = cur.left166 else:167 if len(s) > 0:168 cur = s.pop()169 traversal += str(cur.value) + "-"170 cur = cur.right171 else:172 is_done = True173 return traversal174 def height(self, node):175 if node is None:176 return 0177 left = self.height(node.left)178 right = self.height(node.right)179 if left > right:180 h = 1 + left181 else:182 h = 1 + right183 return h184 def lowest_common_ancestor(self, node1, node2):185 stack_path_1 = Stack()186 stack_path_2 = Stack()187 cur = self.root188 is_done = False189 while not is_done:190 if cur is not None:191 if cur.value == node1.value:192 break193 stack_path_1.push(cur)194 cur = cur.left195 else:196 if len(stack_path_1) > 0:197 cur = stack_path_1.pop()198 cur = cur.right199 else:200 is_done = True201 202 cur = self.root203 is_done = False204 while not is_done:205 if cur is not None:206 if cur.value == node2.value:207 break208 stack_path_2.push(cur)209 cur = cur.left210 else:211 if len(stack_path_2) > 0:212 cur = stack_path_2.pop()213 cur = cur.right214 else:215 is_done = True216 print(stack_path_1)217 print(stack_path_2)218 #return traversal219# Set up tree:220tree = BinaryTree(1)221tree.root.left = Node(2)222tree.root.right = Node(3)223tree.root.left.left = Node(4)224tree.root.left.right = Node(5)225tree.root.right.left = Node(6)226tree.root.right.right = Node(7)227tree.root.right.right.right = Node(8)228#tree.root.left.right.left = Node(7)229#tree.root.left.right.right = Node(8)230#tree.root.right.right = Node(6)231#tree.root.right.right.left = Node(9)232#print(tree.height(tree.root))233# Should be True234#print(tree.search(4))235# Should be False:236#print(tree.search(6))237# Test print_tree238print("Preorder Recursive:")239print(tree.print_tree("preorder"))240print("Preorder Iterative:")241print(tree.print_tree("preorder_iterative"))242print("Postorder Recursive")243print(tree.print_tree("postorder"))244#print("Postorder Iterative:")245#print(tree.print_tree("postorder_iterative"))246print("Inorder Recursive:")247print(tree.print_tree("inorder"))248print("Inorder Iterative:")249print(tree.print_tree("inorder_iterative"))250print("Levelorder Iterative:")251print(tree.print_tree("levelorder"))252print("\n")253x = tree.root.left.left254y = tree.root.left.right...
fabfile.py
Source:fabfile.py
1from fabric.api import *2from os import *3env.use_ssh_config = True4env.output_prefix = True5scriptFileName = None6repoList = {}7def get_immediate_subdirectories(a_dir='~/repos/'):8 result = run("find {} -maxdepth 1 -type d".format(a_dir))9 result = result.splitlines()10 del result[0]11 return result12@serial13def reboot():14 with hide('running', 'stdout', 'stderr'):15 is_done = True16 try:17 sudo("shutdown -r 1", shell=False)18 except Exception as e:19 print("{} - {}".format(env.host_string, e))20 is_done = False21 finally:22 return is_done23def apt_update():24 with hide('running', 'stdout', 'stderr'):25 is_done = True26 try:27 pathList = run('ls -a ~')28 if ".noApt" in pathList.stdout:29 print("{} has .noApt - skipping".format(env.host_string))30 return is_done31 print("Updating {} with apt...".format(env.host_string))32 sudo("apt update", shell=False)33 sudo("DEBIAN_FRONTEND=noninteractive apt upgrade -yq", shell=False,34 timeout=120)35 sudo("apt autoremove -yq", shell=False, timeout=120)36 except Exception as e:37 print("{} - {}".format(env.host_string, e))38 is_done = False39 finally:40 return is_done41def yum_update():42 with hide('running', 'stdout', 'stderr'):43 is_done = True44 try:45 print("Updating {} with yum...".format(env.host_string))46 sudo("yum -y update", shell=False)47 except Exception as e:48 print("{} - {}".format(env.host_string, e))49 is_done = False50 finally:51 return is_done52def opkg_update():53 with show('running', 'stdout', 'stderr'):54 is_done = True55 try:56 print("Updating {} with opkg...".format(env.host_string))57 run("opkg update", shell=False)58 packages = run("opkg list-upgradable | awk '{print $1}' | "59 "sed ':M;N;$!bM;s#\\n# #g'", shell=False)60 if packages:61 run("opkg upgrade %s" % packages, shell=False)62 else:63 print("No updated packages on {}".format(env.host_string))64 except Exception as e:65 print("{} - {}".format(env.host_string, e))66 is_done = False67 finally:68 return is_done69def repo_update(repoName="Home Bin", repoDirectory="~/bin/"):70 with hide('running', 'stdout', 'stderr'):71 is_done = True72 try:73 with cd(repoDirectory):74 pathList = run('ls -a')75 if ".noPull" in pathList.stdout:76 print("{} repo {} has .noPull - skipping".format(77 env.host_string, repoName))78 return is_done79 r = run('git pull')80 if 'Already up-to-date' in r.stdout:81 print("{} repo {} already up-to-date".format(82 env.host_string, repoName))83 else:84 print("{} repo {} updated".format(env.host_string,85 repoName))86 if '.postpull.sh' in pathList.stdout:87 print("{} repo {} - updating permissions".format(88 env.host_string, repoName))89 run('./.postpull.sh')90 except Fatal as e:91 print("{} - {}".format(env.host_string, e))92 is_done = False93 finally:94 return is_done95def setup():96 with hide('running', 'stdout', 'stderr'):97 is_done = True98 apt_sudoers = env.user + " * = (root) NOPASSWD:SETENV: /usr/bin/apt,"\99 "/usr/local/bin/apt"100 power_sudoers = env.user + " * = (root) NOPASSWD: /sbin/shutdown"101 try:102 sudo("echo \"{}\" >> /etc/sudoers.d/fabric".format(apt_sudoers))103 sudo("echo \"{}\" >> /etc/sudoers.d/fabric".format(power_sudoers))104 pathList = run('ls -a ~')105 if "repos" not in pathList.stdout:106 print("No repos dir on {} - creating".format(env.host_string))107 run("mkdir ~/repos/")108 if "bin" not in pathList.stdout:109 print("No bin dir on {} - cloning".format(env.host_string))110 run("git clone https://github.com/mbeland/bin.git")111 except Exception as e:112 print("{} - {}".format(env.host_string, e))113 is_done = False114 finally:115 return is_done116@parallel117def updates():118 with hide('running', 'stdout', 'stderr'):119 is_done = True120 try:121 apt_update()122 updateRepos()123 except Exception as e:124 print("{} - {}".format(env.host_string, e))125 is_done = False126 finally:127 return is_done128def updateRepos():129 with hide('running', 'stdout', 'stderr'):130 is_done = True131 reposDir = '~/repos/'132 try:133 repo_update("Home Bin", "~/bin/")134 repo_update("SSH", "~/.ssh")135 repo_update("Home Dir", "~/")136 dirsToUpdate = get_immediate_subdirectories(reposDir)137 for x in dirsToUpdate:138 y = x.split("/")139 repo_update(y[-1], x)140 except Exception as e:141 print("{} - {}".format(env.host_string, e))142 is_done = False143 finally:144 return is_done145def deploy_script():146 with hide('running', 'stdout', 'stderr'):147 is_done = True148 global scriptFileName149 try:150 if scriptFileName:151 scriptFileName = input('Script to execute: ')152 put(scriptFileName, "/tmp/", mirror_local_mode=True)153 remotePath = "/tmp/" + scriptFileName154 run(remotePath)155 run("rm " + remotePath)156 except Exception as e:157 print("{} - {}".format(env.host_string, e))158 is_done = False159 finally:...
train_hand.py
Source:train_hand.py
1#coding=utf-82import rospy3from std_msgs.msg import Int164import time5from geometry_msgs.msg import Twist #导å
¥éè¦çrosçpackage 6import numpy as np7is_done=Int16()8is_done.data=09#å®ä¹åéæ°æ®çå½æ°:åªåä¸æ¬¡ï¼å¯ä»¥èèåå¾å¤æ¬¡ï¼æç»è¦çææå¦ä½ï¼10def data_publish(u):11 #åå§ånode12 13 #åå§åpublisher14 pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) # topicåæ¶æ¯ç±»åéè¦åè岸æåéç¡®å®ä¸ä¸ 15 16 #åå§åtwist17 twist = Twist()18 # ç»twistèµå¼19 twist.linear.x = u[0] #线é度20 twist.linear.y = 0.021 twist.linear.z = 0.022 twist.angular.x = 0.0 #è§é度23 twist.angular.y = 0.024 twist.angular.z = u[1]25 #è°è¯ä»£ç 26 # print("twist")27 # print(twist)28 #åétwist29 pub.publish(twist) #å°æ°æ®åéåºå»30#åétwistï¼ä¸ç´åæ令ï¼ç´å°ä¸æ¬¡è¿å¨ç»æä¹åå°±ä¸åæ令31def callback_2(data): 32 #æ¥åæ°æ®33 is_done.data= data.data34 #print(finish)35 #å¤æç¶æ36 # if (finish.data == 1):#å®æ转å¨37 # is_done = 138 # # print("callback_2",is_done)39 # else:#没å®æ转å¨40 # is_done = 0 41 # print("moving")42if __name__ == '__main__':43 #åå§å44 rospy.init_node('train_hand', anonymous=True)45 rospy.Subscriber("is_done", Int16, callback_2)46 start_flag = 1 #å¼å§æ令47 twist = Twist() #åé声æ48 u = np.zeros(2) #æ§å¶é声æ49 50 time.sleep(1)51 print(is_done.data)52 #æ§å¶è¯´æ53 print("åè¿ï¼w å·¦æï¼a å³æ:d åéï¼s")54 time.sleep(0.1)55 #å¼å§æ§å¶56 while start_flag == 1: #ä¸ç´å¾ªç¯æ¥åæ°æ®57 control_key = raw_input("raw_inputï¼") #è·å¾é®çè¾å
¥58 if control_key == "w": #åè¿59 #æ¾ç¤ºæ§å¶é60 print("control is w")61 #设置é¢ç62 63 #没æ¶å°ç»æä¿¡æ¯ä¸ç´åéæ§å¶é64 if(is_done.data==1):65 #èµå¼çº¿é度66 u[0] = 167 u[1] = 068 69 #åå¸çº¿é度æ¶æ¯70 data_publish(u)71 72 #以10HZé¢çåéæ°æ®73 74 75 elif control_key == "a": #左转76 #æ¾ç¤ºæ§å¶é77 print("control is a")78 #设置é¢ç79 rate = rospy.Rate(10) # 10hz80 #没æ¶å°ç»æä¿¡æ¯ä¸ç´åéæ§å¶é81 if(is_done.data==1):82 #èµå¼çº¿é度83 u[0] = 084 u[1] = -185 86 #åå¸çº¿é度æ¶æ¯87 data_publish(u)88 89 #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æ循ç¯90 91 #以10HZé¢çåéæ°æ®92 rate.sleep()93 elif control_key == "d": #å³è½¬94 #æ¾ç¤ºæ§å¶é95 print("control is d")96 #设置é¢ç97 rate = rospy.Rate(10) # 10hz98 if(is_done.data==1):99 #èµå¼çº¿é度100 u[0] = 0101 u[1] = 1102 103 #åå¸çº¿é度æ¶æ¯104 data_publish(u)105 106 #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æ循ç¯107 108 #以10HZé¢çåéæ°æ®109 rate.sleep()110 elif control_key == "s": #åé111 #æ¾ç¤ºæ§å¶é112 print("control is s")113 #设置é¢ç114 rate = rospy.Rate(10) # 10hz115 if(is_done.data==1):116 #èµå¼çº¿é度117 u[0] = -1118 u[1] = 0119 120 #åå¸çº¿é度æ¶æ¯121 data_publish(u)122 123 #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æ循ç¯124 125 #以10HZé¢çåéæ°æ®126 rate.sleep()127 128<<<<<<< HEAD129 elif control_key == "q": #å
足æ¥æ130 #æ¾ç¤ºæ§å¶é131 print("control is s")132 #设置é¢ç133 rate = rospy.Rate(10) # 10hz134 if(is_done.data==1):135 #èµå¼çº¿é度136 u[0] = 2137 u[1] = 0138 139 #åå¸çº¿é度æ¶æ¯140 data_publish(u)141 142 #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æ循ç¯143 144 #以10HZé¢çåéæ°æ®145 rate.sleep()146=======147>>>>>>> 6ca41d4ff84544147f430048270163452df0609e148 #å°is_downå¤ä½149 print("is_doneå·²éç½®")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!