Best Python code snippet using robotframework
sparkampserver.py
Source:sparkampserver.py
# Excerpt from sparkampserver.py. Every definition below takes `self`, so these
# are methods of a class whose header lies outside this excerpt; they are laid
# out at top level because the original indentation was lost.

# ... the excerpt begins inside a method whose `def` line is not shown:
#         if not success:
#             self.connection_lost_event()
#         else:
#             self.request_preset(hw_preset)
#         self.log_debug_message("change_to_preset - value " + str(hw_preset))


def change_effect(self, old_effect, new_effect):
    """Send a change-effect command replacing old_effect with new_effect.

    Calls connection_lost_event() when the send reports failure, and calls
    update_plugin(enabled=False) when the current plugin is named old_effect.
    """
    cmd = self.msg.change_effect(old_effect, new_effect)
    success = self.comms.send_it(cmd[0])
    if not success:
        self.connection_lost_event()

    self.log_debug_message("change_effect - old: " +
                           old_effect + " new: " + new_effect)

    # Have they unloaded effect that was controlled by expression?
    if self.plugin.name == old_effect:
        self.log_debug_message("change_effect - update_plugin")
        self.update_plugin(enabled=False)


def change_effect_parameter(self, effect, parameter, value):
    """Send a new value for one parameter of an effect and log the change.

    Calls connection_lost_event() when the send reports failure.
    """
    cmd = self.msg.change_effect_parameter(effect, parameter, value)
    success = self.comms.send_it(cmd[0])
    if not success:
        self.connection_lost_event()

    self.log_debug_message("change parameter: effect: " + effect +
                           " parameter: " + str(parameter) +
                           " value: " + str(value))


def connect(self):
    """Discover the amp over Bluetooth, connect and start the listener thread.

    Progress and failure are reported through socketio connection messages;
    any exception is printed and reported as a failed connection.
    """
    try:
        bt_devices = bluetooth.discover_devices(lookup_names=True)
        address = None
        for addr, bt_name in bt_devices:
            if bt_name == 'Spark 40 Audio':
                address = addr

        # Fail early with a readable message instead of opening a socket
        # and trying to connect to address None.
        if address is None:
            raise RuntimeError("No 'Spark 40 Audio' Bluetooth device found")

        self.bt_sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        self.bt_sock.connect((address, 2))

        self.reader = SparkReadMessage()
        self.comms = SparkComms(self.bt_sock)

        # Start a separate thread to listen for control changes from the amp
        self.listener = SparkListener(self.reader, self.comms,
                                      self.notifier)
        t = threading.Thread(target=self.listener.start,
                             args=(),
                             daemon=True)
        t.start()

        self.connected = True
        self.socketio.emit(dict_connection_message,
                           {dict_message: msg_retrieving_config})
        self.initialise()
        self.socketio.emit(dict_connection_message,
                           {dict_message: msg_amp_connected})
    except Exception as e:
        print(e)
        self.socketio.emit(dict_connection_message,
                           {dict_message: msg_connection_failed})


def log_debug_message(self, message):
    """Emit message on the debug-log socketio channel when debug logging is on."""
    if self.debug_logging == True:
        self.socketio.emit(dict_update_debug_log,
                           {dict_message: message})


def initialise(self):
    """Send a state request to the amp and return the result of that call."""
    return self.comms.send_state_request()


def eject(self):
    """Stop the listener, close the Bluetooth socket and mark as disconnected."""
    self.config = None
    self.listener.stop()

    # Send a final request to the amp for the Listener thread to realise it
    # has to stop listening
    self.request_preset(0)

    # Now close the Bluetooth connection and release the amp into the wild.
    self.bt_sock.close()
    self.connected = False


def expression_pedal(self, value):
    """Apply an expression pedal value through the current plugin.

    A "param" plugin converts the value into parameter changes that are sent
    to the amp and emitted to the UI; an "onoff" plugin converts it into an
    on/off state that is emitted to the UI. Does nothing but log when the
    pedal is disabled or the plugin type is neither of the two.
    """
    if self.disable_expression_pedal == True:
        self.log_debug_message(
            "expression_pedal - value received but is disabled")
        return

    self.log_debug_message(
        "expression_pedal - value received: " + str(value))

    if self.plugin.type == "param":
        self.log_debug_message("expression_pedal - plugin.type = param")
        params = self.plugin.calculate_params(value)
        if params is None:
            self.log_debug_message("expression_pedal - No params returned")
            return

        for param in params:
            self.change_effect_parameter(
                get_amp_effect_name(self.plugin.name), param[0], param[1])
            self.socketio.emit(dict_update_parameter, {
                dict_effect: self.plugin.name,
                dict_parameter: param[0],
                dict_value: param[1]
            })
        return

    if self.plugin.type == "onoff":
        self.log_debug_message("expression_pedal - plugin.type = onoff")
        isOn = self.plugin.calculate_state(value)
        if isOn:
            state = dict_On
            self.log_debug_message("expression_pedal - isOn = True")
        else:
            state = dict_Off
            self.log_debug_message("expression_pedal - isOn = False")

        self.socketio.emit(dict_update_onoff, {
            dict_state: state,
            dict_effect: self.plugin.name,
            dict_effect_type: self.plugin.effect_type
        })
        return

    self.log_debug_message("expression_pedal - plugin.type not found")


