Best Python code snippet using fMBT_python
SensorTagDetector.py
Source:SensorTagDetector.py
1from bluepy import sensortag, btle2import paho.mqtt.client as mqtt3import threading4import time5import json6import redis7from threading import Timer, Thread, Event8import sqlite39import configparser10config = configparser.ConfigParser()11config.read('conf.ini')12retrieving_rate = float(config['general']['retrieving-rate'])13def main():14 print("DB url", config['SQLiteDB']['url'])15 interval = 18016 stopFlag = Event()17 timerTask = TimerTask(stopFlag, interval)18 timerTask.start()19class TimerTask(Thread):20 def __init__(self, event, duration):21 Thread.__init__(self)22 self.stopped = event23 self.duration = duration24 def run(self):25 while True:26 try:27 device_list = []28 current_device_id = 029 # scanner = btle.Scanner()30 # device_info_list = scanner.scan(15)31 # print(device_info_list)32 _redis = redis.StrictRedis(host="localhost", port=6379, db=0)33 print("Try to detect sensors each %s seconds" % self.duration)34 # search configured devices in db35 conn = sqlite3.connect(config['SQLiteDB']['url'])36 c = conn.cursor()37 c.execute('SELECT DISTINCT mac FROM sensor')38 for row in c:39 device_mac = row[0]40 if len(device_list) == 0:41 data_thread = RetrievingDataThread(current_device_id, "Thread - " + str(current_device_id),42 current_device_id, device_mac)43 current_device_id += 144 device_list.append(device_mac)45 data_thread.start()46 else:47 not_existed = True48 for tmp_run in device_list:49 if device_mac == tmp_run:50 not_existed = False51 if not_existed:52 device_list.append(device_mac)53 data_thread = RetrievingDataThread(current_device_id, "Thread - " + str(current_device_id),54 current_device_id, device_mac)55 current_device_id += 156 data_thread.start()57 c.close()58 for tmp in device_list:59 _redis.sadd("device_list", tmp)60 except Exception as e:61 print("Exception in scanning")62 print(e)63 if self.stopped.wait(self.duration):64 break65class RetrievingDataThread(threading.Thread):66 _device_mac = None67 _exit_flag = False68 _sensor = None69 _retry_count = 070 _redis = None71 def __init__(self, threadID, name, counter, device_mac):72 threading.Thread.__init__(self)73 self.threadID = threadID74 self.name = name75 self.counter = counter76 self._device_mac = device_mac77 self._redis = redis.StrictRedis(host="localhost", port=6379, db=0)78 def run(self):79 print(">>>>>...")80 print(self._device_mac)81 self._reconnect()82 while not self._exit_flag and self._retry_count < 5:83 try:84 if self._sensor.getState() == "conn":85 # Get the timestamp86 # current_time = time.asctime(time.localtime(time.time()))87 current_time = time.time()88 data_dict = {}89 data_dict["current_time"] = current_time90 data_dict["MAC"] = self._device_mac91 data_dict["temperature"] = self._sensor.IRtemperature.read()92 data_dict["humidity"] = self._sensor.humidity.read()93 data_dict["barometer"] = self._sensor.barometer.read()94 data_dict["accelerometer"] = self._sensor.accelerometer.read()95 data_dict["magnetometer"] = self._sensor.magnetometer.read()96 data_dict["gyroscope"] = self._sensor.gyroscope.read()97 data_dict["light"] = self._sensor.lightmeter.read()98 data_dict["battery"] = self._sensor.battery.read()99 # self.push_to_redis(data_dict)100 self.push_to_mqtt(data_dict)101 else:102 self._retry_count += 1103 if self._retry_count < 5:104 self._reconnect()105 except Exception as e:106 print("Exception, wait and try to reconnect")107 print(e)108 self._retry_count += 1109 if self._retry_count < 5:110 self._reconnect()111 time.sleep(retrieving_rate)112 def set_exit_flag(self, exit_flag):113 self._exit_flag = exit_flag114 def _reconnect(self):115 try:116 if self._sensor is not None:117 self._sensor.disconnect()118 self._sensor = None119 if self._sensor is None and self._device_mac is not None:120 print("Try to connect SensorTag...")121 self._sensor = sensortag.SensorTag(self._device_mac.upper())122 # Enabling selected sensors123 self._sensor.IRtemperature.enable()124 self._sensor.humidity.enable()125 self._sensor.barometer.enable()126 self._sensor.accelerometer.enable()127 self._sensor.magnetometer.enable()128 self._sensor.gyroscope.enable()129 self._sensor.keypress.enable()130 self._sensor.setDelegate(sensortag.KeypressDelegate())131 self._sensor.lightmeter.enable()132 self._sensor.battery.enable()133 time.sleep(1.0)134 self._retry_count = 0135 except Exception as e:136 print(e)137 def push_to_redis(self, data_dict):138 json_str = json.dumps(data_dict)139 length = self._redis.lpush(self._device_mac, json_str)140 if length > 100:141 self._redis.rpop(self._device_mac)142 def push_to_mqtt(self, data_dict):143 json_str = json.dumps(self.generate_senml_messages(data_dict))144 # print("SENML ", json_str)145 # device_id = "bab0170f-5d34-48e0-8b10-22c40f150377"146 # device_key = "29200cc0-c191-428e-ae06-cec233fef9a1"147 # channel_id = "74b92194-c101-472b-b033-e495cfddcb79"148 # get broker address149 broker_address = config['MQTT']['broker']150 # get device id, key and channel from SQLite151 conn = sqlite3.connect(config['SQLiteDB']['url'])152 c = conn.cursor()153 t = (data_dict['MAC'],)154 c.execute('SELECT * FROM sensor WHERE mac=?', t)155 for row in c:156 mqtt_client = mqtt.Client()157 mqtt_client.username_pw_set(row[1], row[2])158 mqtt_client.connect(broker_address)159 mqtt_topic = "channels/" + row[3] + "/messages"160 mqtt_client.publish(topic=mqtt_topic, payload=json_str)161 mqtt_client.disconnect()162 c.close()163 def generate_senml_messages(self, data_dict):164 senml_message = []165 if data_dict is not None:166 # Insert battery value167 base = {}168 base['bn'] = "urn:dev:mac:" + data_dict['MAC'] + ":"169 base['bt'] = data_dict['current_time']170 base['n'] = 'base'171 base['u'] = ''172 base['t'] = 0173 base['v'] = 0174 senml_message.append(base)175 if 'battery' in config[data_dict['MAC']]:176 battery_streams = config[data_dict['MAC']]['battery']177 tmp_parts = battery_streams.split(",")178 for part in tmp_parts:179 battery = {}180 # battery['bn'] = "urn:dev:mac:" + data_dict['MAC'] + ":"181 # battery['bt'] = data_dict['current_time']182 battery['n'] = 'battery' + "-" + part183 battery['u'] = '%EL'184 battery['t'] = 0185 battery['v'] = data_dict['battery']186 senml_message.append(battery)187 # battery = {}188 # battery['bn'] = "urn:dev:mac:" + data_dict['MAC'] + ":"189 # battery['bt'] = data_dict['current_time']190 # battery['n'] = 'battery'191 # battery['u'] = '%EL'192 # battery['t'] = 0193 # battery['v'] = data_dict['battery']194 # senml_message.append(battery)195 if 'ambient_temp' in config[data_dict['MAC']]:196 temp_streams = config[data_dict['MAC']]['ambient_temp']197 tmp_parts = temp_streams.split(",")198 for part in tmp_parts:199 ambient_tmp = {}200 ambient_tmp['n'] = 'ambient_temp' + "-" + part201 ambient_tmp['u'] = 'Cel'202 ambient_tmp['t'] = 0203 ambient_tmp['v'] = data_dict['barometer'][0]204 senml_message.append(ambient_tmp)205 # ambient_tmp = {}206 # ambient_tmp['n'] = 'ambient_temp'207 # ambient_tmp['u'] = 'Cel'208 # ambient_tmp['t'] = 0209 # ambient_tmp['v'] = data_dict['barometer'][0]210 # senml_message.append(ambient_tmp)211 if 'humidity' in config[data_dict['MAC']]:212 humidity_streams = config[data_dict['MAC']]['humidity']213 tmp_parts = humidity_streams.split(",")214 for part in tmp_parts:215 humidity = {}216 humidity['n'] = 'humidity' + "-" + part217 humidity['u'] = '%RH'218 humidity['t'] = 0219 humidity['v'] = data_dict['humidity'][1]220 senml_message.append(humidity)221 # humidity = {}222 # humidity['n'] = 'humidity'223 # humidity['u'] = '%RH'224 # humidity['t'] = 0225 # humidity['v'] = data_dict['humidity'][1]226 # senml_message.append(humidity)227 # Get configured data streams based on MAC228 if 'light' in config[data_dict['MAC']]:229 light_streams = config[data_dict['MAC']]['light']230 tmp_parts = light_streams.split(",")231 for part in tmp_parts:232 light = {}233 light['n'] = 'light' + "-" + part234 light['u'] = 'lux'235 light['t'] = 0236 light['v'] = data_dict['light']237 senml_message.append(light)238 # light = {}239 # light['n'] = 'light' + ""240 # light['u'] = 'lux'241 # light['t'] = 0242 # light['v'] = data_dict['light']243 # senml_message.append(light)244 if 'pressure' in config[data_dict['MAC']]:245 pressure_streams = config[data_dict['MAC']]['pressure']246 tmp_parts = pressure_streams.split(",")247 for part in tmp_parts:248 pressure = {}249 pressure['n'] = 'pressure' + "-" + part250 pressure['u'] = 'millibar'251 pressure['t'] = 0252 pressure['v'] = data_dict['barometer'][1]253 senml_message.append(pressure)254 # pressure = {}255 # pressure['n'] = 'pressure'256 # pressure['u'] = 'millibar'257 # pressure['t'] = 0258 # pressure['v'] = data_dict['barometer'][1]259 # senml_message.append(pressure)260 if 'accelerometer_x' in config[data_dict['MAC']]:261 accx_streams = config[data_dict['MAC']]['accelerometer_x']262 tmp_parts = accx_streams.split(",")263 for part in tmp_parts:264 accelerometer_x = {}265 accelerometer_x['n'] = 'accelerometer_x' + "-" + part266 accelerometer_x['u'] = 'G'267 accelerometer_x['t'] = 0268 accelerometer_x['v'] = data_dict['accelerometer'][0]269 senml_message.append(accelerometer_x)270 # accelerometer_x = {}271 # accelerometer_x['n'] = 'accelerometer_x'272 # accelerometer_x['u'] = 'G'273 # accelerometer_x['t'] = 0274 # accelerometer_x['v'] = data_dict['accelerometer'][0]275 # senml_message.append(accelerometer_x)276 if 'accelerometer_y' in config[data_dict['MAC']]:277 accy_streams = config[data_dict['MAC']]['accelerometer_y']278 tmp_parts = accy_streams.split(",")279 for part in tmp_parts:280 accelerometer_y = {}281 accelerometer_y['n'] = 'accelerometer_y' + "-" + part282 accelerometer_y['u'] = 'G'283 accelerometer_y['t'] = 0284 accelerometer_y['v'] = data_dict['accelerometer'][1]285 senml_message.append(accelerometer_y)286 # accelerometer_y = {}287 # accelerometer_y['n'] = 'accelerometer_y'288 # accelerometer_y['u'] = 'G'289 # accelerometer_y['t'] = 0290 # accelerometer_y['v'] = data_dict['accelerometer'][1]291 # senml_message.append(accelerometer_y)292 if 'accelerometer_z' in config[data_dict['MAC']]:293 accz_streams = config[data_dict['MAC']]['accelerometer_z']294 tmp_parts = accz_streams.split(",")295 for part in tmp_parts:296 accelerometer_z = {}297 accelerometer_z['n'] = 'accelerometer_z' + "-" + part298 accelerometer_z['u'] = 'G'299 accelerometer_z['t'] = 0300 accelerometer_z['v'] = data_dict['accelerometer'][2]301 senml_message.append(accelerometer_z)302 # accelerometer_z = {}303 # accelerometer_z['n'] = 'accelerometer_z'304 # accelerometer_z['u'] = 'G'305 # accelerometer_z['t'] = 0306 # accelerometer_z['v'] = data_dict['accelerometer'][2]307 # senml_message.append(accelerometer_z)308 if 'magnetometer_x' in config[data_dict['MAC']]:309 magnetx_streams = config[data_dict['MAC']]['magnetometer_x']310 tmp_parts = magnetx_streams.split(",")311 for part in tmp_parts:312 magnetometer_x = {}313 magnetometer_x['n'] = 'magnetometer_x' + "-" + part314 magnetometer_x['u'] = 'uT'315 magnetometer_x['t'] = 0316 magnetometer_x['v'] = data_dict['magnetometer'][0]317 senml_message.append(magnetometer_x)318 # magnetometer_x = {}319 # magnetometer_x['n'] = 'magnetometer_x'320 # magnetometer_x['u'] = 'uT'321 # magnetometer_x['t'] = 0322 # magnetometer_x['v'] = data_dict['magnetometer'][0]323 # senml_message.append(magnetometer_x)324 if 'magnetometer_y' in config[data_dict['MAC']]:325 magnety_streams = config[data_dict['MAC']]['magnetometer_y']326 tmp_parts = magnety_streams.split(",")327 for part in tmp_parts:328 magnetometer_y = {}329 magnetometer_y['n'] = 'magnetometer_y' + "-" + part330 magnetometer_y['u'] = 'uT'331 magnetometer_y['t'] = 0332 magnetometer_y['v'] = data_dict['magnetometer'][1]333 senml_message.append(magnetometer_y)334 # magnetometer_y = {}335 # magnetometer_y['n'] = 'magnetometer_y'336 # magnetometer_y['u'] = 'uT'337 # magnetometer_y['t'] = 0338 # magnetometer_y['v'] = data_dict['magnetometer'][1]339 # senml_message.append(magnetometer_y)340 if 'magnetometer_z' in config[data_dict['MAC']]:341 magnetz_streams = config[data_dict['MAC']]['magnetometer_z']342 tmp_parts = magnetz_streams.split(",")343 for part in tmp_parts:344 magnetometer_z = {}345 magnetometer_z['n'] = 'magnetometer_z' + "-" + part346 magnetometer_z['u'] = 'uT'347 magnetometer_z['t'] = 0348 magnetometer_z['v'] = data_dict['magnetometer'][2]349 senml_message.append(magnetometer_z)350 # magnetometer_z = {}351 # magnetometer_z['n'] = 'magnetometer_z'352 # magnetometer_z['u'] = 'uT'353 # magnetometer_z['t'] = 0354 # magnetometer_z['v'] = data_dict['magnetometer'][2]355 # senml_message.append(magnetometer_z)356 if 'gyroscope_x' in config[data_dict['MAC']]:357 gyrox_streams = config[data_dict['MAC']]['gyroscope_x']358 tmp_parts = gyrox_streams.split(",")359 for part in tmp_parts:360 gyroscope_x = {}361 gyroscope_x['n'] = 'gyroscope_x' + "-" + part362 gyroscope_x['u'] = 'deg/sec'363 gyroscope_x['t'] = 0364 gyroscope_x['v'] = data_dict['gyroscope'][0]365 senml_message.append(gyroscope_x)366 # gyroscope_x = {}367 # gyroscope_x['n'] = 'gyroscope_x'368 # gyroscope_x['u'] = 'deg/sec'369 # gyroscope_x['t'] = 0370 # gyroscope_x['v'] = data_dict['gyroscope'][0]371 # senml_message.append(gyroscope_x)372 if 'gyroscope_y' in config[data_dict['MAC']]:373 gyroy_streams = config[data_dict['MAC']]['gyroscope_y']374 tmp_parts = gyroy_streams.split(",")375 for part in tmp_parts:376 gyroscope_y = {}377 gyroscope_y['n'] = 'gyroscope_y' + "-" + part378 gyroscope_y['u'] = 'deg/sec'379 gyroscope_y['t'] = 0380 gyroscope_y['v'] = data_dict['gyroscope'][1]381 senml_message.append(gyroscope_y)382 # gyroscope_y = {}383 # gyroscope_y['n'] = 'gyroscope_y'384 # gyroscope_y['u'] = 'deg/sec'385 # gyroscope_y['t'] = 0386 # gyroscope_y['v'] = data_dict['gyroscope'][1]387 # senml_message.append(gyroscope_y)388 if 'gyroscope_z' in config[data_dict['MAC']]:389 gyroz_streams = config[data_dict['MAC']]['gyroscope_z']390 tmp_parts = gyroz_streams.split(",")391 for part in tmp_parts:392 gyroscope_z = {}393 gyroscope_z['n'] = 'gyroscope_z' + "-" + part394 gyroscope_z['u'] = 'deg/sec'395 gyroscope_z['t'] = 0396 gyroscope_z['v'] = data_dict['gyroscope'][2]397 senml_message.append(gyroscope_z)398 # gyroscope_z = {}399 # gyroscope_z['n'] = 'gyroscope_z'400 # gyroscope_z['u'] = 'deg/sec'401 # gyroscope_z['t'] = 0402 # gyroscope_z['v'] = data_dict['gyroscope'][2]403 # senml_message.append(gyroscope_z)404 return senml_message405if __name__ == "__main__":...
attitude_kalman_filter.py
Source:attitude_kalman_filter.py
1# -*- coding: utf-8 -*-2"""3Created on Wed Jan 26 15:39:21 202245@author: Thomas Maynadié6"""7import numpy as np8import matplotlib.pyplot as plt910import prefilters.imu_prefilters as imu_prefilters11import ekf.navigation_ekf as navigation_ekf1213import helpers.simulation.rotation_simulation as rotation_simulation1415from helpers.quaternion_helper import quaternion161718def main_routine():19 # initialization20 # get sensor parameters from the first 30 still measurements21 22 # --------------------------------------------23 # sensor definition24 # --------------------------------------------25 26 # 1st MARG sensor27 # gyroscope28 gyroscope_std_deviation_1 = np.array([0.1, 0.12, 0.098])29 gyroscope_bias_1 = np.array([0.2, 0.15, 0.12])30 31 # accelerometer32 accelerometer_std_deviation_1 = np.array([0.072, 0.078, 0.069])33 accelerometer_bias_1 = np.array([0.03, 0.01, 0.02])34 35 # magnetometer36 magnetometer_std_deviation_1 = np.array([0.2, 0.21, 0.19])37 magnetometer_bias_1 = np.array([0.11, 0.15, 0.12])38 39 # 2nd MARG sensor40 # gyroscope41 gyroscope_std_deviation_2 = np.array([0.1, 0.12, 0.098])42 gyroscope_bias_2 = np.array([0.2, 0.15, 0.12])43 44 # accelerometer45 accelerometer_std_deviation_2 = np.array([0.072, 0.078, 0.069])46 accelerometer_bias_2 = np.array([0.03, 0.01, 0.02])47 48 # magnetometer49 magnetometer_std_deviation_2 = np.array([0.2, 0.21, 0.19])50 magnetometer_bias_2 = np.array([0.11, 0.15, 0.12])51 52 # --------------------------------------------53 # Component defintion54 # --------------------------------------------55 56 # Gyroscope pre-filter57 gyroscope_prefilter = imu_prefilters.gyroscope_preprocessor(gyroscope_std_deviation_1, gyroscope_bias_1, gyroscope_std_deviation_2, gyroscope_bias_2)58 59 # Magnetometer + Accelerometer preprocessing unit60 accelerometer_prepreprocessor = imu_prefilters.imu_sensor_preprocessor(accelerometer_std_deviation_1, accelerometer_bias_1, accelerometer_std_deviation_2, accelerometer_bias_2)61 magnetometer_prepreprocessor = imu_prefilters.imu_sensor_preprocessor(magnetometer_std_deviation_1, magnetometer_bias_1, magnetometer_std_deviation_2, magnetometer_bias_2)62 63 accelerometer_processed_std_deviation = accelerometer_prepreprocessor.get_std_deviation()64 accelerometer_processed_bias = accelerometer_prepreprocessor.get_bias()65 66 gyroscope_processed_std_deviation = gyroscope_prefilter.get_std_deviation()67 gyroscope_processed_bias = gyroscope_prefilter.get_bias()68 69 magnetometer_processed_std_deviation = magnetometer_prepreprocessor.get_std_deviation()70 magnetometer_processed_bias = magnetometer_prepreprocessor.get_bias()71 72 # GPS preprocessor73 gps_pos_std_deviation = np.zeros(3)74 gps_vel_std_deviation = np.zeros(3)75 # TODO76 77 # --------------------------------------------78 # Constants79 # --------------------------------------------80 81 # All global quantities are given in the ENU reference frame82 global_gravity_vector = np.array([0, 0, -1]) 83 global_magnetic_vector = np.array([0, 1, 0])84 85 # --------------------------------------------86 # Rocket geometry87 # --------------------------------------------88 89 # All local quantities are given in the Rocket Frame (RF) reference frame (x : main rocket axis / y, z : arbitrarly chosen with respect to the navigation module's geometry)90 91 # Navigation module position92 r_rocket_cog_to_nav_ref = np.array([0.55, 0, 0]) # tbd with structural team93 94 # MARGs position95 r_nav_ref_to_marg1 = np.array([0.1, -1, 0]) # tbd with structural team96 r_nav_ref_to_marg2 = np.array([0.1, -1, 0]) # tbd with structural team97 98 # --------------------------------------------99 # Other variables100 # --------------------------------------------101 iteration_nb = 1000102 103 t0 = 0104 tf = 100105 dt = (tf - t0)/iteration_nb106 107 # simulation108 initial_attitude = np.array([0, -90, 0])109 initial_position = np.array([0, 0, 0])110 111 angular_rate = np.array([2, 0.0, 0.0])112 simulation = rotation_simulation.rotation_sim_quaternions(initial_attitude, angular_rate, tf, iteration_nb)113 114 # results115 time_array, theoretical_trajectory = simulation.get_trajectory()116 estimated_trajectory = [[*quaternion(*initial_attitude).get_coefficients(), 0, 0, 0]]117 time_array = np.linspace(t0, tf, iteration_nb)118 119 w_m = [angular_rate]120 121 # --------------------------------------------122 # Kalman Variables123 # --------------------------------------------124 navigation_state_estimator = navigation_ekf.NavigationEKF(initial_attitude, initial_position, gyroscope_processed_std_deviation, accelerometer_processed_std_deviation, accelerometer_processed_bias, magnetometer_processed_std_deviation, magnetometer_processed_bias, gps_pos_std_deviation, gps_vel_std_deviation)125126 # --------------------------------------------127 # MAIN LOOP128 # --------------------------------------------129 for i in range(1, iteration_nb):130 # get measurement 131 raw_accelerometer_measurement_1, raw_accelerometer_measurement_2, raw_gyroscope_measurement_1, raw_gyroscope_measurement_2, raw_magnetometer_measurement_1, raw_magnetometer_measurement_2 = simulation.measurement(accelerometer_processed_std_deviation, accelerometer_processed_bias, gyroscope_processed_std_deviation, gyroscope_processed_bias, magnetometer_processed_std_deviation, i)132 133 # preprocess measurements134 processed_accelerometer_measurement = accelerometer_prepreprocessor.process_measurements(raw_accelerometer_measurement_1, raw_accelerometer_measurement_2)135 processed_gyroscope_measurement, estimated_angular_acceleration = gyroscope_prefilter.process_measurements(raw_gyroscope_measurement_1, raw_gyroscope_measurement_2, dt)136 processed_magnetometer_measurement = magnetometer_prepreprocessor.process_measurements(raw_magnetometer_measurement_1, raw_magnetometer_measurement_2)137 138 w_m.append(processed_gyroscope_measurement)139 140 # Kalman step141 measurement_vector = np.array([*processed_magnetometer_measurement, *processed_accelerometer_measurement])142 input_vector = np.array([*processed_gyroscope_measurement])143 144 estimated_trajectory.append(navigation_state_estimator.filter_step(input_vector, measurement_vector, dt))145 146 # --------------------------------------------147 # POST PROCESSING148 # --------------------------------------------149 estimated_trajectory = np.array(estimated_trajectory).transpose()150 theoretical_trajectory = np.array(theoretical_trajectory).transpose()151 w_m = np.array(w_m).transpose()152 153 fig = plt.figure(dpi=300)154 subplots = fig.subplots(7, 1)155 156 y_labels = ["q0", "q1", "q2", "q3", "b_wx", "b_wy", "b_wz"]157 158 for i in range(4):159 subplots[i].plot(time_array, theoretical_trajectory[i], "r")160 subplots[i].plot(time_array, estimated_trajectory[i], "b")161 162 subplots[i].set_xlabel("t [s]")163 subplots[i].set_ylabel(y_labels[i])164 subplots[i].grid()165 166 for i in range(4,7):167 subplots[i].plot(time_array, np.ones(iteration_nb)*gyroscope_processed_bias[i-4], "r")168 subplots[i].plot(time_array, estimated_trajectory[i], "b")169 170 subplots[i].set_xlabel("t [s]")171 subplots[i].set_ylabel(y_labels[i])172 subplots[i].grid()173174 print("done")175 176if __name__ == '__main__':
...
accelerometerclass.py
Source:accelerometerclass.py
1import math2import time3try:4 import RPi.GPIO as GPIO5except:6 print("Could not find RPi.GPIO. Try installing the RPi module")7import busio8import adafruit_adxl34x9NUM_BITS = 810MAX_MAGNITUDE = 2.0 # Maximum number representable and starting position for creating output.11# The numeric representation for the most significant bit would be MAX_MAGNITUDE/2,12# The next most significant bit would be MAX_MAGNITUDE/4,13# The next would be MAX_MAGNITUDE/8 and so on.14def main():15 import board # Weird import location due to startup error on coding computer16 i2c = busio.I2C(board.SCL, board.SDA)17 accelerometer = adafruit_adxl34x.ADXL345(i2c)18 last_accel_magnitude = 019 print("Outputting accelerometer data...")20 AccelerometerClass.setup_gpio()21 try:22 while True:23 start_time_ns = time.time()24 for i in range(10000):25 x, y, z = accelerometer.acceleration26 accel_magnitude = math.sqrt(x ** 2 + y ** 2 + z ** 2)27 # TODO: Maybe add compensation for delta time28 jerk = abs(accel_magnitude - last_accel_magnitude)29 bin_array = AccelerometerClass.floating_point_to_fixed_point_bin_array(jerk)30 AccelerometerClass.output_gpio_data(bin_array)31 last_accel_magnitude = accel_magnitude32 end_time_ns = time.time()33 loop_time = (start_time_ns - end_time_ns) / 1000034 print("The average time to output is " + str(loop_time) + " seconds")35 except:36 AccelerometerClass.cleanup_pins()37class AccelerometerClass:38 PINS_BOARD = [11, 13, 15, 29, 31, 33, 35, 37]39 PINS_BCM = [0, 2, 3, 21, 22, 23, 24, 25]40 PIN_NUMBERS = [0, 2, 3, 21, 22, 23, 24, 25]41 ACK_BOARD = 3642 ACK_BCM = 2743 ACK = ACK_BCM44 SEND_READY_BOARD = 3845 SEND_READY_BCM = 2846 SEND_READY = SEND_READY_BCM47 OFFPIN_IN_BOARD = 4048 OFFPIN_IN_BCM = 2949 OFFPIN_IN = OFFPIN_IN_BCM50 @staticmethod51 def floating_point_to_fixed_point_bin_array(float_in):52 """53 :param float_in: Any floating point number less than MAX_MAGNITUDE54 :return: Binary array representing a fixed point integer with NUM_DECIMAL_PLACES number of binary decimal places.55 """56 if float_in > MAX_MAGNITUDE:57 return [True, True, True, True, True, True, True, True]58 bin_array = [False, False, False, False, False, False, False, False]59 comparison_num = MAX_MAGNITUDE / 260 running_float = float_in61 for i in range(0, NUM_BITS):62 if running_float >= comparison_num:63 bin_array[i] = True64 running_float -= comparison_num65 comparison_num /= 266 return bin_array67 @staticmethod68 def gpio_pinnout_check():69 AccelerometerClass.setup_gpio()70 print("The pins should light up in sequential order. (LSB to MSB)")71 print("The SEND_READY pin will stay high.")72 print("Press ctrl-C to continue")73 GPIO.output(AccelerometerClass.SEND_READY, GPIO.HIGH)74 try:75 while True:76 for pin_number in AccelerometerClass.PIN_NUMBERS:77 GPIO.output(pin_number, GPIO.HIGH)78 time.sleep(1.0) # Wait one second before setting high the next output79 GPIO.output(pin_number, GPIO.LOW)80 finally:81 AccelerometerClass.cleanup_pins()82 @staticmethod83 def cleanup_pins():84 GPIO.setup(AccelerometerClass.PIN_NUMBERS[0], GPIO.OUT)85 GPIO.setup(AccelerometerClass.PIN_NUMBERS[1], GPIO.OUT)86 GPIO.setup(AccelerometerClass.PIN_NUMBERS[2], GPIO.OUT)87 GPIO.setup(AccelerometerClass.PIN_NUMBERS[3], GPIO.OUT)88 GPIO.setup(AccelerometerClass.PIN_NUMBERS[4], GPIO.OUT)89 GPIO.setup(AccelerometerClass.PIN_NUMBERS[5], GPIO.OUT)90 GPIO.setup(AccelerometerClass.PIN_NUMBERS[6], GPIO.OUT)91 GPIO.setup(AccelerometerClass.PIN_NUMBERS[7], GPIO.OUT)92 GPIO.setup(AccelerometerClass.SEND_READY, GPIO.OUT)93 GPIO.cleanup()94 @staticmethod95 def output_gpio_data(bin_array):96 """97 This function assumes bin_array is NUM_BITS long.98 :param bin_array: A NUM_BITS long array of bits99 :return:100 """101 for i in range(NUM_BITS):102 GPIO.output(AccelerometerClass.PIN_NUMBERS[i], bin_array[i])103 @staticmethod104 def setup_gpio():105 # GPIO.setwarnings(False)106 if GPIO.getmode() == GPIO.BCM:107 AccelerometerClass.PIN_NUMBERS = AccelerometerClass.PINS_BCM108 AccelerometerClass.ACK = AccelerometerClass.ACK_BCM109 AccelerometerClass.SEND_READY = AccelerometerClass.SEND_READY_BCM110 AccelerometerClass.OFFPIN_IN = AccelerometerClass.OFFPIN_IN_BCM111 elif GPIO.getmode() == GPIO.BOARD:112 AccelerometerClass.PIN_NUMBERS = AccelerometerClass.PINS_BOARD113 AccelerometerClass.ACK = AccelerometerClass.ACK_BOARD114 AccelerometerClass.SEND_READY = AccelerometerClass.SEND_READY_BOARD115 AccelerometerClass.OFFPIN_IN = AccelerometerClass.OFFPIN_IN_BOARD116 else:117 GPIO.setmode(GPIO.BOARD)118 AccelerometerClass.PIN_NUMBERS = AccelerometerClass.PINS_BOARD119 AccelerometerClass.ACK = AccelerometerClass.ACK_BOARD120 AccelerometerClass.SEND_READY = AccelerometerClass.SEND_READY_BOARD121 AccelerometerClass.OFFPIN_IN = AccelerometerClass.OFFPIN_IN_BOARD122 # Setup output GPIO pins123 GPIO.setup(AccelerometerClass.PIN_NUMBERS[0], GPIO.OUT)124 GPIO.setup(AccelerometerClass.PIN_NUMBERS[1], GPIO.OUT)125 GPIO.setup(AccelerometerClass.PIN_NUMBERS[2], GPIO.OUT)126 GPIO.setup(AccelerometerClass.PIN_NUMBERS[3], GPIO.OUT)127 GPIO.setup(AccelerometerClass.PIN_NUMBERS[4], GPIO.OUT)128 GPIO.setup(AccelerometerClass.PIN_NUMBERS[5], GPIO.OUT)129 GPIO.setup(AccelerometerClass.PIN_NUMBERS[6], GPIO.OUT)130 GPIO.setup(AccelerometerClass.PIN_NUMBERS[7], GPIO.OUT)131 GPIO.setup(AccelerometerClass.SEND_READY, GPIO.OUT)132 # Set all GPIO pins low to start133 GPIO.output(AccelerometerClass.PIN_NUMBERS[0], GPIO.LOW)134 GPIO.output(AccelerometerClass.PIN_NUMBERS[1], GPIO.LOW)135 GPIO.output(AccelerometerClass.PIN_NUMBERS[2], GPIO.LOW)136 GPIO.output(AccelerometerClass.PIN_NUMBERS[3], GPIO.LOW)137 GPIO.output(AccelerometerClass.PIN_NUMBERS[4], GPIO.LOW)138 GPIO.output(AccelerometerClass.PIN_NUMBERS[5], GPIO.LOW)139 GPIO.output(AccelerometerClass.PIN_NUMBERS[6], GPIO.LOW)140 GPIO.output(AccelerometerClass.PIN_NUMBERS[7], GPIO.LOW)141 GPIO.output(AccelerometerClass.SEND_READY, GPIO.LOW)142 # Setup input GPIO pins143 GPIO.setup(AccelerometerClass.ACK, GPIO.IN)144 GPIO.setup(AccelerometerClass.OFFPIN_IN, GPIO.IN)145if __name__ == "__main__":146 # execute only if run as a script...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!