Best Python code snippet using molecule_python
__init__.py
Source:__init__.py
# ... (earlier lines of __init__.py are elided in this excerpt)
import conveyor.task
from conveyor.decorator import args, command


class _ClientCommand(conveyor.main.Command):
    '''A client command.'''

    def _get_driver_name(self):
        '''
        Return the driver name given on the command line or, when none was
        given, the `driver` option from the `client` configuration section.
        '''
        if self._parsed_args.driver_name is not None:
            return self._parsed_args.driver_name
        return self._config.get('client', 'driver')

    def _get_profile_name(self):
        '''
        Return the profile name given on the command line or, when none was
        given, the `profile` option from the `client` configuration section.
        '''
        if self._parsed_args.profile_name is not None:
            return self._parsed_args.profile_name
        return self._config.get('client', 'profile')


class _JsonRpcCommand(_ClientCommand):
    '''
    A client command that requires a JSON-RPC connection to the conveyor
    service.
    '''

    def __init__(self, parsed_args, config):
        _ClientCommand.__init__(self, parsed_args, config)
        # Set by `run`; initialized here so the attribute always exists.
        self._connection = None
        self._jsonrpc = None
        self._stop = False
        self._code = 0

    def run(self):
        '''
        Connect to the address stored under `common`/`address`, send `hello`
        to the conveyor service and call `self._jsonrpc.run()`.

        Returns `self._code`: it starts at 0 and is set to 1 when the
        connection fails or when a guarded task fails or is canceled.
        '''
        address = self._config.get('common', 'address')
        try:
            self._connection = address.connect()
        except EnvironmentError as e:
            self._code = 1
            self._log.critical(
                'failed to connect to address: %s: %s',
                address, e.strerror, exc_info=True)
            if not self._pid_file_exists():
                self._log.critical(
                    'pid file missing; is the conveyor service running?')
        else:
            # The same connection object is passed for both arguments.
            self._jsonrpc = conveyor.jsonrpc.JsonRpc(
                self._connection, self._connection)
            self._export_methods()
            hello_task = self._jsonrpc.request('hello', {})
            hello_task.stoppedevent.attach(
                self._guard_callback(self._hello_callback))
            hello_task.start()
            self._jsonrpc.run()
        return self._code

    def _pid_file_exists(self):
        '''Return whether the configured `common`/`pid_file` path exists.'''
        pid_file = self._config.get('common', 'pid_file')
        return os.path.exists(pid_file)

    def _export_methods(self):
        '''
        Export JSON-RPC methods to the conveyor service. The default
        implementation does not export any methods.
        '''

    def _guard_callback(self, callback):
        '''
        Creates a new callback that invokes `_check_task` and then invokes
        `callback` only if `_check_task` returns `True`. This reduces some
        repetitive code.
        '''
        def guard(task):
            if self._check_task(task):
                def func():
                    try:
                        callback(task)
                    except:
                        # Deliberately catch everything: the JSON-RPC
                        # connection is stopped before the exception is
                        # re-raised unchanged.
                        self._stop_jsonrpc()
                        raise
                conveyor.error.guard(self._log, func)
        return guard

    def _check_task(self, task):
        '''
        Returns whether or not a task ended successfully. It terminates the
        client if the task failed or was canceled.

        Raises `ValueError` (after stopping the JSON-RPC connection) when the
        task has any other conclusion.
        '''
        conclusion = task.conclusion
        if conveyor.task.TaskConclusion.ENDED == conclusion:
            return True
        if conveyor.task.TaskConclusion.FAILED == conclusion:
            self._code = 1
            self._log.error('%s', task.failure)
            self._stop_jsonrpc()
            return False
        if conveyor.task.TaskConclusion.CANCELED == conclusion:
            self._code = 1
            self._log.warning('canceled')
            self._stop_jsonrpc()
            return False
        self._stop_jsonrpc()
        raise ValueError(conclusion)

    def _stop_jsonrpc(self):
        '''Stop the JSON-RPC connection. This will end the client.'''
        self._stop = True
        self._jsonrpc.stop()

    def _hello_callback(self, hello_task):
        '''
        A callback invoked after the command successfully invokes `hello` on
        the conveyor service. This callback can be used to invoke additional
        methods on the conveyor service.
        '''
        raise NotImplementedError