def send_preset(self, chain_preset):
    """Upload chain_preset to the amp, select hardware preset 0x7f and
    parse the chain preset into the current config."""
    self.log_debug_message("send_preset - " + chain_preset.name)
    chain_preset.preset = self.config.preset

    spark_preset = SparkPreset(chain_preset, type=dict_chain_preset)
    preset = self.msg.create_preset(spark_preset.json())
    for i in preset:
        self.bt_sock.send(i)

    change_user_preset = self.msg.change_hardware_preset(0x7f)
    self.bt_sock.send(change_user_preset[0])

    self.config.parse_chain_preset(chain_preset)


def set_bpm(self, bpm):
    """Store bpm in the config, upload the resulting preset and select
    hardware preset 0x7f."""
    self.config.bpm = bpm

    spark_preset = SparkPreset(self.config, bpm=True)
    preset = self.msg.create_preset(spark_preset.json())
    for i in preset:
        self.bt_sock.send(i)

    change_user_preset = self.msg.change_hardware_preset(0x7f)
    self.bt_sock.send(change_user_preset[0])

    self.log_debug_message("set_bpm - value " + str(bpm))


def store_amp_preset(self):
    """Upload the current config as a preset and select the config's own
    hardware preset number."""
    spark_preset = SparkPreset(self.config)
    preset = self.msg.create_preset(spark_preset.json())
    for i in preset:
        self.bt_sock.send(i)

    change_user_preset = self.msg.change_hardware_preset(
        self.config.preset)
    self.bt_sock.send(change_user_preset[0])


def toggle_effect_onoff(self, effect_type):
    """Flip the on/off state of the effect in the given slot.

    Sends the new state to the amp, records it in the config and returns a
    dict with the effect name, the new state and the effect type.
    """
    effect = None
    effect_name = None

    # Ordered by priority for Pedal switching
    # Gate, comp and amp are optional hardware switches
    if effect_type == dict_drive:
        effect = self.config.drive
    elif effect_type == dict_mod:
        effect = self.config.modulation
    elif effect_type == dict_delay:
        effect = self.config.delay
    elif effect_type == dict_reverb:
        effect = self.config.reverb
        effect_name = self.config.reverb[dict_Name]
    elif effect_type == dict_gate:
        effect = self.config.gate
    elif effect_type == dict_comp:
        effect = self.config.comp
    elif effect_type == dict_amp:
        effect = self.config.amp

    # The converted name used to be computed and thrown away, leaving
    # effect_name as None in the returned dict for every slot but reverb.
    if effect_name is None:
        effect_name = get_js_effect_name(effect[dict_Name])

    state = dict_Off
    if effect[dict_OnOff] == dict_Off:
        state = dict_On

    self.turn_effect_onoff(
        get_amp_effect_name(effect[dict_Name]), state)
    self.config.update_config(effect[dict_Name], dict_turn_on_off, state)
    self.config.last_call = dict_turn_on_off

    self.log_debug_message(
        "toggle_effect_onoff - effect_type: " + effect_type +
        " state: " + state)

    return {dict_effect: effect_name,
            dict_state: state,
            dict_effect_type: effect_type}


def turn_effect_onoff(self, effect, state):
    """Send an on/off command for effect to the amp and log it."""
    cmd = self.msg.turn_effect_onoff(effect, state)
    self.comms.send_it(cmd[0])
    self.log_debug_message(
        "turn_effect_onoff - effect: " + effect + " state: " + state)


def request_preset(self, hw_preset):
    """Ask the amp for the given hardware preset."""
    self.comms.send_preset_request(hw_preset)


##################
# Utility Methods
##################

