Best Python code snippet using locust
client.py
Source:client.py
1# -*- coding: utf-8 -*-2import logging3import hmac4import hashlib5import uuid6import json7import re8import time9import random10from datetime import datetime11import gzip12from io import BytesIO13import warnings14from .compat import (15 compat_urllib_parse, compat_urllib_error,16 compat_urllib_request, compat_urllib_parse_urlparse)17from .errors import (18 ClientErrorCodes, ClientError, ClientLoginError, ClientLoginRequiredError,19 ClientCookieExpiredError, ClientThrottledError)20from .constants import Constants21from .http import ClientCookieJar22from .endpoints import (23 AccountsEndpointsMixin, DiscoverEndpointsMixin, FeedEndpointsMixin,24 FriendshipsEndpointsMixin, LiveEndpointsMixin, MediaEndpointsMixin,25 MiscEndpointsMixin, LocationsEndpointsMixin, TagsEndpointsMixin,26 UsersEndpointsMixin, UploadEndpointsMixin, UsertagsEndpointsMixin,27 CollectionsEndpointsMixin,28 ClientDeprecationWarning, ClientPendingDeprecationWarning,29 ClientExperimentalWarning30)31logger = logging.getLogger(__name__)32# Force Client deprecation warnings to always appear33warnings.simplefilter('always', ClientDeprecationWarning)34warnings.simplefilter('always', ClientPendingDeprecationWarning)35warnings.simplefilter('default', ClientExperimentalWarning)36class Client(AccountsEndpointsMixin, DiscoverEndpointsMixin, FeedEndpointsMixin,37 FriendshipsEndpointsMixin, LiveEndpointsMixin, MediaEndpointsMixin,38 MiscEndpointsMixin, LocationsEndpointsMixin, TagsEndpointsMixin,39 UsersEndpointsMixin, UploadEndpointsMixin, UsertagsEndpointsMixin,40 CollectionsEndpointsMixin, object):41 """Main API client class for the private app api."""42 API_URL = 'https://i.instagram.com/api/{version!s}/'43 USER_AGENT = Constants.USER_AGENT44 IG_SIG_KEY = Constants.IG_SIG_KEY45 IG_CAPABILITIES = Constants.IG_CAPABILITIES46 SIG_KEY_VERSION = Constants.SIG_KEY_VERSION47 def __init__(self, username, password, **kwargs):48 """49 :param username: Login username50 :param password: Login password51 :param kwargs: 
See below52 :Keyword Arguments:53 - **auto_patch**: Patch the api objects to match the public API. Default: False54 - **drop_incompat_key**: Remove api object keys that is not in the public API. Default: False55 - **timeout**: Timeout interval in seconds. Default: 1556 - **api_url**: Override the default api url base57 - **cookie**: Saved cookie string from a previous session58 - **settings**: A dict of settings from a previous session59 - **on_login**: Callback after successful login60 - **proxy**: Specify a proxy ex: 'http://127.0.0.1:8888' (ALPHA)61 :return:62 """63 self.username = username64 self.password = password65 self.auto_patch = kwargs.pop('auto_patch', False)66 self.drop_incompat_keys = kwargs.pop('drop_incompat_keys', False)67 self.api_url = kwargs.pop('api_url', None) or self.API_URL68 self.timeout = kwargs.pop('timeout', 15)69 self.on_login = kwargs.pop('on_login', None)70 self.logger = logger71 user_settings = kwargs.pop('settings', None) or {}72 self.uuid = (73 kwargs.pop('guid', None) or kwargs.pop('uuid', None) or74 user_settings.get('uuid') or self.generate_uuid(False))75 self.device_id = (76 kwargs.pop('device_id', None) or user_settings.get('device_id') or77 self.generate_deviceid())78 self.signature_key = (79 kwargs.pop('signature_key', None) or user_settings.get('signature_key') or80 self.IG_SIG_KEY)81 self.key_version = (82 kwargs.pop('key_version', None) or user_settings.get('key_version') or83 self.SIG_KEY_VERSION)84 self.ig_capabilities = (85 kwargs.pop('ig_capabilities', None) or user_settings.get('ig_capabilities') or86 self.IG_CAPABILITIES)87 # to maintain backward compat for user_agent kwarg88 custom_ua = kwargs.pop('user_agent', '') or user_settings.get('user_agent')89 if custom_ua:90 self.user_agent = custom_ua91 else:92 self.app_version = (93 kwargs.pop('app_version', None) or user_settings.get('app_version') or94 Constants.APP_VERSION)95 self.android_release = (96 kwargs.pop('android_release', None) or 
user_settings.get('android_release') or97 Constants.ANDROID_RELEASE)98 self.android_version = int(99 kwargs.pop('android_version', None) or user_settings.get('android_version') or100 Constants.ANDROID_VERSION)101 self.phone_manufacturer = (102 kwargs.pop('phone_manufacturer', None) or user_settings.get('phone_manufacturer') or103 Constants.PHONE_MANUFACTURER)104 self.phone_device = (105 kwargs.pop('phone_device', None) or user_settings.get('phone_device') or106 Constants.PHONE_DEVICE)107 self.phone_model = (108 kwargs.pop('phone_model', None) or user_settings.get('phone_model') or109 Constants.PHONE_MODEL)110 self.phone_dpi = (111 kwargs.pop('phone_dpi', None) or user_settings.get('phone_dpi') or112 Constants.PHONE_DPI)113 self.phone_resolution = (114 kwargs.pop('phone_resolution', None) or user_settings.get('phone_resolution') or115 Constants.PHONE_RESOLUTION)116 self.phone_chipset = (117 kwargs.pop('phone_chipset', None) or user_settings.get('phone_chipset') or118 Constants.PHONE_CHIPSET)119 cookie_string = kwargs.pop('cookie', None) or user_settings.get('cookie')120 cookie_jar = ClientCookieJar(cookie_string=cookie_string)121 if cookie_string and cookie_jar.expires_earliest and int(time.time()) >= cookie_jar.expires_earliest:122 raise ClientCookieExpiredError('Oldest cookie expired at {0!s}'.format(cookie_jar.expires_earliest))123 cookie_handler = compat_urllib_request.HTTPCookieProcessor(cookie_jar)124 proxy_handler = None125 proxy = kwargs.pop('proxy', None)126 if proxy:127 warnings.warn('Proxy support is alpha.', UserWarning)128 parsed_url = compat_urllib_parse_urlparse(proxy)129 if parsed_url.netloc and parsed_url.scheme:130 proxy_address = '{0!s}://{1!s}'.format(parsed_url.scheme, parsed_url.netloc)131 proxy_handler = compat_urllib_request.ProxyHandler({'https': proxy_address})132 else:133 raise ValueError('Invalid proxy argument: {0!s}'.format(proxy))134 handlers = []135 if proxy_handler:136 handlers.append(proxy_handler)137 # Allow user to override custom 
ssl context where possible138 custom_ssl_context = kwargs.pop('custom_ssl_context', None)139 try:140 httpshandler = compat_urllib_request.HTTPSHandler(context=custom_ssl_context)141 except TypeError:142 # py version < 2.7.9143 httpshandler = compat_urllib_request.HTTPSHandler()144 handlers.extend([145 compat_urllib_request.HTTPHandler(),146 httpshandler,147 cookie_handler])148 opener = compat_urllib_request.build_opener(*handlers)149 opener.cookie_jar = cookie_jar150 self.opener = opener151 # ad_id must be initialised after cookie_jar/opener because152 # it relies on self.authenticated_user_name153 self.ad_id = (154 kwargs.pop('ad_id', None) or user_settings.get('ad_id') or155 self.generate_adid())156 if not cookie_string: # [TODO] There's probably a better way than to depend on cookie_string157 if not self.username or not self.password:158 raise ClientLoginRequiredError('login_required', code=400)159 self.login()160 self.logger.debug('USERAGENT: {0!s}'.format(self.user_agent))161 super(Client, self).__init__()162 @property163 def settings(self):164 """Helper property that extracts the settings that you should cache165 in addition to username and password."""166 return {167 'uuid': self.uuid,168 'device_id': self.device_id,169 'ad_id': self.ad_id,170 'cookie': self.cookie_jar.dump(),171 'created_ts': int(time.time())172 }173 @property174 def user_agent(self):175 """Returns the useragent string that the client is currently using."""176 return Constants.USER_AGENT_FORMAT % {177 'app_version': self.app_version,178 'android_version': self.android_version,179 'android_release': self.android_release,180 'brand': self.phone_manufacturer,181 'device': self.phone_device,182 'model': self.phone_model,183 'dpi': self.phone_dpi,184 'resolution': self.phone_resolution,185 'chipset': self.phone_chipset}186 @user_agent.setter187 def user_agent(self, value):188 """Override the useragent string with your own"""189 mobj = re.search(Constants.USER_AGENT_EXPRESSION, value)190 if not 
mobj:191 raise ValueError('User-agent specified does not fit format required: {0!s}'.format(192 Constants.USER_AGENT_EXPRESSION))193 self.app_version = mobj.group('app_version')194 self.android_release = mobj.group('android_release')195 self.android_version = int(mobj.group('android_version'))196 self.phone_manufacturer = mobj.group('manufacturer')197 self.phone_device = mobj.group('device')198 self.phone_model = mobj.group('model')199 self.phone_dpi = mobj.group('dpi')200 self.phone_resolution = mobj.group('resolution')201 self.phone_chipset = mobj.group('chipset')202 @staticmethod203 def generate_useragent(**kwargs):204 """205 Helper method to generate a useragent string based on device parameters206 :param kwargs:207 - **app_version**208 - **android_version**209 - **android_release**210 - **brand**211 - **device**212 - **model**213 - **dpi**214 - **resolution**215 - **chipset**216 :return: A compatible user agent string217 """218 return Constants.USER_AGENT_FORMAT % {219 'app_version': kwargs.pop('app_version', None) or Constants.APP_VERSION,220 'android_version': int(kwargs.pop('android_version', None) or Constants.ANDROID_VERSION),221 'android_release': kwargs.pop('android_release', None) or Constants.ANDROID_RELEASE,222 'brand': kwargs.pop('phone_manufacturer', None) or Constants.PHONE_MANUFACTURER,223 'device': kwargs.pop('phone_device', None) or Constants.PHONE_DEVICE,224 'model': kwargs.pop('phone_model', None) or Constants.PHONE_MODEL,225 'dpi': kwargs.pop('phone_dpi', None) or Constants.PHONE_DPI,226 'resolution': kwargs.pop('phone_resolution', None) or Constants.PHONE_RESOLUTION,227 'chipset': kwargs.pop('phone_chipset', None) or Constants.PHONE_CHIPSET}228 @staticmethod229 def validate_useragent(value):230 """231 Helper method to validate a useragent string for format correctness232 :param value:233 :return:234 """235 mobj = re.search(Constants.USER_AGENT_EXPRESSION, value)236 if not mobj:237 raise ValueError('User-agent specified does not fit format 
required: {0!s}'.format(238 Constants.USER_AGENT_EXPRESSION))239 parse_params = {240 'app_version': mobj.group('app_version'),241 'android_version': int(mobj.group('android_version')),242 'android_release': mobj.group('android_release'),243 'brand': mobj.group('manufacturer'),244 'device': mobj.group('device'),245 'model': mobj.group('model'),246 'dpi': mobj.group('dpi'),247 'resolution': mobj.group('resolution'),248 'chipset': mobj.group('chipset')249 }250 return {251 'user_agent': Constants.USER_AGENT_FORMAT % parse_params,252 'parsed_params': parse_params253 }254 def get_cookie_value(self, key):255 for cookie in self.cookie_jar:256 if cookie.name.lower() == key.lower():257 return cookie.value258 return None259 @property260 def csrftoken(self):261 """The client's current csrf token"""262 return self.get_cookie_value('csrftoken')263 @property264 def token(self):265 """For compatibility. Equivalent to :meth:`csrftoken`"""266 return self.csrftoken267 @property268 def authenticated_user_id(self):269 """The current authenticated user id"""270 return self.get_cookie_value('ds_user_id')271 @property272 def authenticated_user_name(self):273 """The current authenticated user name"""274 return self.get_cookie_value('ds_user')275 @property276 def phone_id(self):277 """Current phone ID. For use in certain functions."""278 return self.generate_uuid(return_hex=False, seed=self.device_id)279 @property280 def timezone_offset(self):281 """Timezone offset in seconds. 
For use in certain functions."""282 return int(round((datetime.now() - datetime.utcnow()).total_seconds()))283 @property284 def rank_token(self):285 if not self.authenticated_user_id:286 return None287 return '{0!s}_{1!s}'.format(self.authenticated_user_id, self.uuid)288 @property289 def authenticated_params(self):290 return {291 '_csrftoken': self.csrftoken,292 '_uuid': self.uuid,293 '_uid': self.authenticated_user_id294 }295 @property296 def cookie_jar(self):297 """The client's cookiejar instance."""298 return self.opener.cookie_jar299 @property300 def default_headers(self):301 return {302 'User-Agent': self.user_agent,303 'Connection': 'close',304 'Accept': '*/*',305 'Accept-Language': 'en-US',306 'Accept-Encoding': 'gzip, deflate',307 'X-IG-Capabilities': self.ig_capabilities,308 'X-IG-Connection-Type': 'WIFI',309 'X-IG-Connection-Speed': '{0:d}kbps'.format(random.randint(1000, 5000)),310 }311 @property312 def radio_type(self):313 """For use in certain endpoints"""314 return 'wifi-none'315 def _generate_signature(self, data):316 """317 Generates the signature for a data string318 :param data: content to be signed319 :return:320 """321 return hmac.new(322 self.signature_key.encode('ascii'), data.encode('ascii'),323 digestmod=hashlib.sha256).hexdigest()324 @classmethod325 def generate_uuid(cls, return_hex=False, seed=None):326 """327 Generate uuid328 :param return_hex: Return in hex format329 :param seed: Seed value to generate a consistent uuid330 :return:331 """332 if seed:333 m = hashlib.md5()334 m.update(seed.encode('utf-8'))335 new_uuid = uuid.UUID(m.hexdigest())336 else:337 new_uuid = uuid.uuid1()338 if return_hex:339 return new_uuid.hex340 return str(new_uuid)341 @classmethod342 def generate_deviceid(cls, seed=None):343 """344 Generate an android device ID345 :param seed: Seed value to generate a consistent device ID346 :return:347 """348 return 'android-{0!s}'.format(cls.generate_uuid(True, seed)[:16])349 def generate_adid(self, seed=None):350 """351 
Generate an Advertising ID based on the login username since352 the Google Ad ID is a personally identifying but resettable ID.353 :return:354 """355 modified_seed = seed or self.authenticated_user_name or self.username356 if modified_seed:357 # Do some trivial mangling of original seed358 sha2 = hashlib.sha256()359 sha2.update(modified_seed.encode('utf-8'))360 modified_seed = sha2.hexdigest()361 return self.generate_uuid(False, modified_seed)362 @staticmethod363 def _read_response(response):364 """365 Extract the response body from a http response.366 :param response:367 :return:368 """369 if response.info().get('Content-Encoding') == 'gzip':370 buf = BytesIO(response.read())371 res = gzip.GzipFile(fileobj=buf).read().decode('utf8')372 else:373 res = response.read().decode('utf8')374 return res375 def _call_api(self, endpoint, params=None, query=None, return_response=False, unsigned=False, version='v1'):376 """377 Calls the private api.378 :param endpoint: endpoint path that should end with '/', example 'discover/explore/'379 :param params: POST parameters380 :param query: GET url query parameters381 :param return_response: return the response instead of the parsed json object382 :param unsigned: use post params as-is without signing383 :param version: for the versioned api base url. Default 'v1'.384 :return:385 """386 url = '{0}{1}'.format(self.api_url.format(version=version), endpoint)387 if query:388 url += ('?' if '?' not in endpoint else '&') + compat_urllib_parse.urlencode(query)389 headers = self.default_headers390 data = None391 if params or params == '':392 headers['Content-type'] = 'application/x-www-form-urlencoded; charset=UTF-8'393 if params == '': # force post if empty string394 data = ''.encode('ascii')395 else:396 if not unsigned:397 json_params = json.dumps(params, separators=(',', ':'))398 hash_sig = self._generate_signature(json_params)399 post_params = {400 'ig_sig_key_version': self.key_version,401 'signed_body': hash_sig + '.' 
+ json_params402 }403 else:404 # direct form post405 post_params = params406 data = compat_urllib_parse.urlencode(post_params).encode('ascii')407 req = compat_urllib_request.Request(url, data, headers=headers)408 try:409 self.logger.debug('REQUEST: {0!s} {1!s}'.format(url, req.get_method()))410 self.logger.debug('DATA: {0!s}'.format(data))411 response = self.opener.open(req, timeout=self.timeout)412 except compat_urllib_error.HTTPError as e:413 error_msg = e.reason414 error_response = self._read_response(e)415 self.logger.debug('RESPONSE: {0:d} {1!s}'.format(e.code, error_response))416 try:417 error_obj = json.loads(error_response)418 if error_obj.get('message') == 'login_required':419 raise ClientLoginRequiredError(420 error_obj.get('message'), code=e.code,421 error_response=json.dumps(error_obj))422 elif e.code == ClientErrorCodes.TOO_MANY_REQUESTS:423 raise ClientThrottledError(424 error_obj.get('message'), code=e.code,425 error_response=json.dumps(error_obj))426 elif error_obj.get('message'):427 error_msg = '{0!s}: {1!s}'.format(e.reason, error_obj['message'])428 except (ClientLoginError, ClientLoginRequiredError, ClientThrottledError):429 raise430 except Exception as ex:431 # do nothing else, prob can't parse json432 self.logger.warn('Error parsing error response: {}'.format(str(ex)))433 raise ClientError(error_msg, e.code, error_response)434 if return_response:435 return response436 response_content = self._read_response(response)437 self.logger.debug('RESPONSE: {0:d} {1!s}'.format(response.code, response_content))438 json_response = json.loads(response_content)439 if json_response.get('message', '') == 'login_required':440 raise ClientLoginRequiredError(441 json_response.get('message'), code=response.code,442 error_response=json.dumps(json_response))443 # not from oembed or an ok response444 if not json_response.get('provider_url') and json_response.get('status', '') != 'ok':445 raise ClientError(446 json_response.get('message', 'Unknown error'), 
code=response.code,447 error_response=json.dumps(json_response))...
analytics.py
Source:analytics.py
import codecs
import datetime
import json
import statistics
from util.database import Database
from vendor.instagram_private_api import Client
from api.user.user import User
from util.errors import NetworkError
import platform
import os
import ssl
import certifi


class Analytics(Database):
    """Database-backed helper that talks to Instagram through the private API client.

    On construction it connects the account configured under
    ``general_settings['instagram_account']``. A cached session stored in the
    ``InstagramTokens`` table is reused when one exists.
    """

    def __init__(self):
        super().__init__()
        # Set by connectAccount(); stays None if no client could be created.
        self.api: Client = None
        self.connectAccount(self.general_settings['instagram_account']['username'],
                            self.general_settings['instagram_account']['password'])

    @staticmethod
    def to_json(python_object):
        """``json.dumps`` ``default=`` hook: encode ``bytes`` as a tagged base64 dict.

        :param python_object: object the JSON encoder could not serialize
        :return: ``{'__class__': 'bytes', '__value__': <base64 text>}``
        :raises TypeError: for any object that is not ``bytes``
        """
        if isinstance(python_object, bytes):
            return {'__class__': 'bytes',
                    '__value__': codecs.encode(python_object, 'base64').decode()}
        raise TypeError(repr(python_object) + ' is not JSON serializable')

    @staticmethod
    def from_json(json_object):
        """``json.loads`` ``object_hook=``: inverse of :meth:`to_json`.

        :param json_object: a decoded JSON object (dict)
        :return: the original ``bytes`` for tagged dicts, otherwise the dict unchanged
        """
        if '__class__' in json_object and json_object['__class__'] == 'bytes':
            return codecs.decode(json_object['__value__'].encode(), 'base64')
        return json_object

    def onLoginCallback(self, api, username):
        """Persist the client's session settings after a successful login.

        Replaces any previously stored token for ``username`` so that
        :meth:`connectAccount` can restore the session later.

        :param api: the logged-in client; its ``settings`` are serialized
        :param username: account the settings belong to
        """
        cache_settings = api.settings
        # The statement has exactly one placeholder, so bind exactly one value
        # (the original passed an extra ``self.user_id``).
        self.execute("DELETE FROM InstagramTokens WHERE `username` = %s", (username,))
        # Store the token in the same table connectAccount() reads from; the
        # original inserted into InstagramAccounts, so cached sessions were
        # never found again.
        self.insert("InstagramTokens", {
            "username": username,
            "token": json.dumps(cache_settings, default=self.to_json)
        })

    @staticmethod
    def _build_ssl_context():
        """Create a certificate-verifying TLS context for the API client."""
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        # verify_mode must be CERT_REQUIRED before check_hostname may be enabled.
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ssl_context.check_hostname = True
        ssl_context.load_default_certs()
        if platform.system().lower() == 'darwin':
            # On macOS additionally load the CA bundle shipped with certifi.
            ssl_context.load_verify_locations(
                cafile=os.path.relpath(certifi.where()),
                capath=None,
                cadata=None)
        return ssl_context

    def connectAccount(self, username, password=None):
        """Create ``self.api``, preferring a cached session over a fresh login.

        :param username: Instagram account name
        :param password: account password; only needed when no usable cached
            session exists
        :return: ``True`` on success, or a ``NetworkError`` instance (returned,
            not raised) when a login is required but no password was given
        """
        ssl_context = self._build_ssl_context()
        saved_account = self.selectOne("SELECT * FROM InstagramTokens WHERE `username` = %s ORDER BY `id` DESC LIMIT 1",
                                       username)
        if saved_account is not None:
            try:
                cache_settings = json.loads(saved_account['token'], object_hook=self.from_json)
                self.api = Client(auto_patch=True, drop_incompat_keys=False, username=username, password=password,
                                  settings=cache_settings, custom_ssl_context=ssl_context)
                return True
            except Exception:
                # Best effort: a corrupt or expired cached session falls
                # through to a fresh login below.
                print("Cannot import old account")
        if password is None:
            return NetworkError(title="Cannot connect to account", message="No password given")
        self.api = Client(auto_patch=True, drop_incompat_keys=False, username=username, password=password,
                          on_login=lambda x: self.onLoginCallback(x, username), custom_ssl_context=ssl_context)
        return True

    def getAccountDetails(self, user_id, username):
        """Return the stored account details for ``user_id``, refreshing if needed.

        The stored row is used unless there is none or a non-empty ``username``
        differs from the stored one; in that case the details are fetched again.

        :param user_id: local user the account belongs to
        :param username: Instagram username; may be ``None`` or empty
        :return: the account row/dict, or a ``NetworkError`` instance (returned,
            not raised) when no username is available or the fetch fails
        """
        account = self.selectOne(
            "SELECT `username`, `followers`, `following`, `posts_count`, `median_likes`, `uid` FROM InstagramAccounts WHERE `user_id` = %s ORDER BY `updated` DESC LIMIT 1",
            user_id)
        if account is None or (username is not None and username != "" and username != account['username']):
            if username is None or username == "":
                return NetworkError("No username provided", "No username has been provided")
            try:
                return self.getUpdateAccountDetails(user_id, username)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt/SystemExit are not swallowed.
                return NetworkError("Invalid username", "Cannot retrieve account. The username is either invalid or the account is private.")
        return account

    def getUpdateAccountDetails(self, user_id, username):
        """Fetch fresh account statistics, store them and return them.

        :param user_id: local user the account belongs to
        :param username: Instagram username to look up
        :return: dict with ``username``, ``followers``, ``following``,
            ``posts_count``, ``median_likes`` and ``uid``
        """
        username_info = self.api.username_info(username)['user']
        user_feed = self.api.username_feed(user_name=username, count=64)['items']
        all_likes = [x['likes']['count'] for x in user_feed]
        # A true median (the original computed the mean), and 0 for an empty
        # feed instead of a ZeroDivisionError that was reported as an invalid
        # username.
        median_likes = round(statistics.median(all_likes)) if all_likes else 0
        details = {
            "username": username,
            "followers": username_info['follower_count'],
            "following": username_info['following_count'],
            "posts_count": username_info['counts']['media'],
            "median_likes": median_likes,
            "uid": username_info['id']
        }
        self.insert("InstagramAccounts", {
            "user_id": user_id,
            **details,
            "feed": json.dumps(user_feed)
        }, check_table_column=False)
        return details


if __name__ == "__main__":
    analytics = Analytics()
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!