Best Python code snippet using pyatom_python
adversary.py
Source:adversary.py
...41 if attack_type == "retraining":42 X = []43 y = []44 for datum in random.sample(dataset, query_budget):45 b = self.client.poll_server(server, predictor_name, [datum])46 X.append(datum)47 y.append(b)48 if kernel_type == "quadratic":49 my_model = svm.SVR(kernel="poly", degree=2)50 else:51 my_model = svm.SVR(kernel=kernel_type)525354 my_model.fit(X, numpy.ravel(y))55 return my_model5657 elif attack_type == "adaptive retraining":58 if len(dataset) >= query_budget > roundsize:5960 pool = random.sample(dataset, query_budget)61 X = []62 y = []63 n = roundsize64 t = math.ceil(query_budget / n)6566 for i in range(0, n): # Initial training data for a basic start to train upon67 a = pool.pop(0)68 b = self.client.poll_server(server, predictor_name, [a])69 X.append(a)70 y.append(b)7172 if kernel_type == "quadratic":73 my_model = svm.NuSVR(kernel="poly", degree=2)74 else:75 my_model = svm.NuSVR(kernel=kernel_type)76 for i in range(0, t - 1): # perform t rounds minus the initial round.77 #print(numpy.ravel(y))78 my_model.fit(X, numpy.ravel(y))7980 if len(my_model.support_vectors_) == 0:81 print("[!] NO SUPPORTVECTORS IN ROUND", i)82 print("[*] Adding another round of random samples")83 #print(my_model.support_)84 #print(my_model.support_vectors_)85 #print(my_model.dual_coef_)86 for q in range(0, n): # Initial training data for a basic start to train upon87 if len(pool) == 0:88 print("[!] Error: Not enough data")89 raise IndexError90 a = pool.pop(0)91 b = self.client.poll_server(server, predictor_name, [a])92 X.append(a)93 y.append(b)94 continue95 print("Training Round", i, " of ", t-1)96 pool, samples = self.get_furthest_samples(pool,97 my_model.support_vectors_,98 kernel_type,99 my_model.coef0,100 my_model.get_params()["gamma"],101 my_model.get_params()["C"],102 n,103 my_model.dual_coef_)104105 for j in samples:106 X.append(j)107 y.append(self.client.poll_server(server, predictor_name, [j]))108 my_model.fit(X, numpy.ravel(y))109 return my_model110 else:111 print("[!] 
Error: either not enough data in data set, or query budget not bigger than round size.")112 print("[*] Aborting attack...")113 raise ValueError114 elif attack_type == "extraction":115 if kernel_type == "quadratic":116 # NOTE: KEEP IN MIND, IN THE IMPLEMENTATION THE VECTOR INDICES START AT 0, INSTEAD OF 1117 # Also DIMENSION - 1 is max index, not dimenstion itself.118 d_ = self.nCr(dimension, 2) + 2*dimension + 1 # d := Projection dimension119 if d_ > query_budget:120 print("[!] Error: This algorithm will need", d_ ," queries.")121 raise ValueError122 w_ = [0] * d_ # extracted weight vectors123124 null_vector = [0] * dimension125 b_ = self.client.poll_server(server, predictor_name, [null_vector])[0] # b' = w_d c +b126 for dim in range(dimension):127 v_p = dim * [0] + [1] + (dimension - 1 - dim) * [0]128 v_n = dim * [0] + [-1] + (dimension - 1 - dim) * [0]129 f_v_p = self.client.poll_server(server, predictor_name, [v_p])[0] - b_130 f_v_n = self.client.poll_server(server, predictor_name, [v_n])[0] - b_131 w_[dimension - dim + 1 - 2] = (f_v_p + f_v_n) / 2132 w_[d_ - dim - 2] = (f_v_p - f_v_n) / 2133134 class QuadraticMockModel:135 def __init__(self, d__, w__, b__):136 self.dim = d__137 self.w = w__138 self.b = b__139140 def phi(self, x__):141 vec = []142 for i__ in x__[::-1]:143 vec.append(i__**2)144 for i__ in reversed(range(len(x__))):145 for j__ in reversed(range(i__)):146 vec.append(math.sqrt(2)*x__[i__]*x__[j__])147 for i__ in x__[::-1]:148 vec.append(i__)149 vec.append(0)150 return vec151152 def predict(self, arr):153 rv = []154 for v__ in arr:155 val = numpy.dot(self.w, self.phi(v__)) + self.b156 rv.append(val)157 return rv158159 if dimension <= 2:160 return QuadraticMockModel(d_, w_, b_)161 for dim_i in range(dimension):162 for dim_j in range(dim_i + 1, dimension):163 #print(dim_i, dim_j)164 v = dimension*[0]165 v[dim_i], v[dim_j] = 1, 1166 f_v = self.client.poll_server(server, predictor_name, [v])[0]167 r = self.r_index(dim_i + 1, dim_j + 1, dimension) - 1168 
w_[r] = (f_v - w_[dimension - dim_i + 1 - 2] - w_[dimension - dim_j + 1 - 2] - w_[d_ - dim_i - 2] - w_[d_ - dim_j - 2] - b_) / math.sqrt(2)169 print("[+] w' extrahiert:", w_)170171 return QuadraticMockModel(d_, w_, b_)172173 if kernel_type != "linear":174 print("[!] Error: Unsupported Kernel for extraction attack.")175 raise ValueError176 d = [0] * dimension177 b = self.client.poll_server(server, predictor_name, [d])[0]178 w = []179 for j in range(0, dimension):180 x = j * [0] + [1] + (dimension - 1 - j) * [0]181 w.append(self.client.poll_server(server, predictor_name, [x])[0]-b)182 print("[+] Model parameters have been successfully extracted")183 print("[*] weight (w):", w)184 print("[*] bias (b):", b)185 print("[*] Building mock model...")186187 class LinearMockModel:188 def __init__(self, d__, w__, b__):189 self.dim = d__190 self.w = w__191 self.b = b__192193 def predict(self, arr):194 rv = []195 for v__ in arr:196 val = numpy.dot(self.w, v__) + self.b197 rv.append(val)198 return rv199200 return LinearMockModel(dimension, w, b)201 else:202 print("[!] Error: unknown attack type for svr")203 print("[*] Aborting attack...")204 raise ValueError205206 def attack_svm(self, server, predictor_name, kernel_type, attack_type, dimension, query_budget, dataset=None, roundsize=5):207 if dataset is None or len(dataset) < 2:208 print("[!] 
Dataset too small")209 print("[*] Aborting attack...")210 raise ValueError211 if not isinstance(dataset, list):212 dataset = dataset.tolist()213 if attack_type == "retraining":214 my_model = svm.SVC(kernel=kernel_type)215 X = []216 y = []217218 for datum in random.sample(dataset, query_budget):219 b = self.client.poll_server(server, predictor_name, [datum])220 X.append(datum)221 y.append(b)222 my_model.fit(X, numpy.ravel(y))223 return my_model224225 elif attack_type == "adaptive retraining":226 if len(dataset) >= query_budget > roundsize:227 pool = random.sample(dataset, query_budget)228 x = []229 y = []230 n = roundsize231 t = math.ceil(query_budget / n)232 for i in range(0, n):233 a = pool.pop(0)234 b = self.client.poll_server(server, predictor_name, [a])[0]235 x.append(a)236 y.append(b)237238 while min(y) == max(y):239 for i in range(0, n):240 a = pool.pop(0)241 b = self.client.poll_server(server, predictor_name, [a])[0]242 x.append(a)243 y.append(b)244 t -= 1245 print("[*] Additional initial random round had to be done due to no variance")246 my_model = svm.SVC(kernel=kernel_type)247 for i in range(0, t-1):248249 my_model.fit(x, numpy.ravel(y))250 for j in range(0, n):251 if not pool:252 break253 distances = my_model.decision_function(pool).tolist()254 closest = pool.pop(distances.index(min(distances)))255 x.append(closest)256 y.append(self.client.poll_server(server, predictor_name, [closest])[0])257 my_model.fit(x, numpy.ravel(y))258 return my_model259 else:260 print("[!] Error: dataset to small or roundsize bigger than query_budget")261 raise ValueError262 elif attack_type == "lowd-meek":263 if len(dataset) != 2:264 print("[!] Error: For Lowd-Meek attack, please provide exactly a positive and a negative sample")265 raise ValueError266 elif kernel_type != "linear":267 print("[!] 
Error: Unsupported Kernel by lowd-meek attack")268 raise ValueError269 else:270 print("[*] Initiating lowd-meek attack.")271 epsilon = 0.01272 d = 0.01273 vector1 = dataset[0]274 vector2 = dataset[1]275 vector1_category = numpy.ravel(self.client.poll_server(server, predictor_name, [vector1]))276 vector2_category = numpy.ravel(self.client.poll_server(server, predictor_name, [vector2]))277 if vector1_category == vector2_category:278 print("[!] Error: Provided Samples are in same category")279 raise ValueError280 else:281 if vector1_category == [0]:282 print(vector1_category, "is 0")283 negative_instance = vector1284 positive_instance = vector2285 else:286 print(vector2_category, "is 0")287 negative_instance = vector2288 positive_instance = vector1289290 #sign_witness_p = positive_instance291 sign_witness_n = negative_instance292 print("[+] Positive and Negative Instance confirmed.")293 for feature in range(0, len(sign_witness_n)):294 print("[*] Finding Signwitness. Checking feature", feature)295 f = sign_witness_n[feature]296 sign_witness_n[feature] = positive_instance[feature]297 if numpy.ravel(self.client.poll_server(server, predictor_name, [sign_witness_n])) == [1]:298 sign_witness_p = sign_witness_n.copy()299300 sign_witness_n[feature] = f301 f_index = feature302 print("[+] Sign Witnesses found with feature index:", f_index)303 break304305 weight_f = 1 * (sign_witness_p[feature] - sign_witness_n[feature]) / abs(sign_witness_p[feature] - sign_witness_n[feature])306 # Find Negative Instance of x with gap(x) < epsilon/4307 delta = sign_witness_p[feature] - sign_witness_n[feature]308309 seeker = sign_witness_n310 #seeker[feature] = sign_witness_p[feature] - delta311 #print(sign_witness_p)312 #print(sign_witness_n)313 while True:314 #print("S - ", seeker)315 pred = self.client.poll_server(server, predictor_name, [seeker])316 #print("p:", pred)317 if pred == [1]:318 #print("Positive. 
delta", delta)319 delta = delta / 2320 seeker[feature] = seeker[feature] - delta321 else:322 #print("Negative. delta", delta)323 if abs(delta) < epsilon/4:324 print("[+] found hyperplane crossing", seeker)325 break326 delta = delta / 2327 seeker[feature] = seeker[feature] + delta328 # seeker should be that negative instance now.329 crossing = seeker.copy()330 seeker[feature] += 1331 classification = numpy.ravel(self.client.poll_server(server, predictor_name, [seeker]))332333 dooble = seeker.copy() # dooble is negative instance334335 weight = [0]*len(dooble)336 #print("Weight on initieal feature", weight_f)337338 for otherfeature in range(0, len(dooble)):339 if otherfeature == feature:340 weight[otherfeature] = weight_f341 continue342 # line search on the other features343 dooble[otherfeature] += 1/d344 if numpy.ravel(self.client.poll_server(server, predictor_name, [dooble])) == classification:345 #print("DIDNOTCHANGE")346 doox = dooble.copy()347 dooble[otherfeature] -= 2/d348 if numpy.ravel(self.client.poll_server(server, predictor_name, [dooble])) == classification: # if even though added 1/d class stays the same -> weigh = 0349 weight[otherfeature] = 0350 dooble[otherfeature] = seeker[otherfeature]351 #print("found weightless feature,", otherfeature)352 continue353 else:354 distance_max = -1/d355 else:356357 distance_max = 1/d358359 distance_min = 0360 distance_mid = (distance_max + distance_min) / 2361 dooble[otherfeature] = seeker[otherfeature] + distance_mid362363 while abs(distance_mid - distance_min) > epsilon / 4:364365 if numpy.ravel(self.client.poll_server(server, predictor_name, [dooble])) != classification:366367 distance_min = distance_min368 distance_max = distance_mid369 distance_mid = (distance_min + distance_max) / 2370 dooble[otherfeature] = seeker[otherfeature] + distance_mid371 else:372 distance_min = distance_mid373 distance_mid = (distance_min + distance_max) / 2374 distance_max = distance_max375 dooble[otherfeature] = seeker[otherfeature] + 
distance_mid376 test = seeker[otherfeature]-dooble[otherfeature]377 weight[otherfeature] = weight_f / test378 continue379 print("[+] Found Weights", weight)380 a = -(weight[0] / weight[1])381 intercept = crossing[1] - a * crossing[0]382 print("[+] Found Intercept (2d)", intercept)383384 class LinearMockSVM:385 def __init__(self, w__, b__):386 self.w__ = w__387 self.b__ = b__*w__[1] # norm388389 def predict(self, val):390 rv = []391 for v in val:392 #print(numpy.sign(numpy.dot(self.w__, v) - self.b__))393 rv.append(0) if numpy.sign(numpy.dot(self.w__, v) - self.b__) == -1 else rv.append(1)394 return rv395 return LinearMockSVM(weight, intercept)396 else:397 print("Error: Unknown attack type")398 raise ValueError399400 def attack(self, server, predictor_name, predictor_type, kernel_type, attack_type, dimension, query_budget, dataset=None, roundsize=5):401 self.client.reset_poll_count()402 random.seed()403 if attack_type not in self.attack_types[predictor_type][kernel_type]:404 print("[!] Error: Attack type not compatible with kernel type")405 print("[*] Aborting attack...")406 raise ValueError407 if predictor_type == "svr" or predictor_type == "SVR":408 return self.attack_svr(server, predictor_name, kernel_type, attack_type, dimension, query_budget, dataset=dataset, roundsize=roundsize)409 elif predictor_type == "svm" or predictor_type == "SVM":410 return self.attack_svm(server, predictor_name, kernel_type, attack_type, dimension, query_budget, dataset=dataset, roundsize=roundsize)411 else:412 return None413414 def k(self, kernel_type, x_i, x_j, coef0=0, gamma=1):415 if gamma == 'auto':416 gamma = 1/len(x_i)417 if kernel_type == "linear":418 return numpy.dot(x_i, x_j)419 elif kernel_type == "quadratic":420 return (numpy.dot(x_i, x_j) + coef0)**2421 elif kernel_type == "rbf":422 return numpy.exp((numpy.linalg.norm(numpy.asarray(x_i) - numpy.asarray(x_j))**2)*(-1)*gamma)423 elif kernel_type == "sigmoid":424 return numpy.tanh(gamma*numpy.dot(x_i, x_j)+coef0)425 else:426 
print("[!] Error: Unknown kernel type")427 raise ValueError428429 def get_furthest_samples(self, pool, support_vectors, kernel_type, coef0, gamma, C, n, dual_coef):430 # for each sample in a pool calculate the distances from that sample to each support vector431 # pick the closest vector as the minimum distance432 # create433434 ranking = [{"closest_support_vector_id": 0, "minimum_distance": 0, "sample_id": 0, "total_score": 0}]435 furthest_samples = []436 distances = []437438 #print("SUPP VEC", support_vectors)439440 for index, sample in enumerate(pool):441 sample_distances = []442 #print("SAMPLE", sample)443 for sv in support_vectors:444 distance = math.sqrt(self.k(kernel_type, sample, sample, coef0, gamma)+self.k(kernel_type, sv, sv, coef0, gamma)+2*self.k(kernel_type, sample, sv, coef0, gamma))445 sample_distances.append(distance)446 closest_index = sample_distances.index(min(sample_distances))447 #print("CLOSEST", sample_distances[closest_index])448 total_score = abs(sample_distances[closest_index] * dual_coef[0][closest_index] / C)449 distances.append((index, sample_distances, closest_index, total_score))450 #print(sorted(distances, key=lambda x: x[3]))451 indexes = []452 for sample in sorted(distances, key=lambda x: x[3])[:n]:453 furthest_samples.append(pool[sample[0]])454 indexes.append(sample[0])455 for index in sorted(indexes, reverse=True):456 del pool[index]457458 return pool, furthest_samples459460 def get_last_query_count(self):461 return self.client.poll_count462463 def _model_factory(self, predictor_type, kernel, weights, intercept, point):464 pass465466 def nCr(self, n, r):467 return math.factorial(n)//math.factorial(r)//math.factorial(n-r)468469 def r_index(self, t, s, n):470 l = 0471 for i in range(1, n-s+1+1):472 l += n - i473 l += n-t+1474 return l475476 def kernel_agnostic_attack(self, server, predictor_name, predictor_type, dimension, query_budget, dataset, test_set_X):477 kernels = ["linear", "rbf", "poly", "sigmoid"]478 models = []479 if 
not isinstance(dataset, list):480 dataset = dataset.tolist()481 dataset = random.sample(dataset, query_budget) # all train on the same dataset482 for kernel in kernels:483 print("extracting as", kernel)484 if predictor_type == "SVR":485 model = self.attack_svr(server, predictor_name, kernel, "retraining", dimension,486 query_budget, dataset)487 elif predictor_type == "SVM":488 model = self.attack_svm(server, predictor_name, kernel, "retraining", dimension,489 query_budget, dataset)490 else:491 print("[!] Error: Specify predictor type")492 raise ValueError493 models.append(model)494495 correct_predictions = self.client.poll_server(server, predictor_name, test_set_X)496 scores = []497 for model in models:498 if predictor_type == "SVM":499 scores.append(accuracy_score(correct_predictions, model.predict(test_set_X)))500 else:501 scores.append(mean_squared_error(correct_predictions, model.predict(test_set_X)))502503 if predictor_type == "SVM":504 best = max(enumerate(scores), key=itemgetter(1))[0]505 else:506 best = min(enumerate(scores), key=itemgetter(1))[0]507 print(list(zip(kernels, scores)))508 print("Best Kernel: ", kernels[best])509 return models[best]
...
basehooks.py
Source:basehooks.py
...71 return vehicle["lastLapTime"]72 return None737475def poll_server(event, sync=False):76 if not sync:77 background_thread = Thread(target=poll_server_async, args=(event,), daemon=True)78 background_thread.start()79 else:80 poll_server_async(event)818283def poll_status_server(status):84 background_thread = Thread(85 target=poll_server_status_async, args=(status,), daemon=True86 )87 background_thread.start()888990def best_lap(driver, time, team, newStatus):91 print("New best lap {}: {}".format(driver, time))929394def new_lap(driver, laps, newStatus):95 print("New lap count {}: {}".format(driver, laps))96 event_time = newStatus["currentEventTime"]97 session = newStatus["session"]98 last_lap_time = get_last_lap_time(driver, newStatus)99 poll_server(100 {101 "driver": driver,102 "laps": laps,103 "type": "LC",104 "event_time": event_time,105 "session": session,106 "slot_id": get_slot_by_name(driver, newStatus),107 "last_lap_time": last_lap_time,108 }109 )110111112def on_pos_change(driver, old_pos, new_pos, newStatus):113 print("New position for {}: {} (was {}) ".format(driver, new_pos, old_pos))114 event_time = newStatus["currentEventTime"]115 session = newStatus["session"]116 poll_server(117 {118 "driver": driver,119 "old_pos": old_pos,120 "new_pos": new_pos,121 "type": "P",122 "event_time": event_time,123 "session": session,124 "slot_id": get_slot_by_name(driver, newStatus),125 }126 )127128129def on_pos_change_yellow(driver, old_pos, new_pos, newStatus):130 print("New position for {}: {} (was {}) ".format(driver, new_pos, old_pos))131 event_time = newStatus["currentEventTime"]132 session = newStatus["session"]133 poll_server(134 {135 "driver": driver,136 "old_pos": old_pos,137 "new_post": new_pos,138 "type": "PY",139 "event_time": event_time,140 "session": session,141 "slot_id": get_slot_by_name(driver, newStatus),142 }143 )144145146def test_lag(driver, speed, old_speed, location, nearby, team, additional, newStatus):147 event_time = 
newStatus["currentEventTime"]148 session = newStatus["session"]149 print(150 "Suspected lag for {} v={}, v_old={}, l={}, nearby={}".format(151 driver, speed, old_speed, location, nearby152 )153 )154 poll_server(155 {156 "driver": driver,157 "speed": speed,158 "old_speed": old_speed,159 "location": location,160 "nearby": nearby,161 "team": team,162 "type": "L",163 "event_time": event_time,164 "session": session,165 "slot_id": get_slot_by_name(driver, newStatus),166 }167 )168169170def add_penalty(driver, old_penalty_count, penalty_count, newStatus):171 print("A penalty was added for {}. Sum={}".format(driver, penalty_count))172 event_time = newStatus["currentEventTime"]173 session = newStatus["session"]174 slot = get_slot_by_name(driver, newStatus)175 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")176 poll_server(177 {178 "sum": penalty_count,179 "driver": driver,180 "type": "P+",181 "event_time": event_time,182 "session": session,183 "slot_id": slot,184 "laps": laps,185 }186 )187188189def revoke_penalty(driver, old_penalty_count, penalty_count, newStatus):190 print("A penalty was removed for {}. 
Sum={}".format(driver, penalty_count))191 event_time = newStatus["currentEventTime"]192 session = newStatus["session"]193 slot = get_slot_by_name(driver, newStatus)194 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")195 poll_server(196 {197 "sum": penalty_count,198 "driver": driver,199 "type": "P-",200 "event_time": event_time,201 "session": session,202 "slot_id": slot,203 "laps": laps,204 }205 )206207208def personal_best(driver, old_best, new_best, newStatus):209 print(210 "A personal best was set: {} old={}, new={}".format(driver, old_best, new_best)211 )212 event_time = newStatus["currentEventTime"]213 session = newStatus["session"]214 slot = get_slot_by_name(driver, newStatus)215 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")216 poll_server(217 {218 "new_best": new_best,219 "old_best": old_best,220 "driver": driver,221 "type": "PB",222 "event_time": event_time,223 "session": session,224 "slot_id": slot,225 "laps": laps,226 }227 )228229230def on_pit_change(driver, old_status, status, newStatus):231 print(232 "Pit status change for {} is now {}, was {}".format(driver, status, old_status)233 )234 event_time = newStatus["currentEventTime"]235 session = newStatus["session"]236 slot = get_slot_by_name(driver, newStatus)237 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")238 if status != "REQUEST": # request is a bit too leaky for the public239 poll_server(240 {241 "old_status": old_status,242 "status": status,243 "driver": driver,244 "type": "PS",245 "event_time": event_time,246 "session": session,247 "slot_id": slot,248 "laps": laps,249 }250 )251252253def on_garage_toggle(driver, old_status, status, newStatus):254 event_time = newStatus["currentEventTime"]255 session = newStatus["session"]256 slot = get_slot_by_name(driver, newStatus)257 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")258 if status:259 print("{} is now exiting the garage".format(driver))260 poll_server(261 {262 "old_status": old_status,263 "status": status,264 
"driver": driver,265 "type": "GO",266 "event_time": event_time,267 "session": session,268 "slot_id": get_slot_by_name(driver, newStatus),269 }270 )271 else:272 print("{} returned to the garage".format(driver))273 poll_server(274 {275 "old_status": old_status,276 "status": status,277 "driver": driver,278 "type": "GI",279 "event_time": event_time,280 "session": session,281 "slot_id": slot,282 "laps": laps,283 }284 )285286287pit_times = {}288289290def on_pitting(driver, old_status, status, newStatus):291 event_time = newStatus["currentEventTime"]292 session = newStatus["session"]293294 slot = get_slot_by_name(driver, newStatus)295 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")296 if status:297 pit_times[driver] = time()298 print("{} is now pitting".format(driver))299 poll_server(300 {301 "driver": driver,302 "type": "PSS",303 "event_time": event_time,304 "session": session,305 "slot_id": slot,306 "laps": laps,307 }308 )309310 else:311 try:312 start_time = pit_times[driver] if driver in pit_times else 0313 if start_time > 0:314 duration = time() - start_time315 print(316 "{} finished pitting. 
Pit took {} seconds.".format(driver, duration)317 )318 poll_server(319 {320 "driver": driver,321 "type": "PSE",322 "event_time": event_time,323 "session": session,324 "slot_id": slot,325 "laps": laps,326 }327 )328 else:329 print("{} finished pitting".format(driver))330 poll_server(331 {332 "driver": driver,333 "type": "PSE",334 "event_time": event_time,335 "session": session,336 "slot_id": slot,337 "laps": laps,338 }339 )340 except:341 import traceback342343 print(traceback.print_exc())344345346def status_change(driver, old_status, new_status, newStatus):347 print(348 "Finish status change for {} is now {}, was {}".format(349 driver, new_status, old_status350 )351 )352353 event_time = newStatus["currentEventTime"]354 session = newStatus["session"]355356 slot = get_slot_by_name(driver, newStatus)357 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")358 poll_server(359 {360 "driver": driver,361 "old_status": old_status,362 "status": new_status,363 "type": "S",364 "event_time": event_time,365 "session": session,366 "slot_id": slot,367 "laps": laps,368 }369 )370371372def on_flag_change(driver, old_flag, new_flag, newStatus):373 print(374 "Driver {} sees a flag change to {} (was {})".format(driver, new_flag, old_flag)375 )376377378def on_tick(status):379 poll_status_server(status)380381def do_stat_poll(target):382 get(target, headers= {383 "User-Agent": 'apx-reciever'384 })385386def on_stop(status):387 poll_status_server(status) 388 try: 389 target_url = PING_TARGET + "/stop"390 do_stat_poll(target_url)391 except:392 pass393394395396def on_start(): 397 try: 398 target_url = PING_TARGET + "/start"399 do_stat_poll(target_url)400 except:401 pass402403404def on_deploy():405 publish_logfile()406 try: 407 target_url = PING_TARGET + "/deploy_finished"408 do_stat_poll(target_url,)409 except:410 pass411412413def on_car_count_change(old_status_cars, new_status_cars, newStatus):414 config = get_server_config()415 old_slot_ids = []416 welcome_message = 
config["mod"]["welcome_message"]417418 if welcome_message:419 for old_car in old_status_cars:420 old_slot_ids.append(old_car["slotID"])421422 for new_car in new_status_cars:423 slot_id = new_car["slotID"]424 driver_name = new_car["driverName"]425 if slot_id not in old_slot_ids:426 parts = welcome_message.split(linesep)427 for part in parts:428 message = part.replace("{driver_name}", driver_name)429 chat(config, part.replace("{driver_name}", driver_name))430431432def on_low_speed(driver, speed, location, nearby, team, additional, newStatus):433434 event_time = newStatus["currentEventTime"]435 slot = get_slot_by_name(driver, newStatus)436 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")437 print(driver, location)438 session = newStatus["session"]439 poll_server(440 {441 "driver": driver,442 "type": "VL",443 "event_time": event_time,444 "session": session,445 "slot_id": slot,446 "laps": laps,447 "nearby": nearby,448 "speed": speed,449 "location": location,450 }451 )452453454def on_driver_swap(slotId, old_driver, new_driver, newStatus):455 event_time = newStatus["currentEventTime"]456 slot = get_slot_by_name(new_driver, newStatus)457 laps = get_prop_by_slot(slot, newStatus, "lapsCompleted")458459 session = newStatus["session"]460 poll_server(461 {462 "type": "DS",463 "event_time": event_time,464 "session": session,465 "slot_id": slot,466 "laps": laps,467 "old_driver": old_driver,468 "new_driver": new_driver,469 }470 )471472473def on_state_change(descriptor, args):
...
test_ldclient_end_to_end.py
Source:test_ldclient_end_to_end.py
from ldclient.client import LDClient
from ldclient.config import Config, HTTPConfig
from testing.http_util import BasicResponse, SequentialHandler, start_secure_server, start_server
from testing.stub_util import make_put_event, poll_content, stream_content