class _MethodCommand(_JsonRpcCommand):
    '''
    A client command that invokes a JSON-RPC request on the conveyor service.
    '''

    def _hello_callback(self, hello_task):
        '''Start the method task once `hello` has succeeded.'''
        method_task = self._create_method_task()
        method_task.stoppedevent.attach(
            self._guard_callback(self._method_callback))
        method_task.start()

    def _create_method_task(self):
        '''
        Creates a task for a request to be invoked on the conveyor service.
        '''
        raise NotImplementedError

    def _method_callback(self, method_task):
        '''
        A callback invoked when the request returns. This callback can be used
        to handle the result of the request, to handle errors, and to invoke
        additional methods on the conveyor service.
        '''
        raise NotImplementedError


class _QueryCommand(_MethodCommand):
    '''
    A client command that invokes a JSON-RPC request on the conveyor service
    and handles the result.
    '''

    def _method_callback(self, method_task):
        '''Handle the task's result, then stop the JSON-RPC connection.'''
        self._handle_result(method_task.result)
        self._stop_jsonrpc()

    def _handle_result(self, result):
        '''Handles the result of the query.'''
        raise NotImplementedError


@args(conveyor.arg.json)
class _JsonCommand(_QueryCommand):
    '''
    A client command that invokes a JSON-RPC request on the conveyor service
    and optionally prints the result in raw JSON format.
    '''

    def _handle_result(self, result):
        '''Dispatch to the JSON or the default handler per the `json` flag.'''
        if self._parsed_args.json:
            self._handle_result_json(result)
        else:
            self._handle_result_default(result)

    def _handle_result_json(self, result):
        '''
        Handles the result of the query by printing it in raw JSON format.
        '''
        json.dump(result, sys.stdout)
        print()

    def _handle_result_default(self, result):
        '''
        Handles the result of the query in some way other than printing it in
        raw JSON format.
        '''
        raise NotImplementedError


class _MonitorCommand(_MethodCommand):
    '''
    A client command that invokes a JSON-RPC request on the conveyor service
    and waits for a job to complete. The request must return a job id.
    '''

    def __init__(self, parsed_args, config):
        _MethodCommand.__init__(self, parsed_args, config)
        self._job_id = None

    def _export_methods(self):
        '''Register `_job_changed` as the `jobchanged` JSON-RPC method.'''
        self._jsonrpc.addmethod('jobchanged', self._job_changed)

    def _job_changed(self, *args, **kwargs):
        '''
        Invoked by the conveyor service to inform the client that a job has
        changed.

        Notifications are ignored unless the client is still running and the
        job id matches the one recorded by `_method_callback`. When the
        matching job has stopped, the exit code is set from its conclusion and
        the JSON-RPC connection is stopped. Raises `ValueError` for an
        unrecognized conclusion.
        '''
        job = conveyor.job.JobInfo.from_dict(kwargs)
        is_monitored_job = (
            not self._stop
            and self._job_id is not None
            and self._job_id == job.id)
        if not is_monitored_job:
            return
        if conveyor.task.TaskState.STOPPED != job.state:
            return
        if conveyor.task.TaskConclusion.ENDED == job.conclusion:
            self._code = 0
            self._log.info('job ended')
        elif conveyor.task.TaskConclusion.FAILED == job.conclusion:
            self._code = 1
            self._log.error('job failed: %s', job.failure)
        elif conveyor.task.TaskConclusion.CANCELED == job.conclusion:
            self._code = 1
            self._log.warning('job canceled')
        else:
            raise ValueError(job.conclusion)
        self._stop_jsonrpc()

    def _method_callback(self, method_task):
        '''
        Record the job id returned by the request. If the result is not a
        dict with an `id` key, log an error, set the exit code to 1 and stop
        the JSON-RPC connection.
        '''
        result = method_task.result
        # `isinstance` already rejects `None`, so no separate check is needed.
        if isinstance(result, dict) and 'id' in result:
            self._job_id = result['id']
        else:
            self._code = 1
            self._log.error(
                'the conveyor service returned invalid job information')
            self._stop_jsonrpc()


