Best Python code snippet using sure_python
nn_transition.py
Source:nn_transition.py
1#!/usr/bin/env python2import rospy3from std_msgs.msg import Float64MultiArray, Float32MultiArray, Int164from std_srvs.srv import SetBool, Empty, EmptyResponse5import math6import numpy as np7import matplotlib.pyplot as plt8from predict_nn import predict_nn9from svm_class import svm_failure10import sys11sys.path.insert(0, '/home/juntao/catkin_ws/src/beliefspaceplanning/gpup_gp_node/src/')12from mean_shift import mean_shift13from gpup_gp_node.srv import batch_transition, batch_transition_repeat, one_transition, setk14# np.random.seed(10)15simORreal = 'sim'16discreteORcont = 'discrete'17probability_threshold = 0.6518class Spin_nn(predict_nn, mean_shift, svm_failure):19 OBS = True20 def __init__(self):21 22 predict_nn.__init__(self)23 svm_failure.__init__(self, discrete = (True if discreteORcont=='discrete' else False))24 mean_shift.__init__(self)25 rospy.Service('/nn/transition', batch_transition, self.GetTransition)26 rospy.Service('/nn/transitionOneParticle', one_transition, self.GetTransitionOneParticle)27 rospy.Service('/nn/transitionRepeat', batch_transition_repeat, self.GetTransitionRepeat)28 rospy.Service('/nn/batchSVMcheck', batch_transition, self.batch_svm_check_service)29 rospy.init_node('nn_transition', anonymous=True)30 print('[nn_transition] Ready.') 31 self.time_svm = 0.32 self.num_checks_svm = 0 33 self.time_nn = 0.34 self.num_checks_nn = 035 self.time_bnn = 0.36 self.num_checks_bnn = 0 37 rospy.spin()38 def batch_svm_check(self, S, a):39 failed_inx = []40 for i in range(S.shape[0]):41 st = rospy.get_time()42 p = self.probability(S[i,:], a) # Probability of failure43 self.time_svm += rospy.get_time() - st44 self.num_checks_svm += 145 prob_fail = np.random.uniform(0,1)46 if prob_fail <= p:47 failed_inx.append(i)48 return failed_inx49 def batch_svm_check_service(self, req):50 S = np.array(req.states).reshape(-1, self.state_dim)51 a = np.array(req.action)52 failed_inx = []53 for i in range(S.shape[0]):54 p = self.probability(S[i,:], a) # Probability of failure55 prob_fail = np.random.uniform(0,1)56 if prob_fail <= p:57 failed_inx.append(i)58 node_probability = 1.0 - float(len(failed_inx))/float(S.shape[0])59 return {'node_probability': node_probability}60 # Predicts the next step by calling the GP class61 def GetTransition(self, req):62 S = np.array(req.states).reshape(-1, self.state_dim)63 a = np.array(req.action)64 if (len(S) == 1):65 st = rospy.get_time()66 p = self.probability(S[0,:], a)67 self.time_svm += rospy.get_time() - st68 self.num_checks_svm += 169 print("------")70 print(S[0,:], a)71 node_probability = 1.0 - p72 sa = np.concatenate((S[0,:],a), axis=0)73 st = rospy.get_time()74 s_next = self.predict(sa)75 self.time_nn += rospy.get_time() - st76 self.num_checks_nn += 177 print(s_next)78 79 collision_probability = 1.0 if (self.OBS and self.obstacle_check(s_next)) else 0.080 81 return {'next_states': s_next, 'mean_shift': s_next, 'node_probability': node_probability, 'collision_probability': collision_probability}82 else: 83 # Check which particles failed84 failed_inx = self.batch_svm_check(S, a)85 try:86 node_probability = 1.0 - float(len(failed_inx))/float(S.shape[0])87 except:88 S_next = []89 mean = [0,0]90 return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}91 92 # Remove failed particles by duplicating good ones93 bad_action = np.array([0.,0.])94 if len(failed_inx):95 good_inx = np.delete( np.array(range(S.shape[0])), failed_inx )96 if len(good_inx) == 0: # All particles failed97 S_next = []98 mean = [0,0]99 return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}100 # Find main direction of fail101 S_failed_mean = np.mean(S[failed_inx, :], axis=0)102 S_mean = np.mean(S, axis=0)103 ang = np.rad2deg(np.arctan2(S_failed_mean[1]-S_mean[1], S_failed_mean[0]-S_mean[0]))104 if ang <= 45. and ang >= -45.:105 bad_action = np.array([1.,-1.])106 elif ang >= 135. or ang <= -135.:107 bad_action = np.array([-1.,1.])108 elif ang > 45. and ang < 135.:109 bad_action = np.array([1.,1.])110 elif ang < -45. and ang > -135.:111 bad_action = np.array([-1.,-1.])112 dup_inx = good_inx[np.random.choice(len(good_inx), size=len(failed_inx), replace=True)]113 S[failed_inx, :] = S[dup_inx,:]114 # Propagate115 stb = rospy.get_time()116 SA = np.concatenate((S, np.tile(a, (S.shape[0],1))), axis=1)117 S_next = []118 for sa in SA:119 st = rospy.get_time()120 sa_next = self.predict(sa)121 self.time_nn += rospy.get_time() - st122 self.num_checks_nn += 1123 S_next.append(sa_next)124 S_next = np.array(S_next)125 self.time_bnn += rospy.get_time() - stb126 self.num_checks_bnn += 1127 if self.OBS:128 # print "Checking obstacles..."129 failed_inx = []130 good_inx = []131 for i in range(S_next.shape[0]):132 if self.obstacle_check(S_next[i,:]):133 failed_inx.append(i)134 collision_probability = float(len(failed_inx))/float(S.shape[0])135 # node_probability = min(node_probability, node_probability2)136 if len(failed_inx):137 good_inx = np.delete( np.array(range(S_next.shape[0])), failed_inx )138 if len(good_inx) == 0: # All particles failed139 S_next = []140 mean = [0,0]141 return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}142 # Find main direction of fail143 S_next_failed_mean = np.mean(S_next[failed_inx, :], axis=0)144 S_next_mean = np.mean(S_next, axis=0)145 ang = np.rad2deg(np.arctan2(S_next_failed_mean[1]-S_next_mean[1], S_next_failed_mean[0]-S_next_mean[0]))146 if ang <= 45. and ang >= -45.:147 bad_action = np.array([1.,-1.])148 elif ang >= 135. or ang <= -135.:149 bad_action = np.array([-1.,1.])150 elif ang > 45. and ang < 135.:151 bad_action = np.array([1.,1.])152 elif ang < -45. and ang > -135.:153 bad_action = np.array([-1.,-1.])154 dup_inx = good_inx[np.random.choice(len(good_inx), size=len(failed_inx), replace=True)]155 S_next[failed_inx, :] = S_next[dup_inx,:]156 else:157 collision_probability = 0.0158 # print('svm time: ' + str(self.time_svm/self.num_checks_svm) + ', prediction time: ' + str(self.time_nn/self.num_checks_nn) + ', batch prediction time: ' + str(self.time_bnn/self.num_checks_bnn))159 mean = np.mean(S_next, 0) #self.get_mean_shift(S_next)160 return {'next_states': S_next.reshape((-1,)), 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': bad_action, 'collision_probability': collision_probability}161 def obstacle_check(self, s):162 Obs = np.array([[-38, 117.1, 4.],163 [-33., 105., 4.],164 [-52.5, 105.2, 4.],165 [43., 111.5, 6.],166 [59., 80., 3.],167 [36.5, 94., 4.]168 ])169 f = 1.2 # inflate170 for obs in Obs:171 if np.linalg.norm(s[:2]-obs[:2]) <= f * obs[2]:172 return True173 return False174 # Predicts the next step by calling the GP class - repeats the same action 'n' times175 def GetTransitionRepeat(self, req):176 n = req.num_repeat177 TranReq = batch_transition()178 TranReq.states = req.states179 TranReq.action = req.action180 for _ in range(n):181 res = self.GetTransition(TranReq)182 TranReq.states = res['next_states']183 prob = res['node_probability']184 if prob < req.probability_threshold:185 break186 187 return {'next_states': res['next_states'], 'mean_shift': res['mean_shift'], 'node_probability': res['node_probability'], 'bad_action': res['bad_action'], 'collision_probability': res['collision_probability']}188 # Predicts the next step by calling the GP class189 def GetTransitionOneParticle(self, req):190 s = np.array(req.state)191 a = np.array(req.action)192 # Check which particles failed193 p = self.probability(s, a)194 node_probability = 1.0 - p195 # Propagate196 sa = np.concatenate((s, a), axis=0)197 s_next = self.predict(sa) 198 return {'next_state': s_next, 'node_probability': node_probability}199if __name__ == '__main__':200 try:201 NN = Spin_nn()202 except rospy.ROSInterruptException:...
nn_run.py
Source:nn_run.py
1#!/usr/bin/env python2import rospy3from gpup_gp_node.srv import batch_transition, one_transition4from nn_node.srv import critic_seq5#from gpup_gp_node_exp.srv import batch_transition, one_transition6import numpy as np7from svm_class import svm_failure8import pickle9from predict_nn import predict_nn10from sklearn.neighbors import KDTree11from sklearn.gaussian_process import GaussianProcessRegressor12from sklearn.gaussian_process.kernels import RBF13CRITIC = True14class Spin_predict(predict_nn, svm_failure):15 state_dim = 416 OBS = True17 def __init__(self):18 predict_nn.__init__(self)19 svm_failure.__init__(self, simORreal = 't42_cyl35', discrete = True)20 rospy.Service('/nn/transition', batch_transition, self.GetTransition)21 rospy.Service('/nn/transitionOneParticle', one_transition, self.GetTransitionOneParticle)22 if CRITIC:23 rospy.Service('/nn/critic_seq', critic_seq, self.GetCritic)24 rospy.init_node('predict', anonymous=True)25 if self.OBS:26 self.Obs = np.array([[-15, 115, 23]])27 if CRITIC:28 self.K = 329 with open('/home/pracsys/catkin_ws/src/t42_control/nn_node/gp_eval/data_P40.pkl', 'rb') as f: 30 self.O, self.E = pickle.load(f)31 if 0:32 self.kdt = KDTree(self.O, leaf_size=100, metric='euclidean')33 else:34 with open('/home/pracsys/catkin_ws/src/t42_control/nn_node/gp_eval/kdt_P40.pkl', 'rb') as f: 35 self.kdt = pickle.load(f)36 self.kernel = RBF(length_scale=1.0, length_scale_bounds=(1e-1, 10.0))37 print('[nn_predict_node] Critic loaded.')38 print('[nn_predict_node] Ready to predict...')39 rospy.spin()40 def batch_svm_check(self, S, a):41 failed_inx = []42 for i in range(S.shape[0]):43 p = self.probability(S[i,:], a) # Probability of failure44 prob_fail = np.random.uniform(0,1)45 if prob_fail <= p:46 failed_inx.append(i)47 return failed_inx48 # Predicts the next step by calling the GP class49 def GetTransition(self, req):50 S = np.array(req.states).reshape(-1, self.state_dim)51 a = np.array(req.action)52 if (len(S) == 1):53 st = rospy.get_time()54 p = self.probability(S[0,:], a)55 node_probability = 1.0 - p56 sa = np.concatenate((S[0,:],a), axis=0)57 s_next = self.predict(sa)58 collision_probability = 1.0 if (self.OBS and self.obstacle_check(s_next)) else 0.059 return {'next_states': s_next, 'mean_shift': s_next, 'node_probability': node_probability, 'collision_probability': collision_probability}60 else: 61 # Check which particles failed62 failed_inx = self.batch_svm_check(S, a)63 try:64 node_probability = 1.0 - float(len(failed_inx))/float(S.shape[0])65 except:66 S_next = []67 mean = [0,0]68 return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}69 # Remove failed particles by duplicating good ones70 bad_action = np.array([0.,0.])71 if len(failed_inx):72 good_inx = np.delete( np.array(range(S.shape[0])), failed_inx )73 if len(good_inx) == 0: # All particles failed74 S_next = []75 mean = [0,0]76 return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}77 # Find main direction of fail78 S_failed_mean = np.mean(S[failed_inx, :], axis=0)79 S_mean = np.mean(S, axis=0)80 ang = np.rad2deg(np.arctan2(S_failed_mean[1]-S_mean[1], S_failed_mean[0]-S_mean[0]))81 if ang <= 45. and ang >= -45.:82 bad_action = np.array([1.,-1.])83 elif ang >= 135. or ang <= -135.:84 bad_action = np.array([-1.,1.])85 elif ang > 45. and ang < 135.:86 bad_action = np.array([1.,1.])87 elif ang < -45. and ang > -135.:88 bad_action = np.array([-1.,-1.])89 dup_inx = good_inx[np.random.choice(len(good_inx), size=len(failed_inx), replace=True)]90 S[failed_inx, :] = S[dup_inx,:]91 # Propagate92 SA = np.concatenate((S, np.tile(a, (S.shape[0],1))), axis=1)93 S_next = self.predict(SA)94 mean = np.mean(S_next, 0) #self.get_mean_shift(S_next)95 return {'next_states': S_next.reshape((-1,)), 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': bad_action, 'collision_probability': collision_probability}96 def obstacle_check(self, s):97 f = 1.4 #2.0 # inflate98 for o in self.Obs:99 if np.linalg.norm(s[:2]-o[:2]) <= f * o[2]:100 return True101 return False102 def GetTransitionOneParticle(self, req):103 s = np.array(req.state)104 a = np.array(req.action)105 # Check which particles failed106 p = self.probability(s, a)107 node_probability = 1.0 - p108 # Propagate109 sa = np.concatenate((s, a), axis=0)110 s_next = self.predict(sa) 111 # print(self.time_nn / self.num_checks_nn) 112 return {'next_state': s_next, 'node_probability': node_probability}113 def GetCritic(self, req):114 s = np.array(req.state)115 a = np.array(req.future_action)116 n = req.steps117 Apr = np.array(req.seq)118 sa = np.concatenate((s, a, np.array([n]), Apr.reshape((-1))), axis=0)119 # sa = np.append(sa, n)120 idx = self.kdt.query(sa.reshape(1,-1), k = self.K, return_distance=False)121 O_nn = self.O[idx,:].reshape(self.K, -1)122 E_nn = self.E[idx].reshape(self.K, 1)123 gpr = GaussianProcessRegressor(kernel=self.kernel).fit(O_nn, E_nn)124 e, _ = gpr.predict(sa.reshape(1, -1), return_std=True)125 126 return {'err': e}127 128if __name__ == '__main__':129 130 try:131 SP = Spin_predict()132 except rospy.ROSInterruptException:...
test_transactions.py
Source:test_transactions.py
1import transfiles.transactions as transactions2from unittest.mock import Mock, create_autospec, MagicMock3def mock_action_not_failing_anywhere():4 action = create_autospec(transactions.Action)5 return action6def mock_action_failing_on_precommit():7 action = create_autospec(transactions.Action)8 action.pre_commit = Mock(side_effect=RuntimeError("fail!"))9 return action10def mock_action_failing_on_precommit_and_rollback():11 action = create_autospec(transactions.Action)12 action.pre_commit = Mock(side_effect=RuntimeError("fail precommit!"))13 action.rollback = Mock(side_effect=RuntimeError("fail rollback!"))14 return action15def mock_action_failing_on_commit():16 action = create_autospec(transactions.Action)17 action.commit = Mock(side_effect=RuntimeError("fail on commit!"))18 return action19class TestTransactions:20 def test_process_actions_atomically_happy_path(self):21 """tests all is good when all actions finish fine"""22 good_action1 = mock_action_not_failing_anywhere()23 good_action2 = mock_action_not_failing_anywhere()24 transactions.process_actions_atomically([good_action1, good_action2])25 good_action1.pre_commit.assert_called_once()26 good_action2.pre_commit.assert_called_once()27 good_action1.commit.assert_called_once()28 good_action2.commit.assert_called_once()29 good_action1.rollback.assert_not_called()30 good_action2.rollback.assert_not_called()31 def test_rollbacks_itself_if_precommit_fails(self):32 bad_action = mock_action_failing_on_precommit()33 transactions.process_actions_atomically((bad_action,))34 bad_action.pre_commit.assert_called_once()35 bad_action.commit.assert_not_called()36 bad_action.rollback.assert_called_once()37 def test_calls_rollback_in_reverse_order(self):38 manager = MagicMock()39 good_action1 = mock_action_not_failing_anywhere()40 bad_action = mock_action_failing_on_precommit()41 good_action2 = mock_action_not_failing_anywhere()42 good_action1_name = "good_action1"43 manager.attach_mock(good_action1, good_action1_name)44 bad_action_name = "bad_action"45 manager.attach_mock(bad_action, bad_action_name)46 transactions.process_actions_atomically(47 [good_action1, bad_action, good_action2])48 """it should fail on bad action, roll it back, 49 then roll back good_action1.50 good_action2 shouldn't be touched at all"""51 good_action1.pre_commit.assert_called_once()52 good_action1.commit.assert_not_called()53 good_action1.rollback.assert_called_once()54 bad_action.pre_commit.assert_called_once()55 bad_action.commit.assert_not_called()56 bad_action.rollback.assert_called_once()57 good_action2.pre_commit.assert_not_called()58 good_action2.commit.assert_not_called()59 good_action2.rollback.assert_not_called()60 """now test it calls rollbacks in correct sequence 61 (first bad_action, then good_action1)"""62 idx_good_action1_rollback = None63 idx_bad_action_rollback = None64 for idx, call in enumerate(manager.mock_calls):65 if call[0] == f"{good_action1_name}.rollback":66 idx_good_action1_rollback = idx67 elif call[0] == f"{bad_action_name}.rollback":68 idx_bad_action_rollback = idx69 assert idx_good_action1_rollback is not None70 assert idx_bad_action_rollback is not None71 assert idx_bad_action_rollback < idx_good_action1_rollback72 def test_failure_on_rollback_doesnt_prevent_other_rollbacks(self):73 good_action = mock_action_not_failing_anywhere()74 bad_action = mock_action_failing_on_precommit_and_rollback()75 transactions.process_actions_atomically([good_action, bad_action])76 good_action.pre_commit.assert_called_once()77 bad_action.pre_commit.assert_called_once()78 # will fail:79 bad_action.rollback.assert_called_once()80 # but it shouldn't preclude to rollback good action:81 good_action.rollback.assert_called_once()82 good_action.commit.assert_not_called()83 bad_action.commit.assert_not_called()84 def test_failure_on_commit_doesnt_prevent_other_commits(self):85 good_action = mock_action_not_failing_anywhere()86 bad_action = mock_action_failing_on_commit()87 transactions.process_actions_atomically([bad_action, good_action])88 bad_action.pre_commit.assert_called_once()89 good_action.pre_commit.assert_called_once()90 # will fail:91 bad_action.commit.assert_called_once()92 # but should not prevent calling commit on good action:93 good_action.commit.assert_called_once()94 """95 Q: shouldn't bad_action.rollback be called too?96 A: I think no, it would violate rule that transaction is atomic.97 In most cases target action will be done on pre_commit() already, 98 so at worst we'll have some temporary leftover for being able to revert....
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!