Best Python code snippet using locust
stats.py
Source:stats.py
# encoding: utf-8
"""
The MIT License
Copyright (c) 2009-2010, Carl Byström, Jonatan Heyman
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import time
import datetime
import gevent
import hashlib
import events

# Width of the "<method> <name>" column in the printed stats rows.
STATS_NAME_WIDTH = 60


class RequestStatsAdditionError(Exception):
    """Exception type declared by this module; not raised in the code shown here."""
    pass


class RequestStats(object):
    """
    Holds every StatsEntry, TimingsEntry and StatsError collected so far,
    together with the run-wide request and failure counters.
    """

    def __init__(self):
        # clear_all() assigns every attribute, so it doubles as initialiser.
        self.clear_all()

    def get_timings(self, name):
        """
        Retrieve the TimingsEntry stored under name, creating it on first use
        """
        timings = self.timings.get(name)
        if timings is None:
            timings = TimingsEntry(name)
            self.timings[name] = timings
        return timings

    def get(self, name, method):
        """
        Retrieve a StatsEntry instance by name and method, creating it on
        first use
        """
        key = (name, method)
        entry = self.entries.get(key)
        if entry is None:
            entry = StatsEntry(self, name, method)
            self.entries[key] = entry
        return entry

    def aggregated_stats(self, name="Total", full_request_history=False):
        """
        Returns a StatsEntry which is an aggregate of all stats entries
        within entries.

        full_request_history is forwarded to StatsEntry.extend().
        """
        total = StatsEntry(self, name, method=None)
        for entry in self.entries.values():
            total.extend(entry, full_request_history=full_request_history)
        return total

    def reset_all(self):
        """
        Go through all stats entries and reset them to zero.

        Also restarts start_time and drops all timings; errors are kept.
        """
        self.start_time = time.time()
        self.num_requests = 0
        self.num_failures = 0
        for entry in self.entries.values():
            entry.reset()
        self.timings = {}

    def clear_all(self):
        """
        Remove all stats entries and errors
        """
        self.num_requests = 0
        self.num_failures = 0
        self.entries = {}
        self.timings = {}
        self.errors = {}
        self.max_requests = None
        self.last_request_timestamp = None
        self.start_time = None


class TimingsEntry(object):
    """
    Keeps, for one name, the maximum response time logged per timestamp.
    """

    name = None
    current_max = 0

    def __init__(self, name):
        self.name = name
        # Create the per-instance dict here. It used to be a class attribute,
        # so every entry that had never been reset() wrote into one shared
        # dict and saw the other entries' records.
        self.reset()

    def reset(self):
        """Forget all recorded maxima."""
        self.max_response_times = {}
        self.current_max = 0

    def log(self, timestamp, new_max_response_time):
        """
        check if there is an existing max, if so take the max and save it.

        current_max is always set to the value passed in, even when a larger
        value is already recorded for that timestamp.
        """
        recorded = self.max_response_times.get(timestamp)
        if recorded is None:
            self.max_response_times[timestamp] = new_max_response_time
        else:
            self.max_response_times[timestamp] = max(recorded, new_max_response_time)
        self.current_max = new_max_response_time


class StatsEntry(object):
    """
    Represents a single stats entry (name and method)
    """

    # Name (URL) of this stats entry
    name = None
    # Method (GET, POST, PUT, etc.)
    method = None
    # The number of requests made
    num_requests = None
    # Number of failed request
    num_failures = None
    # Total sum of the response times
    total_response_time = None
    # Minimum response time
    min_response_time = None
    # Maximum response time
    max_response_time = None
    # A {second => request_count} dict that holds the number of requests made
    # per second
    num_reqs_per_sec = None
    # A {response_time => count} dict that holds the response time
    # distribution of all the requests.
    # The keys (the response time in ms) are rounded to store 1, 2, ... 9,
    # 10, 20. .. 90, 100, 200 .. 900, 1000, 2000 ... 9000, in order to save
    # memory.
    # This dict is used to calculate the median and percentile response times.
    response_times = None
    # The sum of the content length of all the requests for this entry
    total_content_length = None
    # Time of the first request for this entry
    start_time = None
    # Time of the last request for this entry
    last_request_timestamp = None

    # Attributes copied as-is by serialize() and unserialize().
    _SERIALIZED_FIELDS = (
        "last_request_timestamp",
        "start_time",
        "num_requests",
        "num_failures",
        "total_response_time",
        "max_response_time",
        "min_response_time",
        "total_content_length",
        "response_times",
        "num_reqs_per_sec",
    )

    def __init__(self, stats, name, method):
        # stats is the owning RequestStats; unserialize() passes None.
        self.stats = stats
        self.name = name
        self.method = method
        self.reset()

    def reset(self):
        """Zero every counter and restart the entry's clock at the current time."""
        self.start_time = time.time()
        self.num_requests = 0
        self.num_failures = 0
        self.total_response_time = 0
        self.response_times = {}
        self.min_response_time = None
        self.max_response_time = 0
        self.last_request_timestamp = int(time.time())
        self.num_reqs_per_sec = {}
        self.total_content_length = 0

    def log(self, response_time, content_length):
        """Record one request on this entry and on the owning RequestStats."""
        self.stats.num_requests += 1
        self.num_requests += 1
        self._log_time_of_request()
        self._log_response_time(response_time)
        # increase total content-length
        self.total_content_length += content_length

    def _log_time_of_request(self):
        """Count the request in the current one-second bucket."""
        now = int(time.time())
        self.num_reqs_per_sec[now] = self.num_reqs_per_sec.get(now, 0) + 1
        self.last_request_timestamp = now
        self.stats.last_request_timestamp = now

    def _log_response_time(self, response_time):
        """Update total/min/max and the rounded response time distribution."""
        self.total_response_time += response_time
        if self.min_response_time is None:
            self.min_response_time = response_time
        else:
            self.min_response_time = min(self.min_response_time, response_time)
        self.max_response_time = max(self.max_response_time, response_time)
        # to avoid to much data that has to be transfered to the master node
        # when running in distributed mode, we save the response time rounded
        # in a dict so that 147 becomes 150, 3432 becomes 3400 and 58760
        # becomes 59000
        if response_time < 100:
            rounded_response_time = response_time
        elif response_time < 1000:
            rounded_response_time = int(round(response_time, -1))
        elif response_time < 10000:
            rounded_response_time = int(round(response_time, -2))
        else:
            rounded_response_time = int(round(response_time, -3))
        # increase request count for the rounded key in response time dict
        self.response_times[rounded_response_time] = (
            self.response_times.get(rounded_response_time, 0) + 1
        )

    def log_error(self, error):
        """Count a failure and bump the matching StatsError on the owner."""
        self.num_failures += 1
        self.stats.num_failures += 1
        key = StatsError.create_key(self.method, self.name, error)
        entry = self.stats.errors.get(key)
        if entry is None:
            entry = StatsError(self.method, self.name, error)
            self.stats.errors[key] = entry
        entry.occured()

    @property
    def fail_ratio(self):
        """Failures divided by (requests + failures); 0.0 when both are zero."""
        total = self.num_requests + self.num_failures
        if not total:
            return 0.0
        return float(self.num_failures) / total

    @property
    def avg_response_time(self):
        """Mean response time, or 0 when no request has been logged."""
        try:
            return float(self.total_response_time) / self.num_requests
        except ZeroDivisionError:
            return 0

    @property
    def median_response_time(self):
        """Median taken from the rounded distribution, or 0 when it is empty."""
        if not self.response_times:
            return 0
        return median_from_dict(self.num_requests, self.response_times)

    @property
    def current_rps(self):
        """
        Average requests per second over a window that ends 2 seconds before
        the owner's last request and starts at most 12 seconds before it.
        """
        last = self.stats.last_request_timestamp
        if last is None:
            return 0
        slice_start_time = max(last - 12, int(self.stats.start_time or 0))
        reqs = [self.num_reqs_per_sec.get(t, 0) for t in range(slice_start_time, last - 2)]
        return avg(reqs)

    @property
    def total_rps(self):
        """Requests divided by the owner's elapsed time (at least 1 second)."""
        if not self.stats.last_request_timestamp or not self.stats.start_time:
            return 0.0
        return self.num_requests / max(self.stats.last_request_timestamp - self.stats.start_time, 1)

    @property
    def avg_content_length(self):
        """Total content length divided by request count, or 0 without requests."""
        try:
            return self.total_content_length / self.num_requests
        except ZeroDivisionError:
            return 0

    def extend(self, other, full_request_history=False):
        """
        Extend the data from the current StatsEntry with the stats from
        another StatsEntry instance.

        If full_request_history is False, we'll only care to add the data
        from the last 20 seconds of other's stats. The reason for this
        argument is that extend can be used to generate an aggregate of
        multiple different StatsEntry instances on the fly, in order to get
        the *total* current RPS, average response time, etc.
        """
        self.last_request_timestamp = max(self.last_request_timestamp, other.last_request_timestamp)
        self.start_time = min(self.start_time, other.start_time)
        self.num_requests = self.num_requests + other.num_requests
        self.num_failures = self.num_failures + other.num_failures
        self.total_response_time = self.total_response_time + other.total_response_time
        self.max_response_time = max(self.max_response_time, other.max_response_time)
        # None means "no request seen yet" and must not take part in min().
        # The former `min(a, b) or b` dropped our own minimum when the other
        # side had none, and treated a genuine minimum of 0 as missing.
        known_minimums = [
            value
            for value in (self.min_response_time, other.min_response_time)
            if value is not None
        ]
        self.min_response_time = min(known_minimums) if known_minimums else None
        self.total_content_length = self.total_content_length + other.total_content_length
        if full_request_history:
            for key, count in other.response_times.items():
                self.response_times[key] = self.response_times.get(key, 0) + count
            for key, count in other.num_reqs_per_sec.items():
                self.num_reqs_per_sec[key] = self.num_reqs_per_sec.get(key, 0) + count
        else:
            # still add the number of reqs per seconds the last 20 seconds
            last = other.last_request_timestamp
            for second in range(last - 20, last + 1):
                if second in other.num_reqs_per_sec:
                    self.num_reqs_per_sec[second] = (
                        self.num_reqs_per_sec.get(second, 0) + other.num_reqs_per_sec[second]
                    )

    def serialize(self):
        """Return the entry as a plain dict (name, method and all counters)."""
        data = {"name": self.name, "method": self.method}
        for field in self._SERIALIZED_FIELDS:
            data[field] = getattr(self, field)
        return data

    @classmethod
    def unserialize(cls, data):
        """Build an entry (with no owning RequestStats) from serialize() output."""
        obj = cls(None, data["name"], data["method"])
        for field in cls._SERIALIZED_FIELDS:
            setattr(obj, field, data[field])
        return obj

    def get_stripped_report(self):
        """
        Return the serialized version of this StatsEntry, and then clear the
        current stats.
        """
        report = self.serialize()
        self.reset()
        return report

    def __str__(self):
        fail_percent = self.fail_ratio * 100
        return (" %-" + str(STATS_NAME_WIDTH) + "s %7d %12s %7d %7d %7d | %7d %7.2f") % (
            # str() because aggregated entries are created with method=None;
            # percentile() formats the label the same way.
            str(self.method) + " " + self.name,
            self.num_requests,
            "%d(%.2f%%)" % (self.num_failures, fail_percent),
            self.avg_response_time,
            self.min_response_time or 0,
            self.max_response_time,
            self.median_response_time or 0,
            self.current_rps or 0
        )

    def get_response_time_percentile(self, percent):
        """
        Get the response time that a certain number of percent of the
        requests finished within.

        Percent specified in range: 0.0 - 1.0

        Returns None when no response time has been recorded.
        """
        num_of_request = int(self.num_requests * percent)
        processed_count = 0
        # Walk from the slowest bucket down until enough requests are covered.
        for response_time in sorted(self.response_times, reverse=True):
            processed_count += self.response_times[response_time]
            if (self.num_requests - processed_count) <= num_of_request:
                return response_time

    def percentile(self, tpl=" %-" + str(STATS_NAME_WIDTH) + "s %8d %6d %6d %6d %6d %6d %6d %6d %6d %6d"):
        """
        Format one row with the 50/66/75/80/90/95/98/99th percentiles and the
        maximum response time.

        Raises ValueError when the entry has no requests.
        """
        if not self.num_requests:
            raise ValueError("Can't calculate percentile on url with no successful requests")
        return tpl % (
            str(self.method) + " " + self.name,
            self.num_requests,
            self.get_response_time_percentile(0.5),
            self.get_response_time_percentile(0.66),
            self.get_response_time_percentile(0.75),
            self.get_response_time_percentile(0.80),
            self.get_response_time_percentile(0.90),
            self.get_response_time_percentile(0.95),
            self.get_response_time_percentile(0.98),
            self.get_response_time_percentile(0.99),
            self.max_response_time
        )