@args(conveyor.arg.driver)
@args(conveyor.arg.machine)
@args(conveyor.arg.port)
@args(conveyor.arg.profile)
class _ConnectedCommand(_MonitorCommand):
    '''
    A client command that connects a machine, invokes a JSON-RPC request on the
    conveyor service and waits for a job to complete. The request must return a
    job id.

    This is essentially a `_MonitorCommand` that calls `connect` on the
    conveyor service before invoking the job-related method. `connect` must
    return a `MachineInfo` object with a `name` field. The machine's name is
    stored in an instance field called `_machine_name`.
    '''

    def __init__(self, parsed_args, config):
        _MonitorCommand.__init__(self, parsed_args, config)
        self._machine_name = None

    def _hello_callback(self, hello_task):
        '''Issue a non-persistent `connect` request after `hello`.'''
        # NOTE: this method doesn't use the `_get_driver_name` nor
        # `_get_profile_name` as the driver and profile can often be detected
        # automatically.
        params = {
            'machine_name': self._parsed_args.machine_name,
            'port_name': self._parsed_args.port_name,
            'driver_name': self._parsed_args.driver_name,
            'profile_name': self._parsed_args.profile_name,
            'persistent': False,
        }
        connect_task = self._jsonrpc.request('connect', params)
        connect_task.stoppedevent.attach(
            self._guard_callback(self._connect_callback))
        connect_task.start()

    def _connect_callback(self, connect_task):
        '''Store the connected machine's name, then start the method task.'''
        self._machine_name = connect_task.result['name']
        method_task = self._create_method_task()
        method_task.stoppedevent.attach(
            self._guard_callback(self._method_callback))
        method_task.start()


@args(conveyor.arg.positional_job)
class CancelCommand(_MethodCommand):
    '''Invoke `canceljob` on the conveyor service for the given job id.'''

    name = 'cancel'

    help = 'cancel a job'

    def _create_method_task(self):
        params = {'id': self._parsed_args.job_id}
        return self._jsonrpc.request('canceljob', params)

    def _method_callback(self, method_task):
        self._stop_jsonrpc()


@args(conveyor.arg.driver)
@args(conveyor.arg.positional_firmware_version)
class CompatibleFirmware(_QueryCommand):
    '''Invoke `compatiblefirmware` and print the service's answer.'''

    name = 'compatiblefirmware'

    help = 'determine if a firmware version is compatible with the MakerBot driver'

    def _create_method_task(self):
        params = {
            'driver_name': self._get_driver_name(),
            'firmware_version': self._parsed_args.firmware_version,
        }
        return self._jsonrpc.request('compatiblefirmware', params)

    def _handle_result(self, result):
        print('Your firmware version is compatible: %r' % (result,))


@args(conveyor.arg.driver)
@args(conveyor.arg.machine)
@args(conveyor.arg.port)
@args(conveyor.arg.profile)
class ConnectCommand(_MethodCommand):
    '''Invoke a persistent `connect` request on the conveyor service.'''

    name = 'connect'

    help = 'connect to a machine'

    def _create_method_task(self):
        params = {
            'machine_name': self._parsed_args.machine_name,
            'port_name': self._parsed_args.port_name,
            'driver_name': self._get_driver_name(),
            'profile_name': self._get_profile_name(),
            'persistent': True,
        }
        return self._jsonrpc.request('connect', params)

    def _method_callback(self, method_task):
        self._stop_jsonrpc()


