Best Python code snippet using autotest_python
p4_operators.py
Source:p4_operators.py
...116 out += '\n'117 return out118 def get_commands(self):119 commands = list()120 commands.append(self.init_table.get_default_command())121 commands.append(self.update_table.get_default_command())122 commands.append(self.pass_table.get_default_command())123 commands.append(self.drop_table.get_default_command())124 return commands125 def get_control_flow(self, indent_level):126 indent = '\t' * indent_level127 out = ''128 out += '%sapply(%s);\n' % (indent, self.init_table.get_name())129 out += '%sif (%s %s %i) {\n' % (indent, self.value_field_name, self.comp_func, self.threshold)130 out += '%s\tapply(%s);\n' % (indent, self.pass_table.get_name())131 out += '%s\tapply(%s);\n' % (indent, self.update_table.get_name())132 out += '%s}\n' % (indent,)133 out += '%selse {\n' % (indent,)134 out += '%s\tapply(%s);\n' % (indent, self.drop_table.get_name())135 out += '%s}\n' % (indent,)136 return out137 def get_init_keys(self):138 return self.keys139class P4Reduce(P4Operator):140 def __init__(self, qid, operator_id, meta_init_name, drop_action, keys, values, threshold, read_register,141 p4_raw_fields):142 super(P4Reduce, self).__init__('Reduce', qid, operator_id, keys, p4_raw_fields)143 if threshold == '-1':144 self.threshold = int(THRESHOLD)145 else:146 self.threshold = int(threshold)147 self.read_register = read_register148 if self.read_register:149 self.out_headers += ['index']150 else:151 self.out_headers += ['count']152 # create METADATA to store index and value153 fields = [('value', REGISTER_WIDTH), ('index', REGISTER_NUM_INDEX_BITS)]154 self.metadata = MetaData(self.operator_name, fields)155 # create REGISTER to keep track of counts156 self.register = Register(self.operator_name, REGISTER_WIDTH, REGISTER_INSTANCE_COUNT)157 self.values = values158 # Add map init159 hash_init_fields = list()160 for fld in self.keys:161 if fld == 'qid':162 hash_init_fields.append(P4Field(layer=None, target_name="qid", sonata_name="qid",163 size=QID_SIZE))164 elif fld == 'count':165 
hash_init_fields.append(P4Field(layer=None, target_name="count", sonata_name="count",166 size=COUNT_SIZE))167 elif fld == 'index':168 hash_init_fields.append(P4Field(layer=None, target_name="index", sonata_name="index",169 size=INDEX_SIZE))170 else:171 hash_init_fields.append(self.p4_raw_fields.get_target_field(fld))172 # create HASH for access to register173 hash_fields = list()174 for field in hash_init_fields:175 if '/' in field.sonata_name:176 self.logger.error('found a / in the key')177 raise NotImplementedError178 else:179 hash_fields.append('%s.%s' % (meta_init_name, field.target_name.replace(".", "_")))180 self.hash = HashFields(self.operator_name, hash_fields, 'crc16', REGISTER_NUM_INDEX_BITS)181 # name of metadata field where the index of the count within the register is stored182 self.index_field_name = '%s.index' % self.metadata.get_name()183 # name of metadata field where the count is kept temporarily184 self.value_field_name = '%s.value' % self.metadata.get_name()185 # create ACTION and TABLE to compute hash and get value186 primitives = list()187 primitives.append(ModifyFieldWithHashBasedOffset(self.index_field_name, 0, self.hash.get_name(),188 REGISTER_INSTANCE_COUNT))189 primitives.append(RegisterRead(self.value_field_name, self.register.get_name(), self.index_field_name))190 if self.values[0] == 'count':191 if self.threshold <= 1:192 self.threshold = '1'193 primitives.append(ModifyField(self.value_field_name, '%s + %i' % (self.value_field_name, 1)))194 else:195 target_fld = self.p4_raw_fields.get_target_field(self.values[0])196 if self.threshold <= 1:197 self.threshold = '%s.%s' % (meta_init_name, target_fld.target_name.replace(".", "_"))198 primitives.append(ModifyField(self.value_field_name, '%s + %s' % (199 self.value_field_name, '%s.%s' % (meta_init_name, target_fld.target_name.replace(".", "_")))))200 primitives.append(RegisterWrite(self.register.get_name(), self.index_field_name, self.value_field_name))201 self.init_action = 
Action('do_init_%s' % self.operator_name, primitives)202 table_name = 'init_%s' % self.operator_name203 self.init_table = Table(table_name, self.init_action.get_name(), [], None, 1)204 # create three TABLEs that implement reduce operation205 # if count <= THRESHOLD, update count and drop,206 table_name = 'drop_%s' % self.operator_name207 self.drop_table = Table(table_name, drop_action, [], None, 1)208 # if count == THRESHOLD, pass through with current count209 field_to_modified = None210 if not self.read_register:211 field_to_modified = ModifyField('%s.count' % meta_init_name, self.value_field_name)212 else:213 field_to_modified = ModifyField('%s.index' % meta_init_name, self.index_field_name)214 self.set_count_action = Action('set_count_%s' % self.operator_name,215 field_to_modified)216 table_name = 'first_pass_%s' % self.operator_name217 self.first_pass_table = Table(table_name, self.set_count_action.get_name(), [], None, 1)218 if not self.read_register:219 # if count > THRESHOLD, let it pass through with count set to 1220 self.reset_count_action = Action('reset_count_%s' % self.operator_name,221 ModifyField('%s.count' % meta_init_name, 1))222 table_name = 'pass_%s' % self.operator_name223 self.pass_table = Table(table_name, self.reset_count_action.get_name(), [], None, 1)224 def __repr__(self):225 return '.Reduce(keys=' + ','.join([x for x in self.keys]) + ', threshold=' + str(self.threshold) + ')'226 def get_code(self):227 out = ''228 out += '// %s %i of query %i\n' % (self.name, self.operator_id, self.query_id)229 out += self.metadata.get_code()230 out += self.hash.get_code()231 out += self.register.get_code()232 out += self.init_action.get_code()233 out += self.set_count_action.get_code()234 if not self.read_register:235 out += self.reset_count_action.get_code()236 out += self.pass_table.get_code()237 out += self.init_table.get_code()238 out += self.first_pass_table.get_code()239 out += self.drop_table.get_code()240 out += '\n'241 return out242 def 
get_commands(self):243 commands = list()244 commands.append(self.init_table.get_default_command())245 commands.append(self.first_pass_table.get_default_command())246 if not self.read_register: commands.append(self.pass_table.get_default_command())247 commands.append(self.drop_table.get_default_command())248 return commands249 def get_control_flow(self, indent_level):250 indent = '\t' * indent_level251 out = ''252 out += '%sapply(%s);\n' % (indent, self.init_table.get_name())253 out += '%sif (%s == %s) {\n' % (indent, self.value_field_name, self.threshold)254 out += '%s\tapply(%s);\n' % (indent, self.first_pass_table.get_name())255 out += '%s}\n' % (indent,)256 if not self.read_register:257 out += '%selse if (%s > %s) {\n' % (indent, self.value_field_name, self.threshold)258 out += '%s\tapply(%s);\n' % (indent, self.pass_table.get_name())259 out += '%s}\n' % (indent,)260 out += '%selse {\n' % (indent,)261 out += '%s\tapply(%s);\n' % (indent, self.drop_table.get_name())262 out += '%s}\n' % (indent,)263 return out264 def get_init_keys(self):265 return self.keys + self.values266class P4MapInit(P4Operator):267 def __init__(self, qid, operator_id, keys, p4_raw_fields):268 super(P4MapInit, self).__init__('MapInit', qid, operator_id, keys, p4_raw_fields)269 # Add map init270 map_init_fields = list()271 for fld in self.keys:272 if fld == 'qid':273 map_init_fields.append(P4Field(layer=None, target_name="qid", sonata_name="qid",274 size=QID_SIZE))275 elif fld == 'count':276 map_init_fields.append(P4Field(layer=None, target_name="count", sonata_name="count",277 size=COUNT_SIZE))278 elif fld == 'index':279 map_init_fields.append(P4Field(layer=None, target_name="index", sonata_name="index",280 size=INDEX_SIZE))281 else:282 map_init_fields.append(self.p4_raw_fields.get_target_field(fld))283 # create METADATA object to store data for all keys284 meta_fields = list()285 for fld in map_init_fields:286 meta_fields.append((fld.target_name.replace(".", "_"), fld.size))287 self.metadata 
= MetaData(self.operator_name, meta_fields)288 # create ACTION to initialize the metadata289 primitives = list()290 for fld in map_init_fields:291 sonata_name = fld.sonata_name292 target_name = fld.target_name293 meta_field_name = '%s.%s' % (self.metadata.get_name(), target_name.replace(".", "_"))294 if sonata_name == 'qid':295 # Assign query id to this field296 primitives.append(ModifyField(meta_field_name, qid))297 elif sonata_name == 'count':298 primitives.append(ModifyField(meta_field_name, 0))299 elif sonata_name == 'index':300 primitives.append(ModifyField(meta_field_name, 0))301 else:302 # Read data from raw header fields and assign them to these meta fields303 primitives.append(ModifyField(meta_field_name, target_name))304 self.action = Action('do_%s' % self.operator_name, primitives)305 # create dummy TABLE to execute the action306 self.table = Table(self.operator_name, self.action.get_name(), [], None, 1)307 def __repr__(self):308 return '.MapInit(keys=' + str(self.keys) + ')'309 def get_meta_name(self):310 return self.metadata.get_name()311 def get_code(self):312 out = ''313 out += '// MapInit of query %i\n' % self.query_id314 out += self.metadata.get_code()315 out += self.action.get_code()316 out += self.table.get_code()317 out += '\n'318 return out319 def get_commands(self):320 commands = list()321 commands.append(self.table.get_default_command())322 return commands323 def get_control_flow(self, indent_level):324 indent = '\t' * indent_level325 out = ''326 out += '%sapply(%s);\n' % (indent, self.table.get_name())327 return out328 def get_init_keys(self):329 return self.keys330class P4Map(P4Operator):331 def __init__(self, qid, operator_id, meta_init_name, keys, map_keys, map_values, func, p4_raw_fields):332 super(P4Map, self).__init__('Map', qid, operator_id, keys, p4_raw_fields)333 self.meta_init_name = meta_init_name334 self.map_keys = map_keys335 self.func = func336 self.map_values = map_values337 # Add map init338 map_fields = list()339 for fld in 
self.map_keys:340 if fld == 'qid':341 map_fields.append(P4Field(layer=None, target_name="qid", sonata_name="qid",342 size=QID_SIZE))343 elif fld == 'count':344 map_fields.append(P4Field(layer=None, target_name="count", sonata_name="count",345 size=COUNT_SIZE))346 else:347 map_fields.append(self.p4_raw_fields.get_target_field(fld))348 map_fields_values = list()349 for fld in self.map_values:350 if fld == 'qid':351 map_fields_values.append(P4Field(layer=None, target_name="qid", sonata_name="qid",352 size=QID_SIZE))353 elif fld == 'count':354 map_fields_values.append(P4Field(layer=None, target_name="count", sonata_name="count",355 size=COUNT_SIZE))356 else:357 map_fields_values.append(self.p4_raw_fields.get_target_field(fld))358 # create ACTION using the function359 primitives = list()360 if len(func) > 0:361 self.func = func362 if func[0] == 'mask' or not func[0]:363 for field in map_fields:364 mask_size = (func[1] / 4)365 mask = '0x' + ('f' * mask_size) + ('0' * (HEADER_MASK_SIZE[field.target_name] - mask_size))366 field_name = '%s.%s' % (self.meta_init_name, field.target_name.replace(".", "_"))367 primitives.append(BitAnd(field_name, field_name, mask))368 if func[0] == 'set' or not func[0]:369 for field in map_fields_values:370 field_name = '%s.%s' % (self.meta_init_name, field.target_name.replace(".", "_"))371 primitives.append(ModifyField(field_name, func[1]))372 self.action = Action('do_%s' % self.operator_name, primitives)373 # create dummy TABLE to execute the action374 self.table = Table(self.operator_name, self.action.get_name(), [], None, 1)375 def __repr__(self):376 return '.Map(keys=' + str(self.keys) + ', map_keys=' + str(self.map_keys) + ', map_values=' + str(self.map_values) + ', func=' + str(self.func) + ')'377 def get_code(self):378 out = ''379 out += '// Map %i of query %i\n' % (self.operator_id, self.query_id)380 out += self.action.get_code()381 out += self.table.get_code()382 out += '\n'383 return out384 def get_commands(self):385 commands = 
list()386 commands.append(self.table.get_default_command())387 return commands388 def get_control_flow(self, indent_level):389 indent = '\t' * indent_level390 out = ''391 out += '%sapply(%s);\n' % (indent, self.table.get_name())392 return out393 def get_init_keys(self):394 return list(self.keys) + list(self.map_keys) + list(self.map_values)395 def get_out_headers(self):396 return list(self.keys) + list(self.map_keys) + list(self.map_values)397class P4Filter(P4Operator):398 def __init__(self, qid, operator_id, keys, filter_keys, func, source, match_action, miss_action, p4_raw_fields):399 super(P4Filter, self).__init__('Filter', qid, operator_id, keys, p4_raw_fields)400 self.filter_keys = filter_keys401 self.filter_mask = None402 self.filter_values = None403 self.func = None404 # self.out_headers = []405 self.match_action = match_action406 self.miss_action = miss_action407 self.source = source408 if not len(func) > 0 or func[0] == 'geq':409 self.logger.error('Got the following func with the Filter Operator: %s' % (str(func),))410 # raise NotImplementedError411 else:412 self.func = func[0]413 if func[0] == 'mask':414 self.filter_mask = func[1]415 self.filter_values = func[2:]416 elif func[0] == 'eq':417 self.filter_values = [func[1:]]418 reads_fields = list()419 for filter_key in self.filter_keys:420 if self.func == 'mask':421 reads_fields.append((filter_key, 'lpm'))422 else:423 reads_fields.append((filter_key, 'exact'))424 self.table = Table(self.operator_name, miss_action, (match_action,), reads_fields, TABLE_SIZE)425 def __repr__(self):426 return '.Filter(filter_keys=' + str(self.filter_keys) + ', func=' + str(self.func) + ', src = ' + str(427 self.source) + ')'428 def get_code(self):429 out = ''430 out += '// Filter %i of query %i\n' % (self.operator_id, self.query_id)431 out += self.table.get_code()432 out += '\n'433 return out434 def get_commands(self):435 commands = list()436 commands.append(self.table.get_default_command())437 if self.filter_values:438 for 
filter_value in self.filter_values:439 commands.append(self.table.get_add_rule_command(self.match_action, filter_value, None))440 return commands441 def get_control_flow(self, indent_level):442 indent = '\t' * indent_level443 out = ''444 out += '%sapply(%s);\n' % (indent, self.table.get_name())445 return out446 def get_match_action(self):447 return self.match_action448 def get_filter_mask(self):449 return self.filter_mask450 def get_init_keys(self):...
custom_api.py
Source:custom_api.py
import platform
import subprocess

