Best Python code snippet using localstack_python
fetch.py
Source:fetch.py
1# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>2#3# This file is part of Ansible4#5# Ansible is free software: you can redistribute it and/or modify6# it under the terms of the GNU General Public License as published by7# the Free Software Foundation, either version 3 of the License, or8# (at your option) any later version.9#10# Ansible is distributed in the hope that it will be useful,11# but WITHOUT ANY WARRANTY; without even the implied warranty of12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13# GNU General Public License for more details.14#15# You should have received a copy of the GNU General Public License16# along with Ansible. If not, see <http://www.gnu.org/licenses/>.17import os18import pwd19import random20import traceback21import tempfile22import base6423import ansible.constants as C24from ansible import utils25from ansible import errors26from ansible import module_common27from ansible.runner.return_data import ReturnData28class ActionModule(object):29 def __init__(self, runner):30 self.runner = runner31 def run(self, conn, tmp, module_name, module_args, inject, complex_args=None, **kwargs):32 ''' handler for fetch operations '''33 if self.runner.noop_on_check(inject):34 return ReturnData(conn=conn, comm_ok=True, result=dict(skipped=True, msg='check mode not (yet) supported for this module'))35 # load up options36 options = {}37 if complex_args:38 options.update(complex_args)39 options.update(utils.parse_kv(module_args))40 source = options.get('src', None)41 dest = options.get('dest', None)42 flat = options.get('flat', False)43 flat = utils.boolean(flat)44 fail_on_missing = options.get('fail_on_missing', False)45 fail_on_missing = utils.boolean(fail_on_missing)46 validate_checksum = options.get('validate_checksum', None)47 if validate_checksum is not None:48 validate_checksum = utils.boolean(validate_checksum)49 # Alias for validate_checksum (old way of specifying it)50 validate_md5 = options.get('validate_md5', None)51 if validate_md5 is not None:52 validate_md5 = utils.boolean(validate_md5)53 if validate_md5 is None and validate_checksum is None:54 # Default55 validate_checksum = True56 elif validate_checksum is None:57 validate_checksum = validate_md558 elif validate_md5 is not None and validate_checksum is not None:59 results = dict(failed=True, msg="validate_checksum and validate_md5 cannot both be specified")60 return ReturnData(conn, result=results)61 if source is None or dest is None:62 results = dict(failed=True, msg="src and dest are required")63 return ReturnData(conn=conn, result=results)64 source = conn.shell.join_path(source)65 source = self.runner._remote_expand_user(conn, source, tmp)66 # calculate checksum for the remote file67 remote_checksum = self.runner._remote_checksum(conn, tmp, source, inject)68 # use slurp if sudo and permissions are lacking69 remote_data = None70 if remote_checksum in ('1', '2') or self.runner.become:71 slurpres = self.runner._execute_module(conn, tmp, 'slurp', 'src=%s' % source, inject=inject)72 if slurpres.is_successful():73 if slurpres.result['encoding'] == 'base64':74 remote_data = base64.b64decode(slurpres.result['content'])75 if remote_data is not None:76 remote_checksum = utils.checksum_s(remote_data)77 # the source path may have been expanded on the78 # target system, so we compare it here and use the79 # expanded version if it's different80 remote_source = slurpres.result.get('source')81 if remote_source and remote_source != source:82 source = remote_source83 # calculate the destination name84 if os.path.sep not in conn.shell.join_path('a', ''):85 source_local = source.replace('\\', '/')86 else:87 source_local = source88 dest = os.path.expanduser(dest)89 if flat:90 if dest.endswith("/"):91 # if the path ends with "/", we'll use the source filename as the92 # destination filename93 base = os.path.basename(source_local)94 dest = os.path.join(dest, base)95 if not dest.startswith("/"):96 # if dest does not start with "/", we'll assume a relative path97 dest = utils.path_dwim(self.runner.basedir, dest)98 else:99 # files are saved in dest dir, with a subdir for each host, then the filename100 dest = "%s/%s/%s" % (utils.path_dwim(self.runner.basedir, dest), inject['inventory_hostname'], source_local)101 dest = dest.replace("//","/")102 if remote_checksum in ('0', '1', '2', '3', '4'):103 # these don't fail because you may want to transfer a log file that possibly MAY exist104 # but keep going to fetch other log files105 if remote_checksum == '0':106 result = dict(msg="unable to calculate the checksum of the remote file", file=source, changed=False)107 elif remote_checksum == '1':108 if fail_on_missing:109 result = dict(failed=True, msg="the remote file does not exist", file=source)110 else:111 result = dict(msg="the remote file does not exist, not transferring, ignored", file=source, changed=False)112 elif remote_checksum == '2':113 result = dict(msg="no read permission on remote file, not transferring, ignored", file=source, changed=False)114 elif remote_checksum == '3':115 result = dict(msg="remote file is a directory, fetch cannot work on directories", file=source, changed=False)116 elif remote_checksum == '4':117 result = dict(msg="python isn't present on the system. Unable to compute checksum", file=source, changed=False)118 return ReturnData(conn=conn, result=result)119 # calculate checksum for the local file120 local_checksum = utils.checksum(dest)121 if remote_checksum != local_checksum:122 # create the containing directories, if needed123 if not os.path.isdir(os.path.dirname(dest)):124 os.makedirs(os.path.dirname(dest))125 # fetch the file and check for changes126 if remote_data is None:127 conn.fetch_file(source, dest)128 else:129 f = open(dest, 'w')130 f.write(remote_data)131 f.close()132 new_checksum = utils.secure_hash(dest)133 # For backwards compatibility. We'll return None on FIPS enabled134 # systems135 try:136 new_md5 = utils.md5(dest)137 except ValueError:138 new_md5 = None139 if validate_checksum and new_checksum != remote_checksum:140 result = dict(failed=True, md5sum=new_md5, msg="checksum mismatch", file=source, dest=dest, remote_md5sum=None, checksum=new_checksum, remote_checksum=remote_checksum)141 return ReturnData(conn=conn, result=result)142 result = dict(changed=True, md5sum=new_md5, dest=dest, remote_md5sum=None, checksum=new_checksum, remote_checksum=remote_checksum)143 return ReturnData(conn=conn, result=result)144 else:145 # For backwards compatibility. We'll return None on FIPS enabled146 # systems147 try:148 local_md5 = utils.md5(dest)149 except ValueError:150 local_md5 = None151 result = dict(changed=False, md5sum=local_md5, file=source, dest=dest, checksum=local_checksum)...
framevalidator.py
Source:framevalidator.py
...33 if not validate_checksum: 34 logging.debug("Checksum validation is off; IMU data is not validated")35 else:36 logging.debug("Checksum validation is on; IMU data is validated")37 def set_validate_checksum(self, validate_checksum):38 """39 Small helper method to switch checksum validation on/off40 41 Arguments42 ---------43 validate_checksum -- boolean argument to set if checksum validation occurs44 """45 if not isinstance(validate_checksum, bool):46 raise ValueError("validate_checksum flag must be a boolean")47 self.validate_checksum = validate_checksum48 logging.debug("Checksum validation is now set to:{}".format(str(validate_checksum)))49 def validate(self, frame, chunk_id, byte_loc, run_id):50 """51 Method to validate is a provided frame from the camera stream is a...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!