import json
import pytest
import sys

sdk_key = 'sdk-key'
user = { 'key': 'userkey' }
always_true_flag = { 'key': 'flagkey', 'version': 1, 'on': False, 'offVariation': 1, 'variations': [ False, True ] }


def test_client_starts_in_streaming_mode():
    """The client initializes from the stream and sends the SDK key."""
    with start_server() as stream_server, \
            stream_content(make_put_event([ always_true_flag ])) as stream_handler:
        stream_server.for_path('/all', stream_handler)
        config = Config(sdk_key = sdk_key, stream_uri = stream_server.uri, send_events = False)

        with LDClient(config = config) as client:
            assert client.is_initialized()
            assert client.variation(always_true_flag['key'], user, False) == True

            request = stream_server.await_request()
            assert request.headers['Authorization'] == sdk_key


def test_client_fails_to_start_in_streaming_mode_with_401_error():
    """A 401 from the stream endpoint leaves the client uninitialized."""
    with start_server() as stream_server:
        stream_server.for_path('/all', BasicResponse(401))
        config = Config(sdk_key = sdk_key, stream_uri = stream_server.uri, send_events = False)

        with LDClient(config = config) as client:
            assert not client.is_initialized()
            assert client.variation(always_true_flag['key'], user, False) == False


def test_client_retries_connection_in_streaming_mode_with_non_fatal_error():
    """A 503 followed by a good stream still ends in an initialized client."""
    with start_server() as stream_server, \
            stream_content(make_put_event([ always_true_flag ])) as stream_handler:
        error_then_success = SequentialHandler(BasicResponse(503), stream_handler)
        stream_server.for_path('/all', error_then_success)
        config = Config(sdk_key = sdk_key, stream_uri = stream_server.uri, initial_reconnect_delay = 0.001, send_events = False)

        with LDClient(config = config) as client:
            assert client.is_initialized()
            assert client.variation(always_true_flag['key'], user, False) == True

            request = stream_server.await_request()
            assert request.headers['Authorization'] == sdk_key