class StatsError(object):
    """One distinct (method, name, error) failure and how often it occurred."""

    def __init__(self, method, name, error, occurences=0):
        self.method = method
        self.name = name
        self.error = error
        self.occurences = occurences

    @classmethod
    def create_key(cls, method, name, error):
        """Return the md5 hex digest identifying this method/name/error triple."""
        key = "%s.%s.%r" % (method, name, error)
        # md5 needs bytes. A plain Python 2 str already is bytes and is left
        # untouched; text (unicode) is encoded explicitly instead of relying
        # on an implicit ASCII conversion that fails on non-ASCII names.
        if not isinstance(key, bytes):
            key = key.encode("utf-8")
        return hashlib.md5(key).hexdigest()

    def occured(self):
        """Count one more occurrence."""
        self.occurences += 1

    def to_name(self):
        """Return a one-line label for the error."""
        return "%s %s: %r" % (self.method, self.name, repr(self.error))

    def to_dict(self):
        """Return a plain dict; the error itself is stored as its repr()."""
        return {
            "method": self.method,
            "name": self.name,
            "error": repr(self.error),
            "occurences": self.occurences
        }

    @classmethod
    def from_dict(cls, data):
        """Rebuild a StatsError from to_dict() output."""
        return cls(
            data["method"],
            data["name"],
            data["error"],
            data["occurences"]
        )