# Every character PowerShell accepts as a single-quote delimiter: the ASCII
# apostrophe plus the typographic variants U+2018, U+2019, U+201A, U+201B.
_POWERSHELL_SINGLE_QUOTES = "'\u2018\u2019\u201a\u201b"


def _powershell_literal(text):
    """Return *text* as a PowerShell single-quoted string literal.

    Nothing is expanded inside a single-quoted PowerShell string; the only
    escape needed is doubling each quote character.
    """
    escaped = "".join(
        ch * 2 if ch in _POWERSHELL_SINGLE_QUOTES else ch for ch in text
    )
    return f"'{escaped}'"


class TextToSpeeckAPIClient:
    """Speak text by running an external command.

    ``command`` is a callable that maps the text to an argument list for
    ``subprocess.call``; when omitted (or falsy) the platform default from
    ``get_default_command()`` is used.
    """

    def __init__(self, command=None):
        self.command = command or get_default_command()

    def say(self, text):
        """Run the configured command for *text*; the exit status is ignored."""
        subprocess.call(self.command(text))


def get_default_command(system=None):
    """Return the default text-to-argv callable for a platform.

    Args:
        system: Platform name as reported by ``platform.system()``
            ("Darwin", "Linux" or "Windows"). Defaults to the current
            platform.

    Returns:
        A callable taking the text and returning the argument list to run.

    Raises:
        Exception: if the platform is not one of the supported ones.
    """
    if system is None:
        system = platform.system()

    if system == "Darwin":
        return lambda text: ["say", text]
    if system == "Linux":
        return lambda text: ["espeak", text]
    if system == "Windows":
        # The text becomes part of a PowerShell command string, so it is
        # embedded as an escaped literal; interpolating it raw would let
        # characters such as ';' or '$(...)' run as PowerShell code.
        # NOTE(review): this only echoes the text, it does not speak it.
        return lambda text: ["powershell", "-c", f"echo {_powershell_literal(text)}"]
    raise Exception("Unsupported system")
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!