# ... the excerpt ends inside the following method; its remainder is not shown:
#     def get_pedal_status(self):
#         if self.config == None:
#             return {}
#         return {dict_drive: self.config.drive[dict_OnOff],
#                 dict_delay: self.config.delay[dict_OnOff],
#                 dict_mod: self.config.modulation[dict_OnOff],
#                 dict_reverb: self.config.reverb[dict_OnOff],
#                 dict_preset: self.config.preset,...
mxnet_inference.py
Source:mxnet_inference.py
# Excerpt from mxnet_inference.py. Lines 1-20 of the original file are not
# part of this excerpt; the code below relies on argparse, inspect, os, re,
# sys and numpy (as np) being imported there.
import mxnet as mx
import logging as log
import datetime
from time import time


def _emit_log(log_fn, message, code):
    """Format message with a timestamp and the caller's code location, then
    pass it to log_fn (one of log.debug / log.error / log.info)."""
    ts_str = datetime.datetime.fromtimestamp(time()).strftime(
        '%Y-%m-%d %H:%M:%S')
    log_fn("[%s] %s: %s(..) in %s:%i" % (ts_str, message, code.co_name,
                                         code.co_filename,
                                         code.co_firstlineno))


def log_debug_message(message):
    """Log message at DEBUG level, tagged with the calling function."""
    # f_back is the frame of whoever called this logging helper.
    _emit_log(log.debug, message, inspect.currentframe().f_back.f_code)


def log_error_message(message):
    """Log message at ERROR level, tagged with the calling function."""
    _emit_log(log.error, message, inspect.currentframe().f_back.f_code)


def log_info_message(message):
    """Log message at INFO level, tagged with the calling function."""
    _emit_log(log.info, message, inspect.currentframe().f_back.f_code)


def build_argparser():
    """Build and return the command-line argument parser of the benchmark."""
    log_debug_message('START')

    # Help strings use implicit concatenation so that source indentation is
    # not embedded in the text, as it was with backslash continuations.
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--graph', required=True, type=str,
                        help='Path to the .json file '
                             'containing serialized computational graph')
    parser.add_argument('-m', '--model', required=True, type=str,
                        help='Path to the .param file '
                             'with a trained weights')
    parser.add_argument('-d', '--data_shape', required=True, type=str,
                        help='Input data shape '
                             '(for ResNet-50 it is \'3 224 224\')')
    parser.add_argument('-i', '--input_dir', required=True, type=str,
                        help='Name of the directory '
                             'containing input images')
    parser.add_argument('-ni', '--number_iter', required=True, type=int,
                        help='Number of inference iterations')
    parser.add_argument('-o', '--need_output', help='Get output',
                        required=True,
                        type=lambda x: (str(x).lower() == 'true'))
    parser.add_argument('-of', '--output_file',
                        help='Name of the output file',
                        default='res.txt', type=str)
    parser.add_argument('-rf', '--perf_file', default='result.csv', type=str,
                        help='Name of the file '
                             'containing performance results')
    parser.add_argument('-t', '--task_type', default='classification',
                        type=str,
                        help='Task type: classification / detection')
    parser.add_argument('-me', '--mean', help='Input mean values',
                        default='[0 0 0]', type=str)
    parser.add_argument('-b', '--batch_size', help='batch size',
                        default=1, required=True, type=int)
    parser.add_argument('-l', '--label_file',
                        help='List of labels (text file)',
                        default=None, type=str)

    log_debug_message('FINISH')
    return parser


def load_network(graph, model):
    """Load a checkpoint and wrap its symbol in a CPU module.

    graph must look like 'prefix-symbol.json' and model like
    'prefix-epoch.params', sharing the same prefix.

    Returns (mx_module, sym, args, aux).
    Raises Exception when either name does not match that format or the
    prefixes differ.
    """
    log_debug_message('START')

    re_json = r'([\w\d\-]+)\-symbol.json'
    re_params = r'([\w\d\-]+)\-([\d]+).params'
    m_json = re.match(re_json, graph)
    m_params = re.match(re_params, model)

    # A failed match used to surface as AttributeError on None; report it
    # with the same error as a prefix mismatch instead.
    names_ok = (m_json is not None and m_params is not None
                and m_json.group(1) == m_params.group(1))
    if not names_ok:
        error_message = ('Incorrect .json and .params file names. '
                         'The format of file name is as follows: '
                         'prefix-symbol.json and prefix-epoch.params')
        log_error_message(error_message)
        raise Exception(error_message)

    json_model_name = m_json.group(1)
    epoch_num = int(m_params.group(2))

    log_debug_message('START: mx.model.load_checkpoint(..)')
    sym, args, aux = mx.model.load_checkpoint(json_model_name, epoch_num)
    mx_module = mx.mod.Module(symbol=sym, data_names=['data'],
                              context=mx.cpu())
    log_debug_message('FINISH: mx.model.load_checkpoint(..)')

    log_debug_message('FINISH')
    return mx_module, sym, args, aux


