Best Python code snippet using nose2
generate_mojom.py
Source:generate_mojom.py
1#! /usr/bin/env python2# Copyright 2016 The Chromium Authors. All rights reserved.3# Use of this source code is governed by a BSD-style license that can be4# found in the LICENSE file.5"""A generator of mojom interfaces and typemaps from Chrome IPC messages.6For example,7generate_mojom.py content/common/file_utilities_messages.h8 --output_mojom=content/common/file_utilities.mojom9 --output_typemap=content/common/file_utilities.typemap10"""11import argparse12import logging13import os14import re15import subprocess16import sys17_MESSAGE_PATTERN = re.compile(18 r'(?:\n|^)IPC_(SYNC_)?MESSAGE_(ROUTED|CONTROL)(\d_)?(\d)')19_VECTOR_PATTERN = re.compile(r'std::(vector|set)<(.*)>')20_MAP_PATTERN = re.compile(r'std::map<(.*), *(.*)>')21_NAMESPACE_PATTERN = re.compile(r'([a-z_]*?)::([A-Z].*)')22_unused_arg_count = 023def _git_grep(pattern, paths_pattern):24 try:25 args = ['git', 'grep', '-l', '-e', pattern, '--'] + paths_pattern26 result = subprocess.check_output(args).strip().splitlines()27 logging.debug('%s => %s', ' '.join(args), result)28 return result29 except subprocess.CalledProcessError:30 logging.debug('%s => []', ' '.join(args))31 return []32def _git_multigrep(patterns, paths):33 """Find a list of files that match all of the provided patterns."""34 if isinstance(paths, str):35 paths = [paths]36 if isinstance(patterns, str):37 patterns = [patterns]38 for pattern in patterns:39 # Search only the files that matched previous patterns.40 paths = _git_grep(pattern, paths)41 if not paths:42 return []43 return paths44class Typemap(object):45 def __init__(self, typemap_files):46 self._typemap_files = typemap_files47 self._custom_mappings = {}48 self._new_custom_mappings = {}49 self._imports = set()50 self._public_includes = set()51 self._traits_includes = set()52 self._enums = set()53 def load_typemaps(self):54 for typemap in self._typemap_files:55 self.load_typemap(typemap)56 def load_typemap(self, path):57 typemap = {}58 with open(path) as f:59 content = 
f.read().replace('=\n', '=')60 exec content in typemap61 for mapping in typemap['type_mappings']:62 mojom, native = mapping.split('=')63 self._custom_mappings[native] = {'name': mojom,64 'mojom': typemap['mojom'].strip('/')}65 def generate_typemap(self, output_mojom, input_filename, namespace):66 new_mappings = sorted(self._format_new_mappings(namespace))67 if not new_mappings:68 return69 yield """# Copyright 2016 The Chromium Authors. All rights reserved.70# Use of this source code is governed by a BSD-style license that can be71# found in the LICENSE file.72"""73 yield 'mojom = "//%s"' % output_mojom74 yield 'public_headers = [%s\n]' % ''.join(75 '\n "//%s",' % include for include in sorted(self._public_includes))76 yield 'traits_headers = [%s\n]' % ''.join(77 '\n "//%s",' % include78 for include in sorted(self._traits_includes.union([os.path.normpath(79 input_filename)])))80 yield 'deps = [ "//ipc" ]'81 yield 'type_mappings = [\n %s\n]' % '\n '.join(new_mappings)82 def _format_new_mappings(self, namespace):83 for native, mojom in self._new_custom_mappings.iteritems():84 yield '"%s.%s=::%s",' % (namespace, mojom, native)85 def format_new_types(self):86 for native_type, typename in self._new_custom_mappings.iteritems():87 if native_type in self._enums:88 yield '[Native]\nenum %s;\n' % typename89 else:90 yield '[Native]\nstruct %s;\n' % typename91 _BUILTINS = {92 'bool': 'bool',93 'int': 'int32',94 'unsigned': 'uint32',95 'char': 'uint8',96 'unsigned char': 'uint8',97 'short': 'int16',98 'unsigned short': 'uint16',99 'int8_t': 'int8',100 'int16_t': 'int16',101 'int32_t': 'int32',102 'int64_t': 'int64',103 'uint8_t': 'uint8',104 'uint16_t': 'uint16',105 'uint32_t': 'uint32',106 'uint64_t': 'uint64',107 'float': 'float',108 'double': 'double',109 'std::string': 'string',110 'base::string16': 'string',111 'base::FilePath::StringType': 'string',112 'base::SharedMemoryHandle': 'handle<shared_memory>',113 'IPC::PlatformFileForTransit': 'handle',114 
'base::FileDescriptor': 'handle',115 }116 def lookup_type(self, typename):117 try:118 return self._BUILTINS[typename]119 except KeyError:120 pass121 vector_match = _VECTOR_PATTERN.search(typename)122 if vector_match:123 return 'array<%s>' % self.lookup_type(vector_match.groups()[1].strip())124 map_match = _MAP_PATTERN.search(typename)125 if map_match:126 return 'map<%s, %s>' % tuple(self.lookup_type(t.strip())127 for t in map_match.groups())128 try:129 result = self._custom_mappings[typename]['name']130 mojom = self._custom_mappings[typename].get('mojom', None)131 if mojom:132 self._imports.add(mojom)133 return result134 except KeyError:135 pass136 match = _NAMESPACE_PATTERN.match(typename)137 if match:138 namespace, name = match.groups()139 else:140 namespace = ''141 name = typename142 namespace = namespace.replace('::', '.')143 cpp_name = name144 name = name.replace('::', '')145 if name.endswith('Params'):146 try:147 _, name = name.rsplit('Msg_')148 except ValueError:149 try:150 _, name = name.split('_', 1)151 except ValueError:152 pass153 if namespace.endswith('.mojom'):154 generated_mojom_name = '%s.%s' % (namespace, name)155 elif not namespace:156 generated_mojom_name = 'mojom.%s' % name157 else:158 generated_mojom_name = '%s.mojom.%s' % (namespace, name)159 self._new_custom_mappings[typename] = name160 self._add_includes(namespace, cpp_name, typename)161 generated_mojom_name = name162 self._custom_mappings[typename] = {'name': generated_mojom_name}163 return generated_mojom_name164 def _add_includes(self, namespace, name, fullname):165 name_components = name.split('::')166 is_enum = False167 for i in xrange(len(name_components)):168 subname = '::'.join(name_components[i:])169 extra_names = name_components[:i] + [subname]170 patterns = [r'\(struct\|class\|enum\)[A-Z_ ]* %s {' % s171 for s in extra_names]172 if namespace:173 patterns.extend(r'namespace %s' % namespace_component174 for namespace_component in namespace.split('.'))175 includes = 
_git_multigrep(patterns, '*.h')176 if includes:177 if _git_grep(r'enum[A-Z_ ]* %s {' % subname, includes):178 self._enums.add(fullname)179 is_enum = True180 logging.info('%s => public_headers = %s', fullname, includes)181 self._public_includes.update(includes)182 break183 if is_enum:184 patterns = ['IPC_ENUM_TRAITS[A-Z_]*(%s' % fullname]185 else:186 patterns = [r'\(IPC_STRUCT_TRAITS_BEGIN(\|ParamTraits<\)%s' % fullname]187 includes = _git_multigrep(188 patterns,189 ['*messages.h', '*struct_traits.h', 'ipc/ipc_message_utils.h'])190 if includes:191 logging.info('%s => traits_headers = %s', fullname, includes)192 self._traits_includes.update(includes)193 def format_imports(self):194 for import_name in sorted(self._imports):195 yield 'import "%s";' % import_name196 if self._imports:197 yield ''198class Argument(object):199 def __init__(self, typename, name):200 self.typename = typename.strip()201 self.name = name.strip().replace('\n', '').replace(' ', '_').lower()202 if not self.name:203 global _unused_arg_count204 self.name = 'unnamed_arg%d' % _unused_arg_count205 _unused_arg_count += 1206 def format(self, typemaps):207 return '%s %s' % (typemaps.lookup_type(self.typename), self.name)208class Message(object):209 def __init__(self, match, content):210 self.sync = bool(match[0])211 self.routed = match[1] == 'ROUTED'212 self.args = []213 self.response_args = []214 if self.sync:215 num_expected_args = int(match[2][:-1])216 num_expected_response_args = int(match[3])217 else:218 num_expected_args = int(match[3])219 num_expected_response_args = 0220 body = content.split(',')221 name = body[0].strip()222 try:223 self.group, self.name = name.split('Msg_')224 except ValueError:225 try:226 self.group, self.name = name.split('_')227 except ValueError:228 self.group = 'UnnamedInterface'229 self.name = name230 self.group = '%s%s' % (self.group, match[1].title())231 args = list(self.parse_args(','.join(body[1:])))232 if len(args) != num_expected_args + num_expected_response_args:233 
raise Exception('Incorrect number of args parsed for %s' % (name))234 self.args = args[:num_expected_args]235 self.response_args = args[num_expected_args:]236 def parse_args(self, args_str):237 args_str = args_str.strip()238 if not args_str:239 return240 looking_for_type = False241 type_start = 0242 comment_start = None243 comment_end = None244 type_end = None245 angle_bracket_nesting = 0246 i = 0247 while i < len(args_str):248 if args_str[i] == ',' and not angle_bracket_nesting:249 looking_for_type = True250 if type_end is None:251 type_end = i252 elif args_str[i:i + 2] == '/*':253 if type_end is None:254 type_end = i255 comment_start = i + 2256 comment_end = args_str.index('*/', i + 2)257 i = comment_end + 1258 elif args_str[i:i + 2] == '//':259 if type_end is None:260 type_end = i261 comment_start = i + 2262 comment_end = args_str.index('\n', i + 2)263 i = comment_end264 elif args_str[i] == '<':265 angle_bracket_nesting += 1266 elif args_str[i] == '>':267 angle_bracket_nesting -= 1268 elif looking_for_type and args_str[i].isalpha():269 if comment_start is not None and comment_end is not None:270 yield Argument(args_str[type_start:type_end],271 args_str[comment_start:comment_end])272 else:273 yield Argument(args_str[type_start:type_end], '')274 type_start = i275 type_end = None276 comment_start = None277 comment_end = None278 looking_for_type = False279 i += 1280 if comment_start is not None and comment_end is not None:281 yield Argument(args_str[type_start:type_end],282 args_str[comment_start:comment_end])283 else:284 yield Argument(args_str[type_start:type_end], '')285 def format(self, typemaps):286 result = '%s(%s)' % (self.name, ','.join('\n %s' % arg.format(typemaps)287 for arg in self.args))288 if self.sync:289 result += ' => (%s)' % (',\n'.join('\n %s' % arg.format(typemaps)290 for arg in self.response_args))291 result = '[Sync]\n %s' % result292 return '%s;' % result293class Generator(object):294 def __init__(self, input_name, output_namespace):295 
self._input_name = input_name296 with open(input_name) as f:297 self._content = f.read()298 self._namespace = output_namespace299 self._typemaps = Typemap(self._find_typemaps())300 self._interface_definitions = []301 def _get_messages(self):302 for m in _MESSAGE_PATTERN.finditer(self._content):303 i = m.end() + 1304 while i < len(self._content):305 if self._content[i:i + 2] == '/*':306 i = self._content.index('*/', i + 2) + 1307 elif self._content[i] == ')':308 yield Message(m.groups(), self._content[m.end() + 1:i])309 break310 i += 1311 def _extract_messages(self):312 grouped_messages = {}313 for m in self._get_messages():314 grouped_messages.setdefault(m.group, []).append(m)315 self._typemaps.load_typemaps()316 for interface, messages in grouped_messages.iteritems():317 self._interface_definitions.append(self._format_interface(interface,318 messages))319 def count(self):320 grouped_messages = {}321 for m in self._get_messages():322 grouped_messages.setdefault(m.group, []).append(m)323 return sum(len(messages) for messages in grouped_messages.values())324 def generate_mojom(self):325 self._extract_messages()326 if not self._interface_definitions:327 return328 yield """// Copyright 2016 The Chromium Authors. 
All rights reserved.329// Use of this source code is governed by a BSD-style license that can be330// found in the LICENSE file.331"""332 yield 'module %s;\n' % self._namespace333 for import_statement in self._typemaps.format_imports():334 yield import_statement335 for typemap in self._typemaps.format_new_types():336 yield typemap337 for interface in self._interface_definitions:338 yield interface339 yield ''340 def generate_typemap(self, output_mojom, input_filename):341 return '\n'.join(self._typemaps.generate_typemap(342 output_mojom, input_filename, self._namespace)).strip()343 @staticmethod344 def _find_typemaps():345 return subprocess.check_output(346 ['git', 'ls-files', '*.typemap']).strip().split('\n')347 def _format_interface(self, name, messages):348 return 'interface %s {\n %s\n};' % (name,349 '\n '.join(m.format(self._typemaps)350 for m in messages))351def parse_args():352 parser = argparse.ArgumentParser(description=__doc__)353 parser.add_argument('input', help='input messages.h file')354 parser.add_argument(355 '--output_namespace',356 default='mojom',357 help='the mojom module name to use in the generated mojom file '358 '(default: %(default)s)')359 parser.add_argument('--output_mojom', help='output mojom path')360 parser.add_argument('--output_typemap', help='output typemap path')361 parser.add_argument(362 '--count',363 action='store_true',364 default=False,365 help='count the number of messages in the input instead of generating '366 'a mojom file')367 parser.add_argument('-v',368 '--verbose',369 action='store_true',370 help='enable logging')371 parser.add_argument('-vv', action='store_true', help='enable debug logging')372 return parser.parse_args()373def main():374 args = parse_args()375 if args.vv:376 logging.basicConfig(level=logging.DEBUG)377 elif args.verbose:378 logging.basicConfig(level=logging.INFO)379 generator = Generator(args.input, args.output_namespace)380 if args.count:381 count = generator.count()382 if count:383 print '%d %s' % 
(generator.count(), args.input)384 return385 mojom = '\n'.join(generator.generate_mojom()).strip()386 if not mojom:387 return388 typemap = generator.generate_typemap(args.output_mojom, args.input)389 if args.output_mojom:390 with open(args.output_mojom, 'w') as f:391 f.write(mojom)392 else:393 print mojom394 if typemap:395 if args.output_typemap:396 with open(args.output_typemap, 'w') as f:397 f.write(typemap)398 else:399 print typemap400if __name__ == '__main__':...
__init__.py
Source:__init__.py
1# -*- coding: utf-8 -*-2import itertools3import operator4from typing import TYPE_CHECKING, List, Set, Union5from warnings import warn6import rdflib7from pyshacl.consts import (8 SH_filterShape,9 SH_intersection,10 SH_nodes,11 SH_object,12 SH_path,13 SH_predicate,14 SH_subject,15 SH_this,16 SH_union,17)18from pyshacl.errors import ReportableRuntimeError19from pyshacl.rules.shacl_rule import SHACLRule20if TYPE_CHECKING:21 from pyshacl.pytypes import GraphLike, RDFNode22 from pyshacl.shape import Shape23 from pyshacl.shapes_graph import ShapesGraph24class TripleRule(SHACLRule):25 __slots__ = ("s", "p", "o")26 def __init__(self, shape: 'Shape', rule_node: 'rdflib.term.Identifier', **kwargs):27 """28 :param shape:29 :type shape: Shape30 :param rule_node:31 :type rule_node: rdflib.term.Identifier32 """33 super(TripleRule, self).__init__(shape, rule_node, **kwargs)34 my_subject_nodes = set(self.shape.sg.objects(self.node, SH_subject))35 if len(my_subject_nodes) < 1:36 raise RuntimeError("No sh:subject")37 elif len(my_subject_nodes) > 1:38 raise RuntimeError("Too many sh:subject")39 self.s = next(iter(my_subject_nodes))40 my_predicate_nodes = set(self.shape.sg.objects(self.node, SH_predicate))41 if len(my_predicate_nodes) < 1:42 raise RuntimeError("No sh:predicate")43 elif len(my_predicate_nodes) > 1:44 raise RuntimeError("Too many sh:predicate")45 self.p = next(iter(my_predicate_nodes))46 my_object_nodes = set(self.shape.sg.objects(self.node, SH_object))47 if len(my_object_nodes) < 1:48 raise RuntimeError("No sh:object")49 elif len(my_object_nodes) > 1:50 raise RuntimeError("Too many sh:object")51 self.o = next(iter(my_object_nodes))52 def get_nodes_from_node_expression(53 self, expr, focus_node, data_graph: 'GraphLike', recurse_depth=054 ) -> Union[Set['RDFNode'], List['RDFNode']]:55 if expr == SH_this:56 return [focus_node]57 sg = self.shape.sg # type: ShapesGraph58 if isinstance(expr, (rdflib.URIRef, rdflib.Literal)):59 return [expr]60 elif isinstance(expr, 
rdflib.BNode):61 unions = set(sg.objects(expr, SH_union))62 intersections = set(sg.objects(expr, SH_intersection))63 if len(unions) and len(intersections):64 raise ReportableRuntimeError("Cannot have sh:intersection and sh:union on the same bnode.")65 if recurse_depth > 8 and (len(unions) or len(intersections)):66 warn(Warning("sh:union and sh:intersection depth too deep. Won't capture all of it!"))67 return []68 if len(unions):69 union_list = next(iter(unions))70 parts = list(sg.graph.items(union_list))71 all_nodes: Set['RDFNode'] = set()72 for p in parts:73 new_parts = self.get_nodes_from_node_expression(74 p, focus_node, data_graph, recurse_depth=recurse_depth + 175 )76 all_nodes = all_nodes.union(new_parts)77 return all_nodes78 if len(intersections):79 inter_list = next(iter(intersections))80 parts = list(data_graph.items(inter_list))81 inter_nodes: Set[RDFNode] = set()82 new = True83 for p in parts:84 new_parts = self.get_nodes_from_node_expression(85 p, focus_node, data_graph, recurse_depth=recurse_depth + 186 )87 if new is True:88 inter_nodes = set(iter(new_parts))89 new = False90 else:91 inter_nodes = inter_nodes.intersection(new_parts)92 return inter_nodes93 path_nodes = set(sg.objects(expr, SH_path))94 if len(path_nodes) > 0:95 path_results = []96 for p in path_nodes:97 vals = self.shape.value_nodes_from_path(sg, focus_node, p, data_graph)98 path_results.extend(vals)99 return path_results100 filter_shapes = set(sg.objects(expr, SH_filterShape))101 nodes_nodes = set(sg.objects(expr, SH_nodes))102 if len(filter_shapes) > 0: # pragma: no cover103 # Note: Theres no tests for this whole filterShapes feature!104 if len(nodes_nodes) > 1:105 warn(Warning("More than one sh:nodes found. 
Using the first one."))106 elif len(nodes_nodes) < 1:107 raise ReportableRuntimeError("The Node FilterShape {} does not have sh:nodes.".format(expr))108 filter_shape = next(iter(filter_shapes))109 filter_shape = sg.lookup_shape_from_node(filter_shape)110 nodes_expr = next(iter(nodes_nodes))111 to_filter = self.get_nodes_from_node_expression(112 nodes_expr, focus_node, data_graph, recurse_depth=recurse_depth + 1113 )114 passes = set()115 for n in to_filter:116 conforms, reports = filter_shape.validate(data_graph, n)117 if conforms:118 passes.add(n)119 return passes120 # Got to here, the only other possibility is this is a FunctionExpression.121 remain_pairs = set(sg.predicate_objects(expr))122 if len(remain_pairs) > 1:123 warn(Warning("More than one NodeExpression found on the TripleRule. Using the first one."))124 fnexpr, fnargslist = next(iter(remain_pairs))125 # find the function!126 try:127 function, optionals = sg.get_shacl_function(fnexpr)128 except KeyError:129 raise ReportableRuntimeError(130 "The SHACLFunction {} was not defined in this SHACL Shapes file.".format(fnexpr)131 )132 argslist_parts = list(sg.graph.items(fnargslist))133 args_sets = [134 self.get_nodes_from_node_expression(p, focus_node, data_graph, recurse_depth=recurse_depth + 1)135 for p in argslist_parts136 ]137 num_args_sets = len(args_sets)138 num_expected_args = len(optionals)139 if num_args_sets > num_expected_args:140 raise ReportableRuntimeError("Too many arguments given for {}".format(fnexpr))141 if num_args_sets < num_expected_args:142 # not enough, but some might be optional143 num_diff = num_expected_args - num_args_sets144 args_sets_slice = args_sets[0 - num_diff :]145 all_empty = itertools.accumulate(146 {True if len(a) < 1 else False for a in args_sets_slice}, func=operator.or_147 )148 if all_empty:149 args_sets = args_sets[:num_expected_args]150 else:151 raise ReportableRuntimeError(152 "Too few arguments given for {}, with non-optional arguments.".format(fnexpr)153 )154 
add_nones = set()155 for i, a in enumerate(args_sets):156 if len(a) < 1:157 if optionals[i] is False:158 warn(Warning("Got an empty set of nodes for a non-optional argument in {}.".format(fnexpr)))159 return []160 else:161 add_nones.add(i)162 for i in add_nones:163 args_sets[i] = [None]164 args_permutations = list(itertools.product(*args_sets))165 responses = set()166 for p in args_permutations:167 result = function(data_graph, *p)168 responses.add(result)169 return responses170 else:171 raise NotImplementedError("Unsupported expression s, p, or o, in SHACL TripleRule")172 def apply(self, data_graph: 'GraphLike') -> int:173 focus_nodes = self.shape.focus_nodes(data_graph) # uses target nodes to find focus nodes174 applicable_nodes = self.filter_conditions(focus_nodes, data_graph)175 all_added = 0176 iterate_limit = 100177 while True:178 if iterate_limit < 1:179 raise ReportableRuntimeError("sh:rule iteration exceeded iteration limit of 100.")180 iterate_limit -= 1181 added = 0182 to_add = []183 for a in applicable_nodes:184 s_set = self.get_nodes_from_node_expression(self.s, a, data_graph)185 p_set = self.get_nodes_from_node_expression(self.p, a, data_graph)186 o_set = self.get_nodes_from_node_expression(self.o, a, data_graph)187 new_triples = itertools.product(s_set, p_set, o_set)188 this_added = False189 for i in iter(new_triples):190 if not this_added and i not in data_graph:191 this_added = True192 to_add.append(i)193 if this_added:194 added += 1195 if added > 0:196 for i in to_add:197 data_graph.add(i)198 all_added += added199 if self.iterate:200 continue # Jump up to iterate201 else:202 break # Don't iterate203 break...
service.py
Source:service.py
1from sys import exit2from src.db import database3from src.definitions.assistant import *4from src.lib.assistant.definitions.command import CommandParsingException5from src.lib.harmonic_mixing.definitions.transition_match_finder import TransitionMatchFinder6from src.utils.harmonic_mixing import *7from src.utils.assistant import *8def parse_user_input(user_input):9 """10 Parses user input and returns command name and arguments to execute.11 :param user_input: Raw text input.12 """13 stripped_input = user_input.strip()14 if is_empty(stripped_input):15 return16 segments = [seg.strip() for seg in stripped_input.split()]17 if stripped_input[0] == '[':18 cmd_alias = MATCH19 segments = [MATCH] + segments20 else:21 cmd_alias = segments[0].lower()22 if cmd_alias not in ALL_ALIASES:23 raise CommandParsingException('%s is not a valid command.' % cmd_alias)24 cmd_name = ALIAS_MAPPING.get(cmd_alias, cmd_alias)25 args = [' '.join(segments[1:])] if cmd_name == MATCH else segments[1:]26 return cmd_name, args27class Assistant:28 """" CLI assistant functions."""29 def __init__(self):30 """ Initialize the assistant. """31 self.session = database.create_session()32 self.transition_match_finder = TransitionMatchFinder(self.session)33 def execute(self, user_input):34 """35 Parses and validates user input and executes corresponding command.36 :param user_input: Raw user input.37 """38 cmd_name, args = parse_user_input(user_input)39 command = COMMANDS[cmd_name]40 num_args = len(args)41 expected_args = command.get_arguments()42 num_expected_args = len([arg for arg in expected_args if arg.required])43 if num_args != num_expected_args:44 formatted_args = '' if num_args == 0 else ' - got: %s' % ' '.join(args)45 raise CommandParsingException('%s expects %d arguments%s.' 
% (cmd_name, num_expected_args, formatted_args))46 cmd_function = command.get_function()47 cmd_args = {expected_args[i].get_name(): args[i].strip() for i in range(num_args)}48 return getattr(self, cmd_function)(**cmd_args)49 def print_transition_matches(self, track_title):50 """51 Prints transition matches for the given track.52 :param track_title - Formatted track title (with metadata)53 """54 self.transition_match_finder.print_transition_matches(track_title)55 def reload_track_data(self):56 """ Reloads tracks from the audio directory and regenerates Camelot map and metadata. """57 self.transition_match_finder.reload_track_data()58 print('Track data reloaded.')59 def shutdown(self):60 """ Exits the CLI. """61 print('Goodbye.')...
client.py
Source:client.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""UDP client that sends a file in numbered units and waits for each echo.

