Best Python code snippet using localstack_python
session.py
Source:session.py
# ... (lines 1-12 of session.py, including its imports, are elided in this excerpt)


def _read_item(store, sname):
    r''' look up ``sname`` in a session mapping

    Items written through ``set()`` live under their original key, while
    items loaded with ``hgetall`` live under utf-8 encoded ``bytes`` keys.
    The plain key is tried first so that a locally written value wins over
    the copy that was loaded earlier.  ``bytes`` values are decoded to
    ``str``; an unknown name yields ``''``.
    '''
    if sname in store:
        value = store[sname]
    else:
        value = store.get(sname.encode('utf-8'), b'')
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


class AbstractSession(object):
    r''' base class of the synchronous session drivers

    Subclasses are expected to provide a class level ``_data`` dict that maps
    a session id to the dict holding that session's items (``all()`` reads it).
    '''

    def __init__(self, session_id, *args, **kw):
        self._session_id = session_id
        super(AbstractSession, self).__init__(*args, **kw)

    def _generate_session_id(self, generate_session_id_func=None, *args):
        r''' generate session id

        Without a custom generator a random ``uuid4`` hex string is returned.
        ``uuid1`` is avoided on purpose: it is derived from the host address
        and the clock, which makes session ids guessable.
        '''
        if not generate_session_id_func:
            return uuid.uuid4().hex
        return str(generate_session_id_func(*args))

    @property
    def session_id(self):
        r''' return session id'''
        return self._session_id

    def set(self, sname, svalue):
        r''' set session item'''
        raise NotImplementedError

    def get(self, sname):
        r''' get session item'''
        raise NotImplementedError

    def save(self, expire=None):
        r''' save all session items to database or file'''
        raise NotImplementedError

    def renew(self, session_id):
        r''' refresh session id and reread session from database'''
        raise NotImplementedError

    def all(self):
        r''' get all items of the current session '''
        return self._data[self._session_id]

    def __getitem__(self, key):
        r''' get session item by name'''
        pass

    def __setitem__(self, key, value):
        r''' set session item by name'''
        pass

    def __delitem__(self, key):
        r''' delete session item by name'''
        pass


class AsyncAbstractSession(dict):
    r''' base class of the asynchronous session drivers

    The instance itself is a dict that maps a session id to the dict holding
    that session's items.  The coroutines are native ``async def`` functions
    because ``asyncio.coroutine`` is no longer available on current Python
    releases.
    '''

    def __init__(self, session_id, *args, **kw):
        if not session_id:
            session_id = self._generate_session_id()
        self._session_id = session_id
        super(AsyncAbstractSession, self).__init__(*args, **kw)

    def _generate_session_id(self, generate_session_id_func=None, *args):
        r''' generate session id (random ``uuid4`` hex unless a generator is given)'''
        if not generate_session_id_func:
            return uuid.uuid4().hex
        return str(generate_session_id_func(*args))

    async def save(self, expire=None):
        r''' save session to database'''
        raise NotImplementedError

    async def session(self):
        r''' get database connection and read session from database'''
        raise NotImplementedError

    def get(self, sname):
        r''' get session item'''
        raise NotImplementedError

    def set(self, sname, value):
        r''' set session item'''
        raise NotImplementedError

    def all(self):
        r''' get all items of the current session'''
        raise NotImplementedError

    @property
    def session_id(self):
        r''' return session id'''
        return self._session_id


class AsyncRedisSession(AsyncAbstractSession):
    r''' asynchronous redis driver for session (uses ``aioredis.create_redis``)

    ``config`` may contain ``host``, ``port``, ``db`` and ``expire``.
    ``session()`` must be awaited before items are read or written.
    '''

    def __init__(self, session_id=None, config=None, loop=None):
        super(AsyncRedisSession, self).__init__(session_id)
        if not config:
            config = {}
        else:
            assert isinstance(
                config, dict), "redis connection config must dict type"
        # Same defaults with and without a config dict, so a partial config
        # no longer ends up with ``None`` as host or port.
        self._host = config.get("host", 'localhost')
        self._port = config.get("port", 6379)
        self._db = config.get("db", 0)
        self._expire = config.get('expire', 0)
        self._connection = None
        self._active = False
        self._loop = loop

    async def session(self, loop=None):
        r''' connect if necessary, load the stored session and return ``self``

        When the stored hash carries ``__expire_delta_time`` the key's time
        to live is reset to that many seconds (sliding expiration).
        '''
        if loop:
            self._loop = loop
        if not self._connection:
            # The configured db used to be ignored, so every session ended up
            # in database 0 regardless of the config.
            self._connection = await aioredis.create_redis(
                (self._host, self._port), db=self._db, loop=self._loop)
            self._active = True
        stored = await self._connection.hgetall(self._session_id)
        self[self._session_id] = stored or {}
        if stored:
            expire_delta_time = stored.get(
                b"__expire_delta_time", b'').decode("utf-8")
            if expire_delta_time:
                await self._connection.expire(
                    self._session_id, int(expire_delta_time))
        return self

    def set(self, sname, svalue):
        r''' set session item in memory (persisted by ``save``); returns ``self``'''
        self[self._session_id][sname] = svalue
        return self

    def get(self, sname):
        r''' get session item; ``''`` when the item does not exist'''
        return _read_item(self[self._session_id], sname)

    async def save(self, expire=None):
        r''' write all items to redis

        ``expire`` (seconds) overrides the configured expire time; a time to
        live of 0 means the key is stored without expiration.
        '''
        data = self[self._session_id]
        ttl = int(expire) if expire else int(self._expire)
        if ttl:
            data['__expire_delta_time'] = ttl
        pipe = self._connection.pipeline()
        for key, value in data.items():
            pipe.hset(self._session_id, key, value)
        await pipe.execute()
        if ttl:
            await self._connection.expire(self._session_id, ttl)

    async def end(self):
        r''' close the redis connection, if there is one'''
        if self._connection:
            self._connection.close()
            await self._connection.wait_closed()
            # Forget the closed connection so that a later session() call
            # reconnects instead of reusing it.
            self._connection = None
            self._active = False

    def all(self):
        r''' get all items of the current session'''
        return self[self._session_id]


class FileSession(AbstractSession):
    r'''
    session store in file
    file session driver

    Each session is one json file named after the session id inside the
    session directory.  Besides the user items the file holds
    ``__last_active_time`` (unix timestamp of the last save) and, when an
    expire time is in effect, ``__expire_delta_time`` (seconds).

    The class is a singleton and ``_data`` is shared at class level.

    NOTE(review): the session id is used as a file name without validation;
    confirm callers never pass an id containing path separators.
    '''
    _data = {}

    def __new__(cls, *args, **kw):
        if not hasattr(cls, '_instance'):
            cls._instance = super(FileSession, cls).__new__(cls)
        return cls._instance

    def __init__(self, session_id=None, config=None):
        if not config:
            self._session_dir = '/tmp/session'
            self._session_expire = 0
        else:
            if not isinstance(config, dict):
                raise TypeError("FileSession config must be dict type")
            self._session_dir = config.get('session_dir', '/tmp/session')
            self._session_expire = int(config.get('expire', 0))
        # makedirs also creates missing parent directories and does not fail
        # when the directory appears between the check and the creation.
        os.makedirs(self._session_dir, exist_ok=True)
        if session_id is None:
            self._session_id = self._generate_session_id()
        else:
            self._session_id = session_id
        self._load_from_file()
        super(FileSession, self).__init__(self._session_id)

    def _load_from_file(self):
        r''' point ``_session_file`` at the current id and read that session

        An expired session file is removed and the session starts empty.
        '''
        self._session_file = os.path.join(self._session_dir, self._session_id)
        data = {}
        if os.path.exists(self._session_file):
            with open(self._session_file, 'r', errors='ignore', encoding='utf-8') as f:
                data = json.load(f)
            expire_time = data.get('__expire_delta_time', None)
            if expire_time:
                # A file without a last-active stamp is treated as expired.
                last_active_time = data.get('__last_active_time') or 0
                if (int(time.time()) - last_active_time) > expire_time:
                    os.remove(self._session_file)
                    data = {}
        self._data[self._session_id] = data

    def _remove_file(self, session_id):
        r''' remove the file of ``session_id``; a missing file is not an error'''
        try:
            os.remove(os.path.join(self._session_dir, session_id))
        except FileNotFoundError:
            pass

    def set(self, sname, svalue):
        r''' set session item in memory (persisted by ``save``); returns ``self``'''
        self._data[self._session_id][sname] = svalue
        return self

    def get(self, sname):
        r''' get session item; ``None`` when the item does not exist'''
        return self._data[self._session_id].get(sname, None)

    def save(self, expire=None):
        r''' write the session to its file

        ``expire`` (seconds) overrides the configured expire time.
        '''
        data = self._data[self._session_id]
        data['__last_active_time'] = int(time.time())
        if expire:
            data['__expire_delta_time'] = int(expire)
        elif int(self._session_expire) != 0:
            data['__expire_delta_time'] = self._session_expire
        with open(self._session_file, 'w', errors='ignore', encoding='utf-8') as f:
            json.dump(data, f)

    def renew(self, session_id=None):
        r''' switch to ``session_id`` (or a new id) and reread it; returns ``self``'''
        if session_id is None:
            self._session_id = self._generate_session_id()
        else:
            self._session_id = session_id
        self._load_from_file()
        return self

    def delete(self, session_id=None):
        r''' delete a session and return its id

        Deleting the current session (no argument, or its own id) switches
        the instance to a fresh, empty session.
        '''
        if session_id and session_id != self._session_id:
            self._data.pop(session_id, None)
            self._remove_file(session_id)
            return session_id
        ssid = self._session_id
        self._data.pop(ssid, None)
        self._remove_file(ssid)
        self._session_id = self._generate_session_id()
        # Keep the file path in step with the new id; otherwise the next
        # save() would recreate the file that was just deleted.
        self._session_file = os.path.join(self._session_dir, self._session_id)
        self._data[self._session_id] = {}
        return ssid

    def __getitem__(self, key):
        return self._data[self._session_id].get(key)

    def __setitem__(self, key, value):
        self._data[self._session_id][key] = value


class MongoSession(AbstractSession):
    r'''
    mongodb driver for session

    Each session is one document identified by its ``session_id`` field;
    ``expire`` holds the unix timestamp after which the session is ignored.
    '''
    _data = {}

    def __init__(self, session_id=None, config=None):
        if not config:
            self._host = 'localhost'
            self._port = 27017
            self._db = 'session_database'
            self._collection = 'session'
            self._expire = 0
        else:
            if not isinstance(config, dict):
                raise TypeError("mongo session config must be dict type")
            self._host = config.get('host', 'localhost')
            self._port = config.get('port', 27017)
            self._db = config.get('db', 'session_database')
            self._collection = config.get('collection', 'session')
            self._expire = config.get('expire', 0)
        client = MongoClient(self._host, self._port)
        self._mongo = client[self._db][self._collection]
        if session_id is None:
            self._session_id = self._generate_session_id()
        else:
            self._session_id = session_id
        self._load_document()
        super(MongoSession, self).__init__(self._session_id)

    def _load_document(self):
        r''' read the current session; a missing or expired one starts empty'''
        # find_one returns None when nothing matches.
        document = self._mongo.find_one({'session_id': self._session_id})
        data = dict(document) if document else {}
        expire = data.get('expire', None)
        if expire and int(expire) < int(time.time()):
            data = {}
        self._data[self._session_id] = data

    def get(self, sname):
        r''' get session item; ``None`` when the item does not exist'''
        return self._data[self._session_id].get(sname)

    def set(self, sname, svalue):
        r''' set session item in memory (persisted by ``save``); returns ``self``'''
        self._data[self._session_id][sname] = svalue
        return self

    def renew(self, session_id=None):
        r''' switch to ``session_id`` (or a new id) and reread it; returns ``self``'''
        if not session_id:
            self._session_id = self._generate_session_id()
        else:
            self._session_id = session_id
        self._load_document()
        return self

    def save(self, expire=None):
        r''' upsert the session document

        ``expire`` (seconds from now) overrides the configured expire time.
        '''
        data = self._data[self._session_id]
        # Fall back to the configured expire time; the old fallback added
        # ``int(expire)`` while ``expire`` was empty.
        ttl = int(expire) if expire else int(self._expire)
        if ttl:
            data['expire'] = int(time.time()) + ttl
        # ``_id`` belongs to the stored document and is never rewritten.
        # ``session_id`` is always included so the update is never empty;
        # presumably some MongoDB server versions reject an empty ``$set``
        # (verify against the deployed server).
        payload = {key: value for key, value in data.items() if key != '_id'}
        payload['session_id'] = self._session_id
        self._mongo.update_one({'session_id': self._session_id}, {
                               "$set": payload}, upsert=True)

    def delete(self, session_id=None):
        r''' delete a session and return its id

        Deleting the current session (no argument, or its own id) switches
        the instance to a fresh, empty session.
        '''
        if session_id and session_id != self._session_id:
            self._data.pop(session_id, None)
            session = self._mongo.find_one_and_delete(
                {'session_id': session_id}, projection={'session_id': True})
            # Nothing stored for that id: still report the id asked for.
            return session.get('session_id') if session else session_id
        ssid = self._session_id
        self._data.pop(ssid, None)
        self._mongo.find_one_and_delete(
            {'session_id': ssid}, projection={'session_id': True})
        self._session_id = self._generate_session_id()
        self._data[self._session_id] = {}
        return ssid

    def __getitem__(self, key):
        return self._data[self._session_id].get(key)

    def __setitem__(self, key, value):
        self._data[self._session_id][key] = value


class RedisSession(AbstractSession):
    r'''
    redis driver for session

    Each session is one redis hash stored under the session id.  The
    connection pool is created once per class, from the configuration of the
    first instance.
    '''
    _pool = None
    _data = {}

    def __init__(self, session_id=None, config=None):
        if config is None:
            self._host = 'localhost'
            self._port = 6379
            self._db = 0
            self._expire = 0
        else:
            if not isinstance(config, dict):
                raise TypeError("redis config must be a dict type")
            self._host = config.get('host', 'localhost')
            self._port = config.get('port', 6379)
            self._db = config.get('db', 0)
            self._expire = config.get('expire', 0)
        if not type(self)._pool:
            type(self)._pool = redis.ConnectionPool(
                host=self._host, port=self._port, db=self._db)
        if session_id is None:
            self._session_id = self._generate_session_id()
        else:
            self._session_id = session_id
        self._redis = redis.Redis(connection_pool=type(self)._pool)
        self._load_hash()
        super(RedisSession, self).__init__(self._session_id)

    def _load_hash(self):
        r''' read the current session and refresh its time to live'''
        stored = self._redis.hgetall(self._session_id)
        self._data[self._session_id] = stored or {}
        if stored:
            expire_delta_time = stored.get(
                b'__expire_delta_time', b'').decode('utf-8')
            if expire_delta_time:
                self._redis.expire(self._session_id, int(expire_delta_time))

    def get(self, sname):
        r''' get session item; ``''`` when the item does not exist'''
        return _read_item(self._data[self._session_id], sname)

    def set(self, sname, svalue):
        r''' set session item in memory (persisted by ``save``); returns ``self``'''
        self._data[self._session_id][sname] = svalue
        return self

    def renew(self, session_id=None):
        r''' switch to ``session_id`` (or a new id) and reread it; returns ``self``'''
        if session_id is None:
            self._session_id = self._generate_session_id()
        else:
            self._session_id = session_id
        self._load_hash()
        return self

    def save(self, expire=None):
        r''' write all items to redis

        ``expire`` (seconds) overrides the configured expire time; a time to
        live of 0 means the key is stored without expiration.
        '''
        data = self._data[self._session_id]
        ttl = int(expire) if expire else int(self._expire)
        if ttl:
            data['__expire_delta_time'] = ttl
        # An empty session is not sent to hmset at all.
        if data:
            self._redis.hmset(self._session_id, data)
        if ttl:
            self._redis.expire(self._session_id, ttl)

    def _remove_hash(self, session_id):
        r''' remove every field of the hash stored under ``session_id``'''
        keys = self._redis.hkeys(session_id)
        if keys:
            self._redis.hdel(session_id, *keys)

    def delete(self, session_id=None):
        r''' delete a session and return its id

        Deleting the current session (no argument, or its own id) switches
        the instance to a fresh, empty session.
        '''
        if session_id and session_id != self._session_id:
            self._data.pop(session_id, None)
            self._remove_hash(session_id)
            return session_id
        ssid = self._session_id
        self._data.pop(ssid, None)
        self._remove_hash(ssid)
        self._session_id = self._generate_session_id()
        self._data[self._session_id] = {}
        return ssid

    def __getitem__(self, key):
        return self.get(key)

    def __setitem__(self, key, value):
        self.set(key, value)


class AsyncSessionManager(object):
    r''' placeholder; both methods are empty in this excerpt'''

    def __init__(self, session_id=None, *, config=None, driver=None):
        pass

    def _get_redis_session_driver(self, config=None):
        pass


class SessionManager(object):
    _drivers = {}
    _default_driver = None
    # ... (the rest of this class is elided in this excerpt)
session_manager.py
Source:session_manager.py
# ... (lines 1-7 of session_manager.py are elided in this excerpt; the
# definitions below take ``self`` and belong to a class whose header is not shown)
def get_user_id_for_session_id(self, session_id):
    """Return whatever ``self.sessions.get`` yields for *session_id*."""
    return self.sessions.get(session_id)


def generate_session_id_for_user(self, user_id):
    """Register *user_id* under a session id not yet in ``self.sessions``.

    Ids are drawn from ``self._generate_session_id()`` until one is found
    that is not already a key of ``self.sessions``; that id is returned.
    """
    while True:
        candidate = self._generate_session_id()
        # Keep drawing until the id is not already registered.
        if candidate not in self.sessions:
            break
    self.sessions[candidate] = user_id
    return candidate


@staticmethod
def _generate_session_id():...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!