def avg(values):
    """Return the mean of values as a float; 0.0 for an empty sequence."""
    return sum(values, 0.0) / max(len(values), 1)


def median_from_dict(total, count):
    """
    total is the number of requests made
    count is a dict {response_time: count}

    Returns None if the counts in the dict run out before the median
    position is reached.
    """
    # Floor division, written explicitly so the position stays an integer.
    pos = (total - 1) // 2
    for k in sorted(count):
        if pos < count[k]:
            return k
        pos -= count[k]


global_stats = RequestStats()
"""
A global instance for holding the statistics. Should be removed eventually.
"""


def _check_max_requests():
    """Raise once requests plus failures have reached global_stats.max_requests."""
    limit = global_stats.max_requests
    if limit is not None and (global_stats.num_requests + global_stats.num_failures) >= limit:
        raise Exception("Maximum number of requests reached")


def on_request_success(request_type, name, response_time, response_length):
    """Event hook: log a successful request in global_stats."""
    _check_max_requests()
    global_stats.get(name, request_type).log(response_time, response_length)


def on_request_failure(request_type, name, response_time, exception):
    """Event hook: log a failed request in global_stats."""
    _check_max_requests()
    global_stats.get(name, request_type).log_error(exception)


def on_report_to_master(client_id, data):
    """
    Event hook: move the local stats and errors into data.

    Entries without any request or failure are left out. Every reported entry
    is reset and the local error table is emptied.
    """
    data["stats"] = [
        entry.get_stripped_report()
        for entry in global_stats.entries.values()
        if entry.num_requests or entry.num_failures
    ]
    data["errors"] = dict((key, error.to_dict()) for key, error in global_stats.errors.items())
    # clear out the errors.
    global_stats.errors = {}