def load_images(input_dir, batch_size, data_name, data_shape):
    """Read every file in input_dir, resize it and build a data iterator.

    data_shape is a dict with 'nchannels', 'width' and 'height'.
    Returns (image_files, eval_data).
    """
    log_debug_message('START')

    log_debug_message('START: load images')
    image_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir)
                   if os.path.isfile(os.path.join(input_dir, f))]

    # After the channels-first transpose each image is
    # (channels, height, width), so the batch buffer uses that order;
    # (width, height) only worked for square inputs.
    images = np.empty((len(image_files), data_shape['nchannels'],
                       data_shape['height'], data_shape['width']),
                      dtype='float32')
    for idx, image_file in enumerate(image_files):
        img = mx.image.imread(image_file)
        img = mx.image.imresize(img, data_shape['width'],
                                data_shape['height'])
        img = img.transpose((2, 0, 1))  # channels first
        img = img.astype(dtype='float32')
        images[idx] = img.asnumpy()
    log_debug_message('FINISH: load images')

    # The two messages below previously began with a mis-encoded character.
    log_debug_message('START: create iterator')
    eval_data = mx.io.NDArrayIter({data_name: images},
                                  label=None, batch_size=batch_size,
                                  shuffle=False,
                                  last_batch_handle='discard')
    log_debug_message('FINISH: create iterator')

    log_debug_message('FINISH')
    return image_files, eval_data


def mxnet_benchmark(task_type, graph, model, input_dir, number_iter,
                    data_shape, batch_size=1, need_output=False,
                    label_file='', output_file='res.txt'):
    """Run inference over the images in input_dir and time every batch.

    Returns (inference_times, total_inference_time), where inference_times
    holds one forward-pass duration per processed batch.
    """
    log_debug_message('START')

    mx_module, sym, args, aux = load_network(graph, model)
    image_files, eval_data = load_images(input_dir, batch_size,
                                         mx_module.data_names[0], data_shape)
    log_info_message('Network output names: {}'.format(sym.list_outputs()))

    log_debug_message('START: mx_module.bind(..)')
    mx_module.bind(data_shapes=eval_data.provide_data)
    log_debug_message('FINISH: mx_module.bind(..)')

    log_debug_message('START: mx_module.set_params(..)')
    mx_module.set_params(arg_params=args, aux_params=aux)
    log_debug_message('FINISH: mx_module.set_params(..)')

    log_debug_message('START: forward(..) for each batch')
    inference_times = []
    outputs = []
    data_iter = iter(eval_data)
    # Number of batches needed to cover number_iter images (ceiling division).
    number_iter = (number_iter + batch_size - 1) // batch_size

    t0_total = time()
    for _ in range(number_iter):
        data_batch = next(data_iter)

        t0 = time()
        mx_module.forward(data_batch, is_train=False)
        # Block until the results are ready so the timing covers the
        # actual computation.
        for output in mx_module.get_outputs():
            output.wait_to_read()
        t1 = time()

        outputs.append([output.asnumpy()
                        for output in mx_module.get_outputs()])
        inference_times.append(t1 - t0)
    t1_total = time()

    total_inference_time = t1_total - t0_total
    log_debug_message('FINISH: forward(..) for each batch')

    save_outputs(need_output, task_type, outputs, image_files,
                 output_file, label_file)

    log_debug_message('FINISH')
    return inference_times, total_inference_time


def save_outputs(need_output, task_type, outputs, image_files,
                 output_file, label_file):
    """Write the outputs in the format of task_type when need_output is set.

    Raises Exception for a task type other than classification / detection.
    """
    log_debug_message('START')

    if need_output:
        task = task_type.lower()
        if task == 'classification':
            classification_output(outputs, image_files,
                                  output_file, label_file)
        elif task == 'detection':
            detection_output(outputs, image_files, output_file, label_file)
        else:
            error_message = 'Incorrect task type \'{}\''.format(task_type)
            log_error_message(error_message)
            raise Exception(error_message)

    log_debug_message('FINISH')


def load_labels(label_file):
    """Return the lines of label_file with trailing whitespace stripped."""
    log_debug_message('START')

    with open(label_file, 'r') as file:
        labels = [line.rstrip() for line in file]

    log_debug_message('FINISH')
    return labels