def test_client_starts_in_polling_mode():
    """The client initializes from the polling endpoint and sends the SDK key."""
    with start_server() as poll_server:
        poll_server.for_path('/sdk/latest-all', poll_content([ always_true_flag ]))
        config = Config(sdk_key = sdk_key, base_uri = poll_server.uri, stream = False, send_events = False)

        with LDClient(config = config) as client:
            assert client.is_initialized()
            assert client.variation(always_true_flag['key'], user, False) == True

            request = poll_server.await_request()
            assert request.headers['Authorization'] == sdk_key


def test_client_fails_to_start_in_polling_mode_with_401_error():
    """A 401 from the polling endpoint leaves the client uninitialized."""
    with start_server() as poll_server:
        poll_server.for_path('/sdk/latest-all', BasicResponse(401))
        config = Config(sdk_key = sdk_key, base_uri = poll_server.uri, stream = False, send_events = False)

        with LDClient(config = config) as client:
            assert not client.is_initialized()
            assert client.variation(always_true_flag['key'], user, False) == False


def test_client_sends_event_without_diagnostics():
    """With diagnostics opted out, only the identify event is posted."""
    with start_server() as poll_server, start_server() as events_server:
        poll_server.for_path('/sdk/latest-all', poll_content([ always_true_flag ]))
        events_server.for_path('/bulk', BasicResponse(202))
        config = Config(sdk_key = sdk_key, base_uri = poll_server.uri, events_uri = events_server.uri, stream = False,
            diagnostic_opt_out = True)

        with LDClient(config = config) as client:
            assert client.is_initialized()
            client.identify(user)
            client.flush()

            request = events_server.await_request()
            assert request.headers['Authorization'] == sdk_key
            payload = json.loads(request.body)
            assert len(payload) == 1
            assert payload[0]['kind'] == 'identify'