@args(conveyor.arg.positional_output_file_optional)
class DefaultConfigCommand(_ClientCommand):
    '''
    Write the default configuration to the given output file, or to standard
    output when no output file was given. No JSON-RPC connection is used.
    '''

    name = 'defaultconfig'

    help = 'print the platform\'s default conveyor configuration'

    def run(self):
        output_file = self._parsed_args.output_file
        if output_file is None:
            conveyor.config.format_default(sys.stdout)
        else:
            with open(output_file, 'w') as fp:
                conveyor.config.format_default(fp)
        return 0


class DirCommand(_JsonCommand):
    '''Invoke `dir` and log each method name with its description.'''

    name = 'dir'

    help = 'list the methods available from the conveyor service'

    def _create_method_task(self):
        params = {}
        return self._jsonrpc.request('dir', params)

    def _handle_result_default(self, result):
        # Defined once, outside the loop, instead of once per method.
        def is_blank(s):
            return 0 == len(s) or s.isspace()

        for method_name, description in result.items():
            lines = textwrap.dedent(description).splitlines()
            # Remove blank lines at the end of the description. This puts the
            # lines in reverse order.
            lines = list(itertools.dropwhile(is_blank, reversed(lines)))
            # Remove blank lines at the start of the description. This also has
            # the side-effect of putting the lines back in forward order.
            lines = list(itertools.dropwhile(is_blank, reversed(lines)))
            self._log.info('%s:', method_name)
            for line in lines:
                self._log.info(' %s', line)


@args(conveyor.arg.machine)
class DisconnectCommand(_MethodCommand):
    '''Invoke `disconnect` on the conveyor service for the given machine.'''

    name = 'disconnect'

    help = 'disconnect from a machine'

    def _create_method_task(self):
        params = {
            'machine_name': self._parsed_args.machine_name,
        }
        return self._jsonrpc.request('disconnect', params)

    def _method_callback(self, method_task):
        self._stop_jsonrpc()


@args(conveyor.arg.driver)
@args(conveyor.arg.machine_type)
@args(conveyor.arg.firmware_version)
class DownloadFirmware(_QueryCommand):
    '''Invoke `downloadfirmware` and log the returned result.'''

    name = 'downloadfirmware'

    help = 'download firmware'

    def _create_method_task(self):
        params = {
            'driver_name': self._get_driver_name(),
            'machine_type': self._parsed_args.machine_type,
            'firmware_version': self._parsed_args.firmware_version,
        }
        return self._jsonrpc.request('downloadfirmware', params)

    def _handle_result(self, result):
        self._log.info('firmware downloaded to: %s', result)


@args(conveyor.arg.positional_driver)
class DriverCommand(_JsonCommand):
    '''Invoke `get_driver` and print the returned driver's profiles.'''

    name = 'driver'

    help = 'get the details for a driver'

    def _create_method_task(self):
        params = {'driver_name': self._get_driver_name(),}
        return self._jsonrpc.request('get_driver', params)

    def _handle_result_default(self, result):
        # The printing helper takes a list of drivers.
        _print_driver_profiles(self._log, [result])


class DriversCommand(_JsonCommand):
    '''Invoke `get_drivers` and print the returned drivers' profiles.'''

    name = 'drivers'

    help = 'list the available drivers'

    def _create_method_task(self):
        params = {}
        return self._jsonrpc.request('get_drivers', params)

    def _handle_result_default(self, result):
        _print_driver_profiles(self._log, result)


@args(conveyor.arg.driver)
@args(conveyor.arg.machine_type)
class GetMachineVersions(_QueryCommand):
    '''Invoke `getmachineversions` and log the result.'''

    name = 'getmachineversions'

    help = 'get the firmware versions available for a machine'

    def _create_method_task(self):
        params = {
            'driver_name': self._get_driver_name(),
            'machine_type': self._parsed_args.machine_type,
        }
        return self._jsonrpc.request('getmachineversions', params)

    def _handle_result(self, result):
        self._log.info('%s', result)