def classification_output(outputs, image_files, output_file, label_file):
    """Write one 'image;label1;...;label5' line per image to output_file,
    listing the five highest-scoring labels."""
    log_debug_message('START')

    labels = load_labels(label_file)

    # The context manager closes the file even if a lookup below fails.
    with open(output_file, 'w+') as out_file:
        for batch_idx, batch_outputs in enumerate(outputs):
            probabilities = batch_outputs[0]
            batch_size = probabilities.shape[0]
            for image_idx in range(batch_size):
                top5 = np.argsort(probabilities[image_idx])[::-1]
                image = image_files[batch_idx * batch_size + image_idx]
                out_file.write('{0};{1};{2};{3};{4};{5}\n'.format(
                    image,
                    labels[top5[0]], labels[top5[1]],
                    labels[top5[2]], labels[top5[3]],
                    labels[top5[4]]))

    log_debug_message('FINISH')


def detection_output(outputs, image_files, output_file, label_file):
    """Write one 'image;label;score;xl;yl;xr;yr' line per valid detection,
    with box coordinates scaled to the source image size."""
    log_debug_message('START')

    labels = load_labels(label_file)

    with open(output_file, 'w+') as out_file:
        for batch_idx, batch_outputs in enumerate(outputs):
            batch_detections = batch_outputs[0]
            batch_size = len(batch_detections)
            log_info_message(
                'Batch identifier is {0}, batch size is {1}'.format(
                    batch_idx, batch_size))

            for image_idx in range(batch_size):
                image_detections = batch_detections[image_idx]
                image_name = image_files[batch_idx * batch_size + image_idx]
                image = mx.image.imread(image_name)
                # imread yields (height, width, channels); the two were
                # previously read the other way round, which mis-scaled the
                # boxes of non-square images.
                height = image.shape[0]
                width = image.shape[1]
                log_info_message(
                    '\tImage name is \'{0}\', number of detections is {1}'
                    .format(image_name, image_detections.shape[0]))

                for class_id, score, xl, yl, xr, yr in image_detections:
                    if (class_id < 0) or (class_id >= len(labels)):
                        # filter false detections (class_id = -1)
                        continue
                    xl = int(xl * width)
                    yl = int(yl * height)
                    xr = int(xr * width)
                    yr = int(yr * height)
                    log_info_message(
                        '\t\t({0}, {1}, {2}, {3}, {4}, {5})'.format(
                            labels[int(class_id)], score, xl, yl, xr, yr))
                    out_file.write(
                        '{0};{1};{2};{3};{4};{5};{6}\n'.format(
                            image_name, labels[int(class_id)], score,
                            xl, yl, xr, yr))

    log_debug_message('FINISH')


def three_sigma_rule(time):
    """Return the values of time lying within three standard deviations of
    the mean."""
    log_debug_message('START')

    average_time = np.mean(time)
    sigm = np.std(time)
    upper_bound = average_time + (3 * sigm)
    lower_bound = average_time - (3 * sigm)
    valid_time = [t for t in time if lower_bound <= t <= upper_bound]

    log_debug_message('FINISH')
    return valid_time


def calculate_average_time(time):
    """Return the mean of time."""
    log_debug_message('START')

    average_time = np.mean(time)

    log_debug_message('FINISH')
    return average_time


def calculate_latency(time):
    """Return the median of time."""
    log_debug_message('START')

    # NOTE(review): this sorts the caller's list in place; np.median does
    # not need it. Kept in case a caller relies on the sorted order.
    time.sort()
    latency = np.median(time)

    log_debug_message('FINISH')
    return latency


def calculate_fps(num_images, time):
    """Return num_images divided by time."""
    log_debug_message('START')

    fps = num_images / time

    log_debug_message('FINISH')
    return fps


def create_result_file(file_name):
    """Create file_name with the CSV header row unless it already exists."""
    log_debug_message('START')

    if not os.path.isfile(file_name):
        head = ('Model;Batch size;Device;IterationCount;Latency;'
                'Total time (s);FPS;')
        with open(file_name, 'w') as file:
            file.write(head + '\n')

    log_debug_message('FINISH')


def write_row(file_name, net_name, batch_size, number_iter,
              latency, total_time, fps):
    """Log the performance figures and append them as a row to file_name."""
    log_debug_message('START')

    log_info_message('Total time: {0} s'.format(total_time))
    log_info_message('Latency: {0} s'.format(latency))
    log_info_message('FPS: {0}'.format(fps))

    row = '{};{};CPU;{};{:.3f};{:.3f};{:.3f}'.format(
        net_name, batch_size, number_iter, latency, total_time, fps)
    with open(file_name, 'a') as file:
        file.write(row + '\n')

    log_debug_message('FINISH')