def test_client_sends_diagnostics():
    """By default a diagnostic-init payload is posted on startup."""
    with start_server() as poll_server, start_server() as events_server:
        poll_server.for_path('/sdk/latest-all', poll_content([ always_true_flag ]))
        events_server.for_path('/diagnostic', BasicResponse(202))
        config = Config(sdk_key = sdk_key, base_uri = poll_server.uri, events_uri = events_server.uri, stream = False)

        with LDClient(config = config) as client:
            assert client.is_initialized()

            request = events_server.await_request()
            assert request.headers['Authorization'] == sdk_key
            payload = json.loads(request.body)
            assert payload['kind'] == 'diagnostic-init'


# The TLS tests are skipped in Python 3.3 because the embedded HTTPS server does not work correctly, causing
# a TLS handshake failure on the client side. It's unclear whether this is a problem with the self-signed
# certificate we are using or with some other server settings, but it does not appear to be a client-side
# problem.

@pytest.mark.skipif(sys.version_info.major == 3 and sys.version_info.minor == 3, reason = "test is skipped in Python 3.3")
def test_cannot_connect_with_selfsigned_cert_by_default():
    """Default TLS verification rejects the self-signed test server."""
    with start_secure_server() as server:
        server.for_path('/sdk/latest-all', poll_content())
        config = Config(
            sdk_key = 'sdk_key',
            base_uri = server.uri,
            stream = False,
            send_events = False
        )
        with LDClient(config = config, start_wait = 1.5) as client:
            assert not client.is_initialized()