Command-line arguments, in order: file_name, port_num, ip.
"""
import sys
import socket

# Number of bytes used for the little-endian unit ID that prefixes a message.
_ID_SIZE = 10
# Number of payload bytes read from the file for each message.
_PAYLOAD_SIZE = 90
# Seconds to wait for a reply before the same message is sent again.
_RESEND_TIMEOUT = 15
# Maximum number of bytes accepted from a single reply datagram.
_RECV_BUFFER = 1024


def validate_input(args):
    """Validate the (already reversed) program arguments.

    :param args: Sequence expected to be ``[ip, port_num, file_name]``.

    Prints one message per problem found. Calls ``sys.exit(1)`` when the
    argument count is wrong or when any of the three values is invalid;
    returns ``None`` otherwise.
    """
    # Check that we got the right number of arguments.
    num_expected_args = 3
    if len(args) != num_expected_args:
        print('Please enter exactly three arguments: ')
        sys.exit(1)

    bad_input = False

    # Validate IP.
    try:
        socket.inet_aton(args[0])
    except OSError:  # socket.error is an alias of OSError
        print('Invalid IP address')
        bad_input = True

    # Validate port number. isdecimal() (unlike isdigit()) only accepts
    # characters that int() can parse, so input such as a superscript digit
    # is reported as invalid instead of raising ValueError.
    if not args[1].isdecimal() or not 1 <= int(args[1]) <= 65535:
        print("Invalid port number")
        bad_input = True

    # Validate file name by probing that the file can be opened for reading.
    try:
        with open(args[2], 'rb'):
            pass
    except (OSError, ValueError):
        print('Given file does not exist')
        bad_input = True

    if bad_input:
        print('Try executing the program again. Terminating...')
        sys.exit(1)


def read_unit_from_file(file, unit_id):
    """Build the next message: a 10-byte unit ID followed by file data.

    :param file: Binary file object; up to 90 bytes are read from it.
    :param unit_id: Message counter, encoded as 10 little-endian bytes.
    :return: The encoded ID concatenated with the bytes read. When the file
        is exhausted the result is only the 10-byte ID.
    :raises OverflowError: If ``unit_id`` is negative or does not fit in
        10 bytes.
    """
    data = file.read(_PAYLOAD_SIZE)
    return unit_id.to_bytes(_ID_SIZE, 'little') + data


def verify_message(original, received):
    """Return True when the reply is identical to the message that was sent."""
    return original == received


def _send_file(ip, port_num, file_name):
    """Send ``file_name`` to ``(ip, port_num)`` over UDP, unit by unit.

    Each unit is re-sent until a reply identical to it is received. Both the
    file and the socket are closed when the transfer ends or an error occurs.
    """
    with open(file_name, 'rb') as file:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(_RESEND_TIMEOUT)
            counter = 0  # Tracks message ID
            message = read_unit_from_file(file, counter)
            # A message longer than the ID alone still carries file data.
            while len(message) > _ID_SIZE:
                # Keep sending this message until it is echoed back intact.
                data = None
                while data is None or not verify_message(message, data):
                    try:
                        s.sendto(message, (ip, port_num))
                        # If no reply arrives within the socket timeout
                        # (15 seconds), the message is sent again.
                        data, _ = s.recvfrom(_RECV_BUFFER)
                    except socket.timeout:
                        continue
                # Increase message ID and read the next package from the file.
                counter += 1
                message = read_unit_from_file(file, counter)


if __name__ == "__main__":
    # Program arguments arrive as (file_name, port_num, ip); reversing them
    # gives the (ip, port_num, file_name) order that validate_input expects.
    args_arr = sys.argv[1:]
    args_arr.reverse()
    validate_input(args_arr)
    _send_file(args_arr[0], int(args_arr[1]), args_arr[2])
Learn to run automation tests from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!