# ... the excerpt ends inside the following function; its remainder is not shown:
# def main():
#     log.basicConfig(format = '[ %(levelname)s ] %(message)s',
#         level = log.DEBUG, stream = sys.stdout)
#
#     args = build_argparser().parse_args()
#
#     inference_times, total_inference_time = mxnet_benchmark(
#         task_type = args.task_type, graph = args.graph,
#         model = args.model, input_dir = args.input_dir,
#         number_iter = args.number_iter, batch_size = args.batch_size,
#         data_shape = { 'nchannels': int(args.data_shape.split(' ')[0]),
#             'width': int(args.data_shape.split(' ')[1]),
#             'height': int(args.data_shape.split(' ')[2]) },
#         need_output = args.need_output, label_file = args.label_file,...
proxy.py
Source:proxy.py
# Excerpt from proxy.py. Every definition below takes `self`, so these are
# methods of a class whose header lies outside this excerpt; they are laid
# out at top level because the original indentation was lost.

# ... the excerpt begins inside a method; its `def` and `try:` lines are not shown:
#             while self.running:
#                 readable, _, _ = select.select(self.connections_queue, list(), list())
#                 self.check_ready_connections(readable)
#         except KeyboardInterrupt:
#             self.logger.log_debug_message('[*] Shutting down proxy')
#             self.exit_proxy()
#         except socket.error:
#             return


def check_ready_connections(self, readable):
    """Accept new clients on the listening socket and handle every other
    readable socket as a client request."""
    for sock in readable:
        if sock is self.listening_socket:
            conn, address = self.listening_socket.accept()
            self.logger.log_debug_message("[*] Received Connection")
            self.logger.log_connection(address, ' connect')
            self.connections_queue.append(conn)
        else:
            self.handle_connection(sock)


# filter messages and pass them to queue
def handle_connection(self, conn):
    """Read one request from conn and pass it on to the firewall filter.

    Invalid data is logged and dropped; a disconnected client is removed.
    """
    try:
        msg, command, subcommand = self.get_request_from_client(conn)
    except struct.error:
        self.logger.log_error_message(
            "*** Error, received invalid data from client ***")
        # NOTE(review): the connection is left open here; the original
        # author left an open question whether it should be closed.
        return
    except DisconnectException:
        self.logger.log_debug_message("***Client has closed connection***")
        self.remove_client(conn)
        return

    filter_data = (conn.getpeername(), command, subcommand)
    self.filter_message(conn, msg, filter_data)


def get_request_from_client(self, conn):
    """Read one framed request from conn.

    Returns (message, command, subcommand), where message is the raw frame.
    """
    data_part_1 = self.get_data(conn, FRAME_SIZE)
    _, _, _, _, _, data_size = struct.unpack('<HBBHBH', data_part_1)
    data_part_2 = self.get_data(conn, 6)
    _, command, subcommand = struct.unpack('<HHH', data_part_2)
    # NOTE(review): a frame announcing data_size < 6 makes this size
    # negative, so nothing more is read - confirm the protocol rules it out.
    data_part_3 = self.get_data(conn, data_size - 6)
    message = data_part_1 + data_part_2 + data_part_3
    return message, command, subcommand


def get_data(self, conn, size):
    """Receive exactly size bytes from conn.

    Raises DisconnectException when the peer closes the connection first.
    """
    # recv() returns bytes on Python 3, so the buffer must be bytes as well;
    # b'' is a plain str on Python 2, which keeps the old behaviour there.
    data = b''
    while len(data) < size:
        packet = conn.recv(size - len(data))
        if not packet:
            raise DisconnectException
        data += packet
    return data


def filter_message(self, conn, msg, filter_data):
    """Queue msg for the server unless the firewall rejects it, in which
    case the client is removed."""
    address, command, subcommand = filter_data
    if not self.firewall.check_message(address, command, subcommand):
        self.remove_client(conn)
        return

    with self.message_queue_condition:
        self.message_queue.append((conn, msg))
        self.message_queue_condition.notify()