def on_manager_report(client_id, data):
    """Merge the stats and errors reported by one client into global_stats."""
    timing_entry = global_stats.get_timings(client_id)
    record_stamp = int(time.time())
    for stats_data in data["stats"]:
        entry = StatsEntry.unserialize(stats_data)
        # for each entry get the max_response_time,
        # store that in a time stamped data object for that client.
        timing_entry.log(record_stamp, entry.max_response_time)
        global_stats.get(entry.name, entry.method).extend(entry, full_request_history=True)
        # last_request_timestamp starts out as None, which must not be
        # compared with a number.
        global_stats.last_request_timestamp = max(
            global_stats.last_request_timestamp or 0, entry.last_request_timestamp
        )
    for error_key, error in data["errors"].items():
        if error_key not in global_stats.errors:
            global_stats.errors[error_key] = StatsError.from_dict(error)
        else:
            global_stats.errors[error_key].occurences += error["occurences"]


def on_request_slow(name, response_time, response_length):
    """Event hook: print a timestamped line for a slow request."""
    # Single-argument call form prints the same text under Python 2.
    print("****** {2} slow request({0}): {1}".format(response_time, name, str(datetime.datetime.now())))


events.request_success += on_request_success
events.request_failure += on_request_failure
events.request_slow += on_request_slow
events.report_to_master += on_report_to_master
# NOTE(review): the listing is cut off after this line. on_manager_report is
# not registered in the visible part -- presumably that happens in the
# missing remainder; confirm against the full file.
main.py
Source:main.py
import time
import threading


class RequestTokenBucket(object):
    """
    A class that models a token bucket to be used for rate limiting purposes.

    Attributes
    ----------
    max_tokens : int
        The maximum number of tokens the bucket is able to hold at any given time.
        Defaults to 10.
    refill_rate : int
        The number of seconds it takes for a single token to be refilled into the bucket.
        Defaults to 5.
    last_request_timestamp : int
        A timestamp of the last time a request was allowed by the bucket, represented as
        seconds since epoch. None until the first request is allowed.
    current_count : int
        The current number of tokens in the bucket.
    lock : Lock
        A Lock object that allows the bucket to be locked when multiple threads might be
        trying to make requests.

    Methods
    -------
    calculate_current_tokens(now=None):
        Credit the tokens earned since the last refill and store the result in current_count.

    print_bucket_summary():
        Print the max token capacity, refill rate, current token count, and last request time.
    """

    def __init__(self, max_tokens=10, refill_rate=5):
        """
        Create a full bucket that has not served any request yet.

        Parameters
        ----------
        max_tokens : int
            The maximum number of tokens the bucket is able to hold at any given time.
            Defaults to 10.
        refill_rate : int
            The number of seconds it takes for a single token to be refilled into the bucket.
            Defaults to 5.

        Raises
        ------
        ValueError
            If refill_rate is not positive (it is used as a divisor).
        """
        if refill_rate <= 0:
            raise ValueError("refill_rate must be a positive number of seconds")
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.last_request_timestamp = None
        self.current_count = max_tokens
        self.lock = threading.Lock()
        # Time up to which refills have already been credited. It is kept
        # apart from last_request_timestamp so that serving a request does not
        # throw away the time already waited towards the next token.
        self._refill_anchor = None

    def calculate_current_tokens(self, now=None):
        """
        Credit the tokens earned since the last refill and update current_count.

        Does nothing while last_request_timestamp is None: no request has been
        made yet and buckets are full when instantiated. Calling it repeatedly
        for the same point in time credits each elapsed interval only once.

        Parameters
        ----------
        now : int, optional
            The current time in seconds since epoch. Defaults to the system clock.

        Returns
        -------
        None
        """
        if self.last_request_timestamp is None:
            return
        if now is None:
            now = self.__get_current_time_in_seconds()
        if self.current_count >= self.max_tokens:
            # A full bucket earns nothing; the refill clock starts over.
            self._refill_anchor = now
            return
        if self._refill_anchor is None:
            # First refill after the bucket was first drawn from.
            self._refill_anchor = self.last_request_timestamp
        new_tokens = (now - self._refill_anchor) // self.refill_rate
        if new_tokens <= 0:
            return
        self.current_count = min(self.max_tokens, self.current_count + new_tokens)
        if self.current_count >= self.max_tokens:
            self._refill_anchor = now
        else:
            # Advance only by the time that was converted into tokens, so the
            # remainder still counts towards the next one.
            self._refill_anchor += new_tokens * self.refill_rate

    def print_bucket_summary(self):
        """
        Print a formatted summary of the max token capacity, refill rate, current token count,
        and last request time of the bucket.

        Returns
        -------
        None
        """
        print("Max Token Capacity: {}".format(self.max_tokens))
        print("Refill Rate: {}".format(self.refill_rate))
        print("Current Token Count: {}".format(self.current_count))
        print("Last Request Time: {}".format(self.last_request_timestamp))

    @classmethod
    def __get_current_time_in_seconds(cls):
        """
        Return the current time represented in seconds since epoch.

        Returns
        -------
        An integer representing the current time as seconds since epoch.
        """
        return int(round(time.time()))


class TokenBucketRateLimiter(object):
    """
    A class that interacts with the RequestTokenBucket class to simulate rate limiting behavior.

    Attributes
    ----------
    rate_limiter_dict : Dictionary
        A dictionary that maps an account ID (key) to a RequestTokenBucket object (value).

    Methods
    -------
    add_account(account_id, request_token_bucket):
        Register request_token_bucket as the bucket for account_id.

    allow_request_to_service(account_id, now=None):
        Decide, under the bucket's lock, whether a request is allowed and consume a token if so.
    """

    def __init__(self):
        """
        Create a rate limiter with no accounts.
        """
        self.rate_limiter_dict = {}

    def add_account(self, account_id, request_token_bucket):
        """
        Add the account_id and request_token_bucket as a key-value pair to rate_limiter_dict.

        Parameters
        ----------
        account_id : int
            The ID of the account we are working with.
        request_token_bucket : RequestTokenBucket
            The token bucket used to decide if requests from this account are allowed.

        Returns
        -------
        None
        """
        self.rate_limiter_dict[account_id] = request_token_bucket

    def allow_request_to_service(self, account_id, now=None):
        """
        Refresh the token count of the account's bucket, then allow the request if at least
        one token is available and reject it otherwise.

        Prints which thread is making the request, which bucket it targets and the bucket's
        current token count.

        Parameters
        ----------
        account_id : int
            The ID of the account we are working with.
        now : int, optional
            The current time in seconds since epoch. Defaults to the system clock.

        Returns
        -------
        A boolean denoting if the request was allowed or not.

        Raises
        ------
        KeyError
            If account_id has not been added.
        """
        token_bucket = self.rate_limiter_dict[account_id]
        # Lock the bucket so that we do not have concurrency issues that result in us
        # bypassing the rate limit.
        with token_bucket.lock:
            if now is None:
                now = int(round(time.time()))
            token_bucket.calculate_current_tokens(now)
            print("**** {} is making a request to bucket {}****".format(threading.current_thread().name, account_id))
            print("Current Tokens for Bucket {}: {}".format(account_id, token_bucket.current_count))
            # Reject the request unless the token bucket has at least 1 token.
            if token_bucket.current_count <= 0:
                print("Not enough tokens to process request. Please try again in {} seconds.\n".format(token_bucket.refill_rate))
                return False
            # Allowed: record the request time and remove a token from the bucket.
            print("Processing request\n")
            token_bucket.last_request_timestamp = now
            token_bucket.current_count -= 1
            return True


def simulate_requests(rate_limiter, account_ids=(1, 2), duration=60, interval=3):
    """
    Repeatedly request every listed account from rate_limiter for a fixed amount of time.

    Parameters
    ----------
    rate_limiter : TokenBucketRateLimiter
        A TokenBucketRateLimiter containing the buckets to exercise.
    account_ids : iterable of int
        The accounts requested on every round. Defaults to (1, 2).
    duration : int
        How long to keep going, in seconds. Defaults to 60.
    interval : int
        Pause between rounds, in seconds. Defaults to 3.

    Returns
    -------
    None
    """
    end_time = time.time() + duration
    while time.time() < end_time:
        for account_id in account_ids:
            rate_limiter.allow_request_to_service(account_id)
        time.sleep(interval)


def main():
    """
    Ask for the first bucket's settings, print both buckets and run two simulation threads.

    The prompts used to run at import time; they are now only reached when the
    file is executed as a script.
    """
    rate_limiter = TokenBucketRateLimiter()
    max_tokens_input = int(input("Please specify the maximum number of tokens the first bucket should have: "))
    refill_rate_input = int(input("Please specify the rate in seconds at which a single token should be refilled into the first bucket: "))
    rate_limiter.add_account(1, RequestTokenBucket(max_tokens_input, refill_rate_input))
    print("**** Summary of Specs for Token Bucket One ****")
    rate_limiter.rate_limiter_dict[1].print_bucket_summary()
    print("\n")
    rate_limiter.add_account(2, RequestTokenBucket(5, 10))
    print("**** Summary of Specs for Token Bucket Two ****")
    rate_limiter.rate_limiter_dict[2].print_bucket_summary()
    print("\n")
    thread1 = threading.Thread(target=simulate_requests, args=(rate_limiter,))
    thread2 = threading.Thread(target=simulate_requests, args=(rate_limiter,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()


if __name__ == "__main__":
    main()
chs_wrapper_class.py
Source:chs_wrapper_class.py
import requests
import json
import datetime
import time
from urllib.parse import quote_plus


class CompaniesHouseService:
    """A wrapper around the companies house API.

    Attributes:
        search_url (str): Base url for Companies House search query.
        company_url (str): Base url for Companies House company query.

    """
    search_url = "https://api.companieshouse.gov.uk/search/companies?q={}"
    company_url = "https://api.companieshouse.gov.uk/company/{}"

    def __init__(self, key, time_between_requests=0.5):
        """
        Args:
            key (str): The API key issued in the Companies House API
                applications.
            time_between_requests (float): Time in seconds between requests to
                the API to prevent spam. Default is 0.5 to prevent calls
                exceeding the 600 per 5 minutes limit.

        """
        self.key = key
        self.time_between_requests = time_between_requests

        #: datetime: Time of the previous request; None until the first one.
        self.last_request_timestamp = None

    def _query_ch_api(self, url, query):
        """Sends a request to the Companies House API.

        Args:
            url (str): The specific url to be queried depending on the type
                of request (search, profile etc.).
            query (str): The query parameter to be sent alongside the url.

        Returns:
            dict: The decoded JSON body when the response status is 200,
                otherwise an empty dict (the status code and reason are
                printed).

        """
        query = self._remove_problem_characters(query)

        self._rate_limiting()

        resultQuery = requests.get(url.format(query), auth=(self.key, ''))
        # Only a 200 response carries a body worth decoding.
        if resultQuery.status_code == 200:
            result = json.loads(resultQuery.text)
        else:
            print(f"Failed with error code: {resultQuery.status_code} | "
                  f"Reason: {resultQuery.reason}")
            result = {}

        return result

    def _rate_limiting(self):
        """Waits up to the defined time between requests.

        If more than the defined "time_between_requests" has passed (in
        seconds) since the last call, this function will not wait any time.
        The very first call never waits. last_request_timestamp is set to
        the current time every time this method is called.

        """
        if self.last_request_timestamp is not None:
            elapsed = (datetime.datetime.now()
                       - self.last_request_timestamp).total_seconds()
            time.sleep(max(self.time_between_requests - elapsed, 0))

        self.last_request_timestamp = datetime.datetime.now()

    def _remove_problem_characters(self, string):
        """Percent-encode the query so it can be placed in the request url.

        Spaces become "+" and "&" becomes "%26" as before; every other
        character that is not safe in a url (for example "#", "?" or "%") is
        now escaped as well instead of being passed through unchanged.

        Args:
            string (str): The query to be "cleaned".

        Returns:
            str: An equivalent string in HTTP GET format

        """
        return quote_plus(string)

    def get_first_company_search(self, company_name):
        """Search for a company and return the top result.

        Args:
            company_name (str): The company to search for.

        Returns:
            dict: The profile of the first result found from the API search,
                or None when the search failed or returned no results.

        """
        search_result = self._query_ch_api(self.search_url, company_name)

        # A failed query yields {}, which has no "items" key at all, so the
        # lookup must tolerate a missing key as well as an empty list.
        items = search_result.get("items") or []

        return items[0] if items else None

    def get_company_profile(self, company_number):
        """Return a company profile from the company number.

        Args:
            company_number (str): The unique company number as defined on
                Companies House.

        Returns:
            dict: The profile of the corresponding company

        """
        return self._query_ch_api(self.company_url, company_number)


if __name__ == "__main__":
    # NOTE(review): an API key is committed in source here. It should be
    # rotated and loaded from configuration or the environment instead.
    key = "d8d5f785-9b19-4873-b1a0-22659560f7b9"
    ch_api = CompaniesHouseService(key)
    iterations = 10

    tic = datetime.datetime.now()

    for _ in range(iterations):
        ch_profile = ch_api.get_company_profile("07958752")

    toc = datetime.datetime.now()

    time_taken = (toc - tic).total_seconds()
    # NOTE(review): the listing is cut off in the middle of this print call;
    # only the first string fragment is visible. The value printed after it is
    # an assumed completion based on the two variables computed above --
    # confirm against the full file.
    print(f"Average time per iteration: "
          f"{time_taken / iterations}")
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!