Best Python code snippet using autotest_python
test_rule_based_profiler.py
Source:test_rule_based_profiler.py
...1193 ge_cloud_id=None,1194 )1195 assert "Non-existent Profiler" in str(e.value)1196@pytest.mark.unit1197def test_delete_profiler(1198 populated_profiler_store: ProfilerStore,1199):1200 with mock.patch(1201 "great_expectations.data_context.store.profiler_store.ProfilerStore.remove_key",1202 ) as mock_remove_key:1203 RuleBasedProfiler.delete_profiler(1204 profiler_store=populated_profiler_store,1205 name="my_profiler",1206 ge_cloud_id=None,1207 )1208 assert mock_remove_key.call_count == 11209 assert mock_remove_key.call_args == mock.call(1210 key=ConfigurationIdentifier("my_profiler")1211 )1212@pytest.mark.unit1213def test_delete_profiler_with_too_many_args_raises_error(1214 populated_profiler_store: ProfilerStore,1215):1216 with pytest.raises(AssertionError) as e:1217 RuleBasedProfiler.delete_profiler(1218 profiler_store=populated_profiler_store,1219 name="my_profiler",1220 ge_cloud_id="my_ge_cloud_id",1221 )1222 assert "either name or ge_cloud_id" in str(e.value)1223@pytest.mark.unit1224def test_delete_profiler_non_existent_profiler_raises_error(1225 populated_profiler_store: ProfilerStore,1226):1227 with pytest.raises(ge_exceptions.ProfilerNotFoundError) as e:1228 RuleBasedProfiler.delete_profiler(1229 profiler_store=populated_profiler_store,1230 name="my_non_existent_profiler",1231 ge_cloud_id=None,1232 )1233 assert "Non-existent Profiler" in str(e.value)1234@mock.patch("great_expectations.data_context.store.ProfilerStore")1235@pytest.mark.unit1236def test_list_profilers(mock_profiler_store: mock.MagicMock):1237 store = mock_profiler_store()1238 keys = ["a", "b", "c"]1239 store.list_keys.return_value = [ConfigurationIdentifier(char) for char in keys]1240 res = RuleBasedProfiler.list_profilers(store, ge_cloud_mode=False)1241 assert res == keys1242 assert store.list_keys.called...
workers.py
Source:workers.py
1import gevent.monkey2gevent.monkey.patch_all()3import os4import random5import requests6from s3_stress.utils import profilers, connectors, helpers, consts7from s3_stress.utils import server_logger8logger = server_logger.Logger(name=__name__).logger9@profilers.upload_profiler10def s3_put(session, url, **kwargs):11 return session.put(url, data=kwargs['data'], auth=kwargs['auth'], headers=kwargs['headers'])12@connectors.s3_connector13def s3_put_worker(**kwargs):14 bucket_name = kwargs['bucket_name']15 path = kwargs['path']16 metadata = helpers.generate_metadata_dict() if kwargs['metadata'] else {}17 random_chunk_size = random.randint(kwargs['min_size'], kwargs['max_size'])18 # in case user selected MIXED size, we use min/max size arguments to calculate size of random data chunk19 data_chunk = consts.DATA_BUFFERS["20M"][:random_chunk_size] if kwargs['data_chunk_size'] == 'MIXED' \20 else consts.DATA_BUFFERS[kwargs['data_chunk_size']]21 logger.info(f"PUT Worker started: URL: {kwargs['endpoint_url']} Bucket: {kwargs['bucket_name']} "22 f"Path: {kwargs['path']} Thread ID: {kwargs['thread_id']} Data Chunk Size: {kwargs['data_chunk_size']}")23 path = os.path.join(path, kwargs['thread_id'])24 path_counter = 10025 session = requests.Session()26 while not kwargs['stop_event'].is_set():27 file_counter = 028 while file_counter < 5000 and not kwargs['stop_event'].is_set():29 object_name = str(file_counter)30 full_object_name = "/".join([path, str(path_counter), object_name])31 try:32 url = "/".join([kwargs['endpoint_url'], bucket_name, full_object_name])33 res = s3_put(session, url, data=data_chunk, auth=kwargs['auth'], headers=metadata,34 counter=kwargs['stats_collector'].total_time_spent_in_upload)35 res.raise_for_status()36 with kwargs['stats_collector'].total_uploaded_files.get_lock():37 kwargs['stats_collector'].total_uploaded_files.value += 138 except (requests.ConnectionError, requests.HTTPError) as requests_err:39 logger.error("{} : PUT {}".format(requests_err.args[0], full_object_name))40 if kwargs['stone']:41 kwargs['results_queue'].put(requests_err)42 kwargs['stop_event'].set()43 raise44 except requests.Timeout as timeout:45 logger.error("PUT request {} Timed out. {}".format(full_object_name, timeout.strerror))46 if kwargs['stone']:47 kwargs['results_queue'].put(timeout)48 kwargs['stop_event'].set()49 raise50 except KeyboardInterrupt:51 kwargs['stop_event'].set()52 file_counter = file_counter + 153 path_counter = path_counter + 154 logger.debug("Worker stopped")55@profilers.download_profiler56def s3_get(session, url, **kwargs):57 return session.get(url, auth=kwargs['auth'])58@connectors.s3_connector59def s3_get_worker(**kwargs):60 bucket_name = kwargs['bucket_name']61 path = kwargs['path']62 path = os.path.join(path, kwargs['thread_id'])63 logger.info(f"GET Worker started: URL: {kwargs['endpoint_url']} Bucket: {kwargs['bucket_name']} "64 f"Path: {kwargs['path']} Thread ID: {kwargs['thread_id']} Data Chunk Size: {kwargs['data_chunk_size']}")65 path_counter = 10066 session = requests.Session()67 while not kwargs['stop_event'].is_set():68 file_counter = 069 while file_counter < 5000 and not kwargs['stop_event'].is_set():70 object_name = str(file_counter)71 full_object_name = "/".join([path, str(path_counter), object_name])72 try:73 url = "/".join([kwargs['endpoint_url'], bucket_name, full_object_name])74 res = s3_get(session, url, auth=kwargs['auth'],75 counter=kwargs['stats_collector'].total_time_spent_in_download)76 res.raise_for_status()77 with kwargs['stats_collector'].total_downloaded_files.get_lock():78 kwargs['stats_collector'].total_downloaded_files.value += 179 except requests.HTTPError as http_err:80 if http_err.errno == 404:81 logger.info("Can't find objects to read, work done...")82 break83 else:84 logger.error("{} : PUT {}".format(http_err.args[0], full_object_name))85 if kwargs['stone']:86 kwargs['results_queue'].put(http_err)87 kwargs['stop_event'].set()88 except requests.ConnectionError as con_err:89 logger.error("{} : GET {}".format(con_err.strerror, full_object_name))90 if kwargs['stone']:91 kwargs['results_queue'].put(con_err)92 kwargs['stop_event'].set()93 raise94 except requests.Timeout as timeout:95 logger.error("GET request {} Timed out. {}".format(full_object_name, timeout.strerror))96 if kwargs['stone']:97 kwargs['results_queue'].put(timeout)98 kwargs['stop_event'].set()99 raise100 except KeyboardInterrupt:101 kwargs['stop_event'].set()102 file_counter = file_counter + 1103 path_counter = path_counter + 1104 logger.debug("Worker stopped")105@profilers.delete_profiler106def s3_delete(session, url, **kwargs):107 return session.delete(url, auth=kwargs['auth'])108@connectors.s3_connector109def s3_delete_worker(**kwargs):110 bucket_name = kwargs['bucket_name']111 path = kwargs['path']112 logger.info(f"DELETE Worker started: URL: {kwargs['endpoint_url']} Bucket: {kwargs['bucket_name']} "113 f"Path: {kwargs['path']} Thread ID: {kwargs['thread_id']} Data Chunk Size: {kwargs['data_chunk_size']}")114 path = os.path.join(path, kwargs['thread_id'])115 session = requests.Session()116 path_counter = 100117 while not kwargs['stop_event'].is_set():118 file_counter = 0119 while file_counter < 5000 and not kwargs['stop_event'].is_set():120 object_name = str(file_counter)121 full_object_name = os.path.join(path, str(path_counter), object_name)122 try:123 url = "/".join([kwargs['endpoint_url'], bucket_name, full_object_name])124 res = s3_delete(session, url, auth=kwargs['auth'],125 counter=kwargs['stats_collector'].total_time_spent_in_deletion)126 res.raise_for_status()127 with kwargs['stats_collector'].total_deleted_files.get_lock():128 kwargs['stats_collector'].total_deleted_files.value += 1129 except (requests.ConnectionError, requests.HTTPError) as requests_err:130 logger.error("{} : DELETE {}".format(requests_err, full_object_name))131 if kwargs['stone']:132 kwargs['results_queue'].put(requests_err)133 kwargs['stop_event'].set()134 raise135 except requests.Timeout as timeout:136 logger.error("DELETE request {} Timed out. {}".format(full_object_name, timeout.strerror))137 if kwargs['stone']:138 kwargs['results_queue'].put(timeout)139 kwargs['stop_event'].set()140 raise141 except KeyboardInterrupt:142 kwargs['stop_event'].set()143 file_counter = file_counter + 1144 path_counter = path_counter + 1...
profilers.py
Source:profilers.py
...26 with total_time_spent_in_download.get_lock():27 total_time_spent_in_download.value += (end - start)28 return res29 return wrapper30def delete_profiler(f):31 """32 Profiler decorator to measure duration of S3 DELETE method33 """34 def wrapper(session, url, **kwargs):35 total_time_spent_in_deletion = kwargs['counter']36 start = timer()37 res = f(session, url, **kwargs)38 end = timer()39 with total_time_spent_in_deletion.get_lock():40 total_time_spent_in_deletion.value += (end - start)41 return res...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!