# THREAD C - send request to server and response to client
def send_request_and_response(self):
    """Forward queued requests to the server and relay each response back
    to its client until the proxy stops running."""
    while self.running:
        conn, message = self.get_next_message_from_queue()
        try:
            response = self.communicate_with_server(message)
            self.send_response_to_client(conn, response)
        except CriticalDisconnectException:
            self.logger.log_error_message(
                "*** Error, lost connection to server ***")
            self.exit_proxy()
            return
        except struct.error:
            self.logger.log_error_message(
                "*** Error, received invalid data from server ***")
            continue


def get_next_message_from_queue(self):
    """Block until a message is queued, then pop and return (conn, message)."""
    with self.message_queue_condition:
        # Re-check after every wake-up: a condition wait may return while
        # the queue is still empty.
        while not self.message_queue:
            self.message_queue_condition.wait()
        conn, message = self.message_queue.pop(0)
    return conn, message


# return response from server
def communicate_with_server(self, message):
    """Send message to the server and return its complete framed response.

    Raises CriticalDisconnectException when the server connection is lost.
    """
    try:
        self.server_socket.sendall(message)
    except socket.error:
        raise CriticalDisconnectException

    response_part_1 = self.get_data_from_server(FRAME_SIZE)
    _, _, _, _, _, data_size = struct.unpack('<HBBHBH', response_part_1)
    response_part_2 = self.get_data_from_server(data_size)
    return response_part_1 + response_part_2


def get_data_from_server(self, size):
    """Receive exactly size bytes from the server socket.

    Raises CriticalDisconnectException when the server closes the connection.
    """
    data = b''  # bytes buffer, matching what recv() returns on Python 3
    while len(data) < size:
        packet = self.server_socket.recv(size - len(data))
        if not packet:
            raise CriticalDisconnectException
        data += packet
    return data


# send response back to client
def send_response_to_client(self, conn, response):
    """Send response to the client, removing the client when that fails."""
    try:
        conn.sendall(response)
    except socket.error:
        self.logger.log_debug_message(
            '***Cant send response. Closed connection***')
        self.remove_client(conn)


def remove_client(self, conn):
    """Stop tracking conn and close it; safe to call twice for one client."""
    # The client may already have been dropped (possibly from the other
    # thread); an unguarded list.remove() would raise ValueError then.
    if conn in self.connections_queue:
        self.connections_queue.remove(conn)
    # MAYBE REMOVE ALL MESSAGES FROM QUEUE
    conn.close()


def start(self):
    """Set up the sockets, start the forwarding thread and listen for
    client messages."""
    self.set_up_sockets()
    thread = threading.Thread(target=self.send_request_and_response)
    thread.daemon = True  # setDaemon() is deprecated
    thread.start()
    self.listen_for_messages()


def set_up_sockets(self):
    """Create and connect the server socket, then create, bind and start
    the listening socket; every failure goes to failure_exit_proxy()."""
    try:
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error as e:
        self.logger.log_error_message(
            '*** Error creating a server socket: %s ***' % e)
        self.failure_exit_proxy(e)
    self.logger.log_debug_message('[*] Server socket initialized...')

    try:
        server = (self.server_address, self.server_port)
        self.server_socket.connect(server)
    except socket.error as e:
        self.logger.log_error_message(
            '*** Error connecting to server: %s ***' % e)
        self.failure_exit_proxy(e)
    self.logger.log_debug_message('[*] Connection to server estabilished...')

    try:
        self.listening_socket = socket.socket(socket.AF_INET,
                                              socket.SOCK_STREAM)
        self.connections_queue.append(self.listening_socket)
    except socket.error as e:
        self.logger.log_error_message(
            '*** Error creating a listening socket: %s ***' % e)
        self.failure_exit_proxy(e)
    self.logger.log_debug_message('[*] Listening socket initialized...')

    try:
        self.listening_socket.bind((self.host_address, self.host_port))
        self.listening_socket.listen(5000)
    except socket.error as e:
        self.logger.log_error_message(
            '*** Error binding a listening socket: %s ***' % e)
        self.failure_exit_proxy(e)
    self.logger.log_debug_message('[*] Listening socket binded successfully...')
    self.logger.log_debug_message('[*] Proxy initialized successfully!')


def exit_proxy(self):
    """Stop the proxy and close the listening, server and client sockets."""
    self.running = False
    if self.listening_socket:
        if self.listening_socket in self.connections_queue:
            self.connections_queue.remove(self.listening_socket)
        self.listening_socket.close()
    if self.server_socket:
        self.server_socket.close()
    # Iterate over a copy: remove_client() shrinks the list, which made the
    # original loop skip every other client.
    for conn in list(self.connections_queue):
        self.remove_client(conn)


def failure_exit_proxy(self, e):
    self.exit_proxy()
    # ... the excerpt ends here; the rest of the original file is not shown.
