Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py
...139 for allowed in allowed_origins:140 if origin in allowed or re.match(allowed.replace('*', '.*'), origin):141 response.headers['Access-Control-Allow-Origin'] = origin142 break143def strip_chunk_signatures(data):144 # For clients that use streaming v4 authentication, the request contains chunk signatures145 # in the HTTP body (see example below) which we need to strip as moto cannot handle them146 #147 # 17;chunk-signature=6e162122ec4962bea0b18bc624025e6ae4e9322bdc632762d909e87793ac5921148 # <payload data ...>149 # 0;chunk-signature=927ab45acd82fc90a3c210ca7314d59fedc77ce0c914d79095f8cc9563cf2c70150 data_new = re.sub(r'^[0-9a-fA-F]+;chunk-signature=[0-9a-f]{64}', '', data, flags=re.MULTILINE)151 if data_new != data:152 # trim \r (13) or \n (10)153 for i in range(0, 2):154 if ord(data_new[0]) in (10, 13):155 data_new = data_new[1:]156 for i in range(0, 6):157 if ord(data_new[-1]) in (10, 13):158 data_new = data_new[:-1]159 return data_new160def update_s3(method, path, data, headers, response=None, return_forward_info=False):161 if return_forward_info:162 modified_data = None163 # If this request contains streaming v4 authentication signatures, strip them from the message164 # Related isse: https://github.com/atlassian/localstack/issues/98165 # TODO we should evaluate whether to replace moto s3 with scality/S3:166 # https://github.com/scality/S3/issues/237167 if headers.get('x-amz-content-sha256') == 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD':168 modified_data = strip_chunk_signatures(data)169 # persist this API call to disk170 persistence.record('s3', method, path, data, headers)171 parsed = urlparse.urlparse(path)172 query = parsed.query173 path = parsed.path174 bucket = path.split('/')[1]175 query_map = urlparse.parse_qs(query)176 if method == 'PUT' and (query == 'notification' or 'notification' in query_map):177 tree = ET.fromstring(data)178 for dest in ['Queue', 'Topic', 'CloudFunction']:179 config = tree.find('{%s}%sConfiguration' % (XMLNS_S3, dest))180 if config is not None and len(config):181 S3_NOTIFICATIONS[bucket] = {182 'Id': get_xml_text(config, 'Id'),...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!