@args(conveyor.arg.driver)
class GetUploadableMachines(_QueryCommand):
    '''Invoke `getuploadablemachines` and print the result.'''

    name = 'getuploadablemachines'

    help = 'list the machines to which conveyor can upload firmware'

    def _create_method_task(self):
        params = {'driver_name': self._get_driver_name(),}
        return self._jsonrpc.request('getuploadablemachines', params)

    def _handle_result(self, result):
        print(result)


@args(conveyor.arg.positional_job)
class JobCommand(_JsonCommand):
    '''Invoke `getjob` for the given job id and log the result.'''

    name = 'job'

    help = 'get the details for a job'

    def _create_method_task(self):
        params = {'id': int(self._parsed_args.job_id)}
        return self._jsonrpc.request('getjob', params)

    def _handle_result_default(self, result):
        self._log.info('%s', result)


class JobsCommand(_JsonCommand):
    '''Invoke `getjobs` and log the result.'''

    name = 'jobs'

    help = 'get the details for all jobs'

    def _create_method_task(self):
        params = {}
        return self._jsonrpc.request('getjobs', params)

    def _handle_result_default(self, result):
        self._log.info('%s', result)


class PauseCommand(_ConnectedCommand):
    '''Connect to a machine, then invoke `pause` on the conveyor service.'''

    name = 'pause'

    help = 'pause a machine'

    def _create_method_task(self):
        params = {
            'machine_name': self._parsed_args.machine_name,
            'port_name': self._parsed_args.port_name,
            'driver_name': self._get_driver_name(),
            'profile_name': self._get_profile_name(),
        }
        return self._jsonrpc.request('pause', params)


class PortsCommand(_JsonCommand):
    '''Invoke `getports` and log the details of each serial port.'''

    name = 'ports'

    help = 'list the available ports'

    def _create_method_task(self):
        params = {}
        return self._jsonrpc.request('getports', params)

    def _handle_result_default(self, result):
        for port in result:
            # Only serial ports are supported; any other type is an error.
            if conveyor.machine.port.PortType.SERIAL == port['type']:
                self._handle_serial(port)
            else:
                raise ValueError(port['type'])

    def _handle_serial(self, port):
        '''Log the name, path, iSerial and VID:PID of a serial port.'''
        self._log.info('Serial port:')
        self._log.info(' name - %s', port['name'])
        self._log.info(' path - %s', port['path'])
        self._log.info(' iSerial - %s', port['iserial'])
        self._log.info(' VID:PID - %04X:%04X', port['vid'], port['pid'])


@args(conveyor.arg.extruder)
@args(conveyor.arg.gcode_processor)
@args(conveyor.arg.has_start_end)
@args(conveyor.arg.material)
@args(conveyor.arg.slicer)
@args(conveyor.arg.slicer_settings)
@args(conveyor.arg.positional_input_file)
class PrintCommand(_ConnectedCommand):
    '''Connect to a machine, then invoke `print` on the conveyor service.'''

    name = 'print'

    help = 'print an object'

    def _create_method_task(self):
        slicer_settings = _create_slicer_settings(
            self._parsed_args, self._config)
        slicer_settings.path = self._parsed_args.slicer_settings_path
        extruder_name = _fix_extruder_name(self._parsed_args.extruder_name)
        params = {
            # The name reported by `connect`, not the command-line value.
            'machine_name': self._machine_name,
            'input_file': os.path.abspath(self._parsed_args.input_file),
            'extruder_name': extruder_name,
            'gcode_processor_names': self._parsed_args.gcode_processor_names,
            'has_start_end': self._parsed_args.has_start_end,
            'material_name': self._parsed_args.material_name,
            'slicer_name': self._parsed_args.slicer_name,
            'slicer_settings': slicer_settings.to_dict(),
        }
        return self._jsonrpc.request('print', params)


@args(conveyor.arg.driver)
@args(conveyor.arg.extruder)
@args(conveyor.arg.gcode_processor)
@args(conveyor.arg.file_type)
@args(conveyor.arg.has_start_end)
@args(conveyor.arg.material)
@args(conveyor.arg.profile)
@args(conveyor.arg.slicer)
@args(conveyor.arg.slicer_settings)
@args(conveyor.arg.positional_input_file)
@args(conveyor.arg.positional_output_file)
class PrintToFileCommand(_MonitorCommand):
    '''Invoke `print_to_file` and wait for the resulting job to stop.'''

    name = 'printtofile'

    help = 'print an object to an .s3g or .x3g file'

    def _create_method_task(self):
        slicer_settings = _create_slicer_settings(
            self._parsed_args, self._config)
        slicer_settings.path = self._parsed_args.slicer_settings_path
        extruder_name = _fix_extruder_name(self._parsed_args.extruder_name)
        params = {
            'driver_name': self._get_driver_name(),
            'profile_name': self._get_profile_name(),
            'input_file': os.path.abspath(self._parsed_args.input_file),
            'output_file': os.path.abspath(self._parsed_args.output_file),
            'extruder_name': extruder_name,
            'file_type': self._parsed_args.file_type,
            'gcode_processor_names': self._parsed_args.gcode_processor_names,
            'has_start_end': self._parsed_args.has_start_end,
            'material_name': self._parsed_args.material_name,
            'slicer_name': self._parsed_args.slicer_name,
            'slicer_settings': slicer_settings.to_dict(),
        }
        return self._jsonrpc.request('print_to_file', params)


class PrintersCommand(_JsonCommand):
    '''Invoke `getprinters` and log a summary of each returned machine.'''

    name = 'printers'

    help = 'list connected printers'

    def _create_method_task(self):
        params = {}
        return self._jsonrpc.request('getprinters', params)

    def _handle_result_default(self, result):
        for machine in result:
            self._log.info('Printer:')
            self._log.info(' name - %s', machine['name'])
            self._log.info(' state - %s', machine['state'])
            self._log.info(' temperature - %s', machine['temperature'])
            self._log.info(' firmware - %s', machine['firmware_version'])
            # TODO: stop being lazy and add the rest of the fields.


@args(conveyor.arg.positional_driver)
@args(conveyor.arg.positional_profile)
class ProfileCommand(_JsonCommand):
    '''Invoke `get_profile` and print the returned profile.'''

    name = 'profile'

    help = 'get the details for a profile'

    def _create_method_task(self):
        params = {
            'driver_name': self._get_driver_name(),
            'profile_name': self._get_profile_name(),
        }
        return self._jsonrpc.request('get_profile', params)

    def _handle_result_default(self, result):
        driver = {
            # Use the same resolved name that was sent in the request, so the
            # output is labeled correctly when the configured default is used.
            'name': self._get_driver_name(),
            'profiles': [result],
        }
        _print_driver_profiles(self._log, [driver])


@args(conveyor.arg.positional_driver)
class ProfilesCommand(_JsonCommand):
    '''Invoke `get_profiles` and print the returned profiles.'''

    name = 'profiles'

    help = 'list the available profiles'

    def _create_method_task(self):
        params = {'driver_name': self._get_driver_name(),}
        return self._jsonrpc.request('get_profiles', params)

    def _handle_result_default(self, result):
        driver = {
            # Use the same resolved name that was sent in the request, so the
            # output is labeled correctly when the configured default is used.
            'name': self._get_driver_name(),
            'profiles': result,
        }
        _print_driver_profiles(self._log, [driver])


@args(conveyor.arg.machine)
@args(conveyor.arg.positional_output_file)
class ReadEepromCommand(_QueryCommand):
    '''Invoke `readeeprom` and write the result to a file as JSON.'''

    name = 'readeeprom'

    help = 'read a machine EEPROM'

    def _create_method_task(self):
        params = {'printername': self._parsed_args.machine_name}
        return self._jsonrpc.request('readeeprom', params)

    def _handle_result(self, result):
        output_file = os.path.abspath(self._parsed_args.output_file)
        with open(output_file, 'w') as fp:
            json.dump(result, fp, sort_keys=True, indent=2)


class ResetToFactoryCommand(_QueryCommand):
    '''Invoke `resettofactory`; the result is ignored.'''

    name = 'resettofactory'

    help = 'reset a machine EEPROM to factory settings'

    def _create_method_task(self):
        params = {'printername': None}
        return self._jsonrpc.request('resettofactory', params)

    def _handle_result(self, result):
        pass


@args(conveyor.arg.add_start_end)
@args(conveyor.arg.driver)
@args(conveyor.arg.extruder)
@args(conveyor.arg.gcode_processor)
@args(conveyor.arg.material)
@args(conveyor.arg.profile)
@args(conveyor.arg.slicer)
@args(conveyor.arg.slicer_settings)
@args(conveyor.arg.positional_input_file)
@args(conveyor.arg.positional_output_file)
class SliceCommand(_MonitorCommand):
    '''Invoke `slice` and wait for the resulting job to stop.'''

    name = 'slice'

    help = 'slice an object to a .gcode file'

    def _create_method_task(self):
        slicer_settings = _create_slicer_settings(
            self._parsed_args, self._config)
        slicer_settings.path = self._parsed_args.slicer_settings_path
        extruder_name = _fix_extruder_name(self._parsed_args.extruder_name)
        params = {
            'driver_name': self._get_driver_name(),
            'profile_name': self._get_profile_name(),
            'input_file': os.path.abspath(self._parsed_args.input_file),
            'output_file': os.path.abspath(self._parsed_args.output_file),
            'add_start_end': self._parsed_args.add_start_end,
            'extruder_name': extruder_name,
            'gcode_processor_names': self._parsed_args.gcode_processor_names,
            'material_name': self._parsed_args.material_name,
            'slicer_name': self._parsed_args.slicer_name,
            'slicer_settings': slicer_settings.to_dict(),
        }
        return self._jsonrpc.request('slice', params)


class UnpauseCommand(_ConnectedCommand):
    '''Connect to a machine, then invoke `unpause` on the conveyor service.'''

    name = 'unpause'

    help = 'unpause a machine'

    def _create_method_task(self):
        params = {
            'machine_name': self._parsed_args.machine_name,
            'port_name': self._parsed_args.port_name,
            'driver_name': self._get_driver_name(),
            'profile_name': self._get_profile_name(),
        }
        unpause_task = self._jsonrpc.request('unpause', params)
        return unpause_task


@args(conveyor.arg.machine_type)
@args(conveyor.arg.positional_input_file)
class UploadFirmwareCommand(_QueryCommand):
    name = 'uploadfirmware'

    help = 'upload firmware'

    def _create_method_task(self):
        params = {
            'machine_name': None,
            'machinetype': self._parsed_args.machine_type,
            'filename': os.path.abspath(self._parsed_args.input_file),
        }
        # ... (the excerpt is truncated here: any further `params` entries and
        # the rest of this method and class are not shown; the closing brace
        # above was added only to keep the excerpt well formed)
database.py
Source:database.py
...11 password: str = None,12 hostname: str = 'localhost',13 port: int = 3306):14 db_pass = f':{password}' if password is not None else ''15 db_driver = self._get_driver_name(dialect)16 self.engine = create_engine(17 f'{db_driver}://{username}{db_pass}@{hostname}:{port}/{database}')18 session_factory.configure(bind=self.engine)19 @classmethod20 def can_handle(cls, context_type) -> bool:21 return context_type == 'mariadb' or context_type == 'mysql'22 @contextmanager23 def get_session(self):24 """Returns a Session factory object for connecting to the database"""25 try:26 session: Session = session_factory()27 yield session28 session.commit()29 except:30 session.rollback()31 raise32 finally:33 session.close()34 @classmethod35 def _get_driver_name(cls, context_type) -> str:36 # This allows support for either mysql or mariadb using the same class.37 if context_type == 'mysql':38 return 'mysql'39 if context_type == 'mariadb':40 return 'mariadb+mariadbconnector'41 ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!