loot_master.py
Source:loot_master.py
# Excerpt from loot_master.py. The definitions below take `self`, so they are
# methods of a class whose header lies outside this excerpt; they are laid
# out at top level because the original indentation was lost.

# ... the excerpt begins inside a method whose `def` line is not shown:
#         if not target.is_looted():
#             if loot_chance == 0:
#                 self.sound_master.play_item_fx_sound('empty')
#                 damage_text.warning(target, f'Empty!', text_sprite)
#                 self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__} '
#                                                           f'Receiving Nothing')
#
#             elif loot_chance == 1:
#                 self.sound_master.play_item_fx_sound('health_potion')
#                 caster.stash.add_healing_potion(1)
#                 damage_text.warning(target, f'x1 Health Potion Found!', text_sprite)
#                 self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__} '
#                                                           f'Receiving Health Potion')
#
#             elif loot_chance == 2:
#                 self.sound_master.play_item_fx_sound('health_potion')
#                 caster.stash.add_mana_potion(1)
#                 damage_text.warning(target, f'x1 Mana Potion Found!', text_sprite)
#                 self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__} '
#                                                           f'Receiving Mana Potion')
#
#             elif loot_chance == 3:
#                 quality_roll = randint(0, 50)
#                 if quality_roll > target.level:
#                     damage_text.warning(target, f'Food Found!', text_sprite)
#                     BREAD.consume(caster, text_sprite)
#                     self.sound_master.play_item_fx_sound('eat')
#                     self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__} '
#                                                               f'Receiving Bread')
#                 else:
#                     damage_text.warning(target, f'Large Food Found!', text_sprite)
#                     LARGE_BREAD.consume(caster, text_sprite)
#                     self.sound_master.play_item_fx_sound('eat')
#                     self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__} '
#                                                               f'Receiving Large Bread')
#
#             elif loot_chance == 4:
#                 quality_roll = randint(0, 50)
#                 if quality_roll > target.level:
#                     damage_text.warning(target, f'Drink Found!', text_sprite)
#                     DRINK.consume(caster, text_sprite)
#                     self.sound_master.play_item_fx_sound('drink')
#                     # NOTE(review): the 'eat' sound is also played for a
#                     # drink here, unlike the Large Drink branch - confirm.
#                     self.sound_master.play_item_fx_sound('eat')
#                     self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__}'
#                                                               f' Receiving Drink')
#                 else:
#                     damage_text.warning(target, f'Large Drink Found!', text_sprite)
#                     LARGE_DRINK.consume(caster, text_sprite)
#                     self.sound_master.play_item_fx_sound('drink')
#                     self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__}'
#                                                               f' Receiving Large Drink')
#
#             elif loot_chance == 5:
#                 gold = randint(1, 9) + target.level
#                 caster.stash.add_gold(gold)
#                 damage_text.cast(target, f'{gold} Gold Found!', text_sprite)
#                 self.sound_master.play_item_fx_sound('gold')
#                 self.loot_master_logger.log_debug_message(f'Player Loots {target.__class__.__name__}'
#                                                           f' Receiving {gold} Gold')
#
#             target.update_looted_status()
#         else:
#             self.loot_error(target, text_sprite)


def roll_boss_loot(self, caster, target, text_sprite):
    """Give the caster a generated item from an unlooted boss target.

    An already looted target triggers loot_error() instead. The caster's
    stats are recalculated afterwards.
    """
    generator = EquipmentGenerator()

    if target.is_looted():
        self.loot_error(target, text_sprite)
    else:
        self.sound_master.play_item_fx_sound('drum_roll')

        # Todo: Proper setup for loot boss, based on level
        boss_item = generator.get_item(30, 1000)
        name = boss_item.get_item_name()

        damage_text.heal(target, f'{name}', text_sprite)
        owner = target.__class__.__name__
        self.loot_master_logger.log_debug_message(
            f'Player Loots {owner} Receiving {name.title()}')
        target.update_looted_status()

        # Note: Temporary Approach
        caster.backpack.add_item(boss_item)

    # NOTE(review): assumed to run on both paths; the flattened source does
    # not show whether this line belonged to the `else` branch - confirm.
    caster.re_calculate_hero_stats()


def loot_error(self, target, text_sprite):
    """Show the 'ALREADY LOOTED!' warning on target and play the error sound."""
    # Todo: Init with -30y
    damage_text.warning(target, 'ALREADY LOOTED!', text_sprite)
    self.sound_master.play_item_fx_sound('error')
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!