@pytest.mark.skipif(sys.version_info.major == 3 and sys.version_info.minor == 3, reason = "test is skipped in Python 3.3")
def test_can_connect_with_selfsigned_cert_if_ssl_verify_is_false():
    """Disabling verification through HTTPConfig allows the connection."""
    with start_secure_server() as server:
        server.for_path('/sdk/latest-all', poll_content())
        config = Config(
            sdk_key = 'sdk_key',
            base_uri = server.uri,
            stream = False,
            send_events = False,
            http = HTTPConfig(disable_ssl_verification=True)
        )
        with LDClient(config = config) as client:
            assert client.is_initialized()


@pytest.mark.skipif(sys.version_info.major == 3 and sys.version_info.minor == 3, reason = "test is skipped in Python 3.3")
def test_can_connect_with_selfsigned_cert_if_disable_ssl_verification_is_true():
    """HTTPConfig(disable_ssl_verification=True) allows the connection."""
    with start_secure_server() as server:
        server.for_path('/sdk/latest-all', poll_content())
        config = Config(
            sdk_key = 'sdk_key',
            base_uri = server.uri,
            stream = False,
            send_events = False,
            http = HTTPConfig(disable_ssl_verification = True)
        )
        with LDClient(config = config) as client:
            assert client.is_initialized()


@pytest.mark.skipif(sys.version_info.major == 3 and sys.version_info.minor == 3, reason = "test is skipped in Python 3.3")
def test_can_connect_with_selfsigned_cert_by_setting_ca_certs():
    # The remainder of this test is cut off in the excerpt; the elision
    # marker is kept as found.
    with start_secure_server() as server:
        server.for_path('/sdk/latest-all', poll_content())
        config = Config(
            sdk_key = 'sdk_key',
            base_uri = server.uri,
            stream = False,
            send_events = False,
            http = HTTPConfig(ca_certs = './testing/selfsigned.pem')
        )
        with LDClient(config = config) as client:
            ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!