How to use _parse_status method in autotest

Best Python code snippet using autotest_python

parsers.py

Source:parsers.py Github

copy

Full Screen

...27 self.status = {'timestamp': None,28 'tijdReferentie': None}29 for key in logged_types:30 self.status[key] = {}31 def _parse_status(self, message, data_size):32 """33 Parse the status part of a message.34 Parameters35 ----------36 message : str37 V-log message.38 data_size : float39 Size of one data item, in hex.40 Returns41 ----------42 num_sensors : int43 Number of sensors in status.44 """45 assert int(message[:2], 16) % 2 == 1, "Not status message"46 self.status['deltaTijd'] = int(message[2:5], 16)/10 # Log in seconds47 self._update_time()48 num_sensors = int(hex_string_to_bits(message[5:8])[2:], 2)49 assert len(message[8:]) >= data_size * num_sensors, "Num sensors exceeds message length"50 return num_sensors51 def _parse_update(self, message, data_size):52 """53 Parse the update part of a message.54 Parameters55 ----------56 message : str57 V-log message.58 data_size : float59 Size of one data item, in hex.60 Returns61 ----------62 num_sensors : int63 Number of sensors in status.64 """65 assert int(message[:2], 16) % 2 == 0, "Not update message"66 self.status['deltaTijd'] = int(message[2:5], 16)/10 # Log in seconds67 self._update_time()68 num_sensors = int(message[5], 16)69 assert len(message[6:]) >= data_size * num_sensors, "Num sensors exceeds message length"70 return num_sensors71 def parse_message(self, message):72 """73 Parse a v-log message and update the status.74 If the status is complete log_status is called.75 Parameters76 ----------77 message : str78 V-log message.79 """80 message_type = int(message[:2], 16)81 # Check if message is to be logged82 if message_type not in self.logged_types:83 return84 if message_type == 1:85 # Time reference86 # Sometimes the time is given as 24:00 not 00:00 so add the time to the date to deal with this87 self.status['tijdReferentie'] = (88 datetime(89 int(message[2:6]),90 int(message[6:8]),91 int(message[8:10])92 )93 + timedelta(94 hours=int(message[10:12]),95 minutes=int(message[12:14]),96 seconds=int(message[14:16]),97 milliseconds=int(message[16]) * 10098 )99 ).timestamp()100 self.status['deltaTijd'] = 0101 elif message_type == 4:102 # V-Log information103 self.status['vlogInformatie']['V-Log versie'] = "{}.{}.{}".format(int(message[2:4], 16),104 int(message[4:6], 16),105 int(message[6:8], 16))106 vri_id = ''107 i = 8108 while i < len(message):109 vri_id += chr(int(message[i:i + 2], 16))110 i += 2111 self.status['vlogInformatie']['VRI id'] = vri_id.strip() # Remove whitespace112 elif message_type == 5:113 # Detection status114 num_sensors = self._parse_status(message, data_size=1)115 for i in range(0, num_sensors):116 self.status['detectie'][i] = parse_detection_data(message[8 + i])117 elif message_type == 6:118 # Detection update119 num_sensors = self._parse_update(message, data_size=4)120 for i in range(0, num_sensors):121 index = int(message[6 + i * 4:8 + i * 4], 16)122 if index in self.status['detectie'].keys():123 self.status['detectie'][index] = parse_detection_data(message[9 + i * 4])124 elif message_type == 7:125 # Other input status126 num_sensors = self._parse_status(message, data_size=0.25)127 status_bits = hex_string_to_bits(message[8:])128 for i in range(0, num_sensors):129 self.status['overigeIngangen'][i] = int(status_bits[i], 2)130 elif message_type == 8:131 # Other input update132 num_sensors = self._parse_update(message, data_size=2)133 for i in range(0, num_sensors):134 status_bits = hex_string_to_bits(message[6 + i * 2:8 + i * 2])135 index = int(status_bits[:-1], 2)136 if index in self.status['overigeIngangen'].keys():137 self.status['overigeIngangen'][index] = int(status_bits[-1], 2)138 elif message_type == 9:139 # Internal phase status140 num_sensors = self._parse_status(message, data_size=3)141 for i in range(0, num_sensors):142 self.status['interneFaseCyclus'][i] = parse_internal_data(message[8 + i * 3:11 + i * 3])143 elif message_type == 10:144 # Internal phase update145 num_sensors = self._parse_update(message, data_size=6)146 for i in range(0, num_sensors):147 index = int(message[6 + i * 6:8 + i * 6], 16)148 if index in self.status['interneFaseCyclus'].keys():149 self.status['interneFaseCyclus'][index] = parse_internal_data(message[9 + i * 6:12 + i * 6])150 elif message_type == 11:151 # Other output status (GUS)152 num_sensors = self._parse_status(message, data_size=0.25)153 status_bits = hex_string_to_bits(message[8:])154 for i in range(0, num_sensors):155 self.status['overigeUitgangenGUS'][i] = int(status_bits[i], 2)156 elif message_type == 12:157 # Other output update (GUS)158 num_sensors = self._parse_update(message, data_size=2)159 for i in range(0, num_sensors):160 status_bits = hex_string_to_bits(message[6 + i * 2:8 + i * 2])161 index = int(status_bits[:-1], 2)162 if index in self.status['overigeUitgangenGUS'].keys():163 self.status['overigeUitgangenGUS'][index] = int(status_bits[-1], 2)164 elif message_type == 13:165 # External phase status166 num_sensors = self._parse_status(message, data_size=1)167 for i in range(0, num_sensors):168 self.status['externeSignaalgroep'][i] = int(message[8 + i], 16)169 elif message_type == 14:170 # External phase update171 num_sensors = self._parse_update(message, data_size=4)172 for i in range(0, num_sensors):173 index = int(message[6 + i * 4:8 + i * 4], 16)174 if index in self.status['externeSignaalgroep'].keys():175 self.status['externeSignaalgroep'][index] = int(message[8 + i * 4:10 + i * 4], 16)176 elif message_type == 15:177 # Other output status (WUS)178 num_sensors = self._parse_status(message, data_size=0.25)179 status_bits = hex_string_to_bits(message[8:])180 for i in range(0, num_sensors):181 self.status['overigeUitgangenWUS'][i] = int(status_bits[i], 2)182 elif message_type == 16:183 # Other output update (WUS)184 num_sensors = self._parse_update(message, data_size=2)185 for i in range(0, num_sensors):186 status_bits = hex_string_to_bits(message[6 + i * 2:8 + i * 2])187 index = int(status_bits[:-1], 2)188 if index in self.status['overigeUitgangenWUS'].keys():189 self.status['overigeUitgangenWUS'][index] = int(status_bits[-1], 2)190 elif message_type == 17:191 # Desired program status192 num_sensors = self._parse_status(message, data_size=1)193 for i in range(0, num_sensors):194 self.status['gewensteProgrammaStatus'][i] = int(message[8 + i], 16)195 elif message_type == 18:196 # Desired program update197 num_sensors = self._parse_update(message, data_size=2)198 for i in range(0, num_sensors):199 index = int(message[6 + i * 2], 16)200 if index in self.status['gewensteProgrammaStatus'].keys():201 self.status['gewensteProgrammaStatus'][index] = int(message[7 + i * 2], 16)202 elif message_type == 19:203 # Actual program status204 num_sensors = self._parse_status(message, data_size=1)205 for i in range(0, num_sensors):206 self.status['werkelijkeProgrammaStatus'][i] = int(message[8 + i], 16)207 elif message_type == 20:208 # Actual program update209 num_sensors = self._parse_update(message, data_size=2)210 for i in range(0, num_sensors):211 index = int(message[6 + i * 2], 16)212 if index in self.status['werkelijkeProgrammaStatus'].keys():213 self.status['werkelijkeProgrammaStatus'][index] = int(message[7 + i * 2], 16)214 elif message_type == 23:215 # Thermometer status216 num_sensors = self._parse_status(message, data_size=1)217 for i in range(0, num_sensors):218 status_bits = hex_string_to_bits(message[8 + i])219 self.status['thermometer'][i] = {'MVG': int(status_bits[-1], 2),220 'RNA': int(status_bits[-2], 2)}221 elif message_type == 24:222 # Thermometer update223 num_sensors = self._parse_update(message, data_size=2)224 for i in range(0, num_sensors):225 status_bits = hex_string_to_bits(message[7 + i * 2])226 index = int(message[6 + i * 2], 16)227 if index in self.status['thermometer'].keys():228 self.status['thermometer'][index] = {'MVG': int(status_bits[-1], 2),229 'RNA': int(status_bits[-2], 2)}230 elif message_type == 32:...

Full Screen

Full Screen

test_parsing.py

Source:test_parsing.py Github

copy

Full Screen

...14 "\n"15 "# userid name uniqueid connected ping loss state rate adr\n"16 "#end\n")17 server_dict = {}18 self.exporter._parse_status(data, server_dict)19 self.assertEqual(server_dict["hostname"], "LinuxGSM")20 self.assertEqual(server_dict["bots"], "0")21 self.assertEqual(server_dict["players"], "0")22 self.assertEqual(server_dict["max_players"], "16")23 def test_csgo_stats(self):24 data = (" CPU NetIn NetOut Uptime Maps FPS Players Svms +-ms ~tick\n"25 " 10.0 0.0 0.0 46732 1 64.07 0 5.31 0.08 0.03\n")26 server_dict = dict()27 self.exporter._parse_stats(data, server_dict)28 for key, value in {"CPU": 10, "NetIn": 0, "NetOut": 0, "Uptime": 46732,29 "Maps": 1, "FPS": 64.07, "Players": 0, "svarms": 5.31,30 "varms": 0.08, "vartick": 0.03}.items():31 self.assertEqual(server_dict[key], value)32 def test_fof_status(self):33 data = ("hostname: LinuxGSM\n"34 "version : 468/24 0 secure\n"35 "udp/ip : 0.0.0.0:27015 (public ip: 141.70.124.33)\n"36 "steamid : [A:1:1510767623:12537] (90125840063167495)\n"37 "map : fof_depot at: 0 x, 0 y, 0 z\n"38 "tags : \n"39 "players : 0 humans, 0 bots (16 max)\n"40 "edicts : 606 used of 2048 max\n"41 "# userid name uniqueid connected ping loss state adr\n")42 server_dict = {}43 self.exporter._parse_status(data, server_dict)44 self.assertEqual(server_dict["hostname"], "LinuxGSM")45 self.assertEqual(server_dict["bots"], "0")46 self.assertEqual(server_dict["players"], "0")47 self.assertEqual(server_dict["max_players"], "16")48 def test_fof_stats(self):49 data = ("CPU In_(KB/s) Out_(KB/s) Uptime Map_changes FPS Players Connects\n"50 "0.00 0.00 0.00 8 0 66.64 0 0\n")51 server_dict = dict()52 self.exporter._parse_stats(data, server_dict)53 for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 8,54 "Maps": 0, "FPS": 66.64, "Players": 0, "Connects": 0}.items():55 self.assertEqual(server_dict[key], value)56 def test_hl2dm_status(self):57 data = ("hostname: LinuxGSM\n"58 "version : 4630212/24 4630212 insecure (secure mode enabled, disconnected from Steam3)\n"59 "udp/ip : 0.0.0.0:27015\n"60 "steamid : not logged in\n"61 "map : dm_lockdown at: 0 x, 0 y, 0 z\n"62 "tags : alltalk,increased_maxplayers\n"63 "players : 0 humans, 0 bots (16 max)\n"64 "edicts : 495 used of 2048 max\n"65 "# userid name uniqueid connected ping loss state adr\n")66 server_dict = {}67 self.exporter._parse_status(data, server_dict)68 self.assertEqual(server_dict["hostname"], "LinuxGSM")69 self.assertEqual(server_dict["bots"], "0")70 self.assertEqual(server_dict["players"], "0")71 self.assertEqual(server_dict["max_players"], "16")72 def test_hl2dm_stats(self):73 data = ("CPU In_(KB/s) Out_(KB/s) Uptime Map_changes FPS Players Connects\n"74 "0.00 0.00 0.00 0 0 66.70 0 0\n")75 server_dict = dict()76 self.exporter._parse_stats(data, server_dict)77 for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 0,78 "Maps": 0, "FPS": 66.70, "Players": 0, "Connects": 0}.items():79 self.assertEqual(server_dict[key], value)80 def test_tf2_status(self):81 data = ("hostname: LinuxGSM\n"82 "version : 5063830/24 5063830 insecure (secure mode enabled, disconnected from Steam3)\n"83 "udp/ip : 0.0.0.0:27015\n"84 "steamid : not logged in\n"85 "account : not logged in (No account specified)\n"86 "map : cp_badlands at: 0 x, 0 y, 0 z\n"87 "tags : cp\n"88 "players : 0 humans, 0 bots (16 max)\n"89 "edicts : 550 used of 2048 max\n"90 "# userid name uniqueid connected ping loss state adr\n")91 server_dict = {}92 self.exporter._parse_status(data, server_dict)93 self.assertEqual(server_dict["hostname"], "LinuxGSM")94 self.assertEqual(server_dict["bots"], "0")95 self.assertEqual(server_dict["players"], "0")96 self.assertEqual(server_dict["max_players"], "16")97 def test_tf2_stats(self):98 data = ("CPU In_(KB/s) Out_(KB/s) Uptime Map_changes FPS Players Connects\n"99 "0.00 0.00 0.00 0 0 66.67 0 0\n")100 server_dict = dict()101 self.exporter._parse_stats(data, server_dict)102 for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 0,103 "Maps": 0, "FPS": 66.67, "Players": 0, "Connects": 0}.items():104 self.assertEqual(server_dict[key], value)105 def test_gmod_status(self):106 data = ("hostname: LinuxGSM\n"107 "version : 19.02.18/24 7472 insecure (secure mode enabled, disconnected from Steam3)\n"108 "udp/ip : 192.0.2.1:27015\n"109 "map : gm_construct at: 0 x, 0 y, 0 z\n"110 "players : 0 (16 max)\n"111 "\n"112 "# userid name uniqueid connected ping loss state adr\n")113 server_dict = {}114 self.exporter._parse_status(data, server_dict)115 self.assertEqual(server_dict["hostname"], "LinuxGSM")116 self.assertEqual(server_dict["players"], "0")117 self.assertEqual(server_dict["max_players"], "16")118 def test_gmod_stats(self):119 data = ("CPU In_(KB/s) Out_(KB/s) Uptime Map_changes FPS Players Connects\n"120 "33.00 0.00 0.00 1 0 66.07 0 0\n")121 server_dict = dict()122 self.exporter._parse_stats(data, server_dict)123 for key, value in {"CPU": 33, "NetIn": 0, "NetOut": 0, "Uptime": 1,124 "Maps": 0, "FPS": 66.07, "Players": 0, "Connects": 0}.items():125 self.assertEqual(server_dict[key], value)126 def test_css_status(self):127 data = ("hostname: LinuxGSM\n"128 "version : 4630212/24 4630212 insecure (secure mode enabled, disconnected from Steam3)\n"129 "udp/ip : 0.0.0.0:27015\n"130 "steamid : not logged in\n"131 "map : de_dust2 at: 0 x, 0 y, 0 z\n"132 "tags :\n"133 "players : 0 humans, 0 bots (16 max)\n"134 "edicts : 212 used of 2048 max\n"135 "# userid name uniqueid connected ping loss state adr\n")136 server_dict = {}137 self.exporter._parse_status(data, server_dict)138 self.assertEqual(server_dict["hostname"], "LinuxGSM")139 self.assertEqual(server_dict["players"], "0")140 self.assertEqual(server_dict["bots"], "0")141 self.assertEqual(server_dict["max_players"], "16")142 def test_css_stats(self):143 data = ("CPU In_(KB/s) Out_(KB/s) Uptime Map_changes FPS Players Connects\n"144 "0.00 0.00 0.00 2 0 66.55 0 0\n")145 server_dict = dict()146 self.exporter._parse_stats(data, server_dict)147 for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 2,148 "Maps": 0, "FPS": 66.55, "Players": 0, "Connects": 0}.items():...

Full Screen

Full Screen

HHC-N8I8OP.py

Source:HHC-N8I8OP.py Github

copy

Full Screen

...36 self.ip = ip37 self.port = port38 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)39 self.s.connect((self.ip, self.port))40 status = self._parse_status(self.execute('read'))41 self._switch = {index: Switch(self, index, state) for index, state in status}42 def execute(self, c):43 time.sleep(0.1)44 c = str(c).encode()45 self.s.send(c)46 data = self.s.recv(8192)47 return data.decode()48 def __getitem__(self, item):49 return self._switch[item]50 @staticmethod51 def _parse_status(data):52 return [[index + 1, bool(int(state))] for index, state in53 enumerate(reversed(data.split('relay')[1]))]54 def _update_status(self, status):55 for i, s in status:56 self._switch[i].state = s57 def status(self):58 data = self.execute('read')59 status = self._parse_status(data)60 self._update_status(status)61 return status62 def state(self, state: bool, switch=None):63 if not switch:64 command = str(int(state)) * len(self._switch)65 data = self.execute(f'all{command}')66 status = self._parse_status(data)67 self._update_status(status)68 else:69 self._switch[switch].power = state70 def __repr__(self):71 return str([s for s in self._switch.values()])72 def __del__(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful