Best Python code snippet using selenium-respectful_python
__init__.py
Source:__init__.py
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3import time4from flask.ext.sqlalchemy import SQLAlchemy5from .. import app, redis6from ..utils import init_wechat_sdk7from ..plugins.state import get_user_last_interact_time8db = SQLAlchemy(app)9from .auth import Auth10from .express import Express11from .sign import Sign12from .user import User13def set_user_info(openid):14 """ä¿åç¨æ·ä¿¡æ¯"""15 redis_prefix = "wechat:user:"16 cache = redis.hexists(redis_prefix + openid, 'nickname')17 if not cache:18 user_info = User.query.filter_by(openid=openid).first()19 if not user_info:20 try:21 wechat = init_wechat_sdk()22 user_info = wechat.get_user_info(openid)23 if 'nickname' not in user_info:24 raise KeyError(user_info)25 except Exception, e:26 app.logger.warning(u"è·å微信ç¨æ·ä¿¡æ¯ API åºé: %s" % e)27 user_info = None28 else:29 user = User(openid=user_info['openid'],30 nickname=user_info['nickname'],31 sex=user_info['sex'],32 province=user_info['province'],33 city=user_info['city'],34 country=user_info['country'],35 headimgurl=user_info['headimgurl'])36 user.save()37 # ä¸æ¥è¯¢çæ°æ®ç±»åä¸æ ·ï¼æ¹ä¾¿ redis åå
¥38 user_info = user39 if user_info:40 # åå
¥ç¼å41 redis.hmset(redis_prefix + user_info.openid, {42 "nickname": user_info.nickname,43 "realname": user_info.realname,44 "classname": user_info.classname,45 "sex": user_info.sex,46 "province": user_info.province,47 "city": user_info.city,48 "country": user_info.country,49 "headimgurl": user_info.headimgurl,50 "regtime": user_info.regtime51 })52 else:53 timeout = int(time.time()) - int(get_user_last_interact_time(openid))54 if timeout > 24 * 60 * 60:55 try:56 wechat = init_wechat_sdk()57 user_info = wechat.get_user_info(openid)58 if 'nickname' not in user_info:59 raise KeyError(user_info)60 except Exception, e:61 app.logger.warning(u"è·å微信ç¨æ·ä¿¡æ¯ API åºé: %s" % e)62 else:63 user = User.query.filter_by(openid=openid).first()64 user.nickname = user_info['nickname']65 user.sex = user_info['sex']66 user.province = user_info['province']67 user.city = user_info['city']68 user.country = user_info['country']69 user.headimgurl = user_info['headimgurl']70 user.update()71 redis.hmset(redis_prefix + openid, {72 "nickname": user_info['nickname'],73 "sex": user_info['sex'],74 "province": user_info['province'],75 "city": user_info['city'],76 "country": user_info['country'],77 "headimgurl": user_info['headimgurl']78 })79 return None80def is_user_exists(openid):81 """ç¨æ·æ¯å¦åå¨æ°æ®åº"""82 redis_prefix = "wechat:user:"83 cache = redis.exists(redis_prefix + openid)84 if not cache:85 user_info = User.query.filter_by(openid=openid).first()86 if not user_info:87 return False88 else:89 return True90 else:91 return True92def get_sign_info(openid):93 """读åç¾å°ä¿¡æ¯"""94 sign_info = Sign.query.filter_by(openid=openid).first()95 if not sign_info:96 return {97 "lastsigntime": 0,98 "keepdays": 0,99 "totaldays": 0100 }101 else:102 return {103 "lastsigntime": int(sign_info.lastsigntime),104 "keepdays": int(sign_info.keepdays),105 "totaldays": int(sign_info.totaldays)106 }107def update_sign_info(openid, lastsigntime, totaldays, keepdays):108 """æ´æ°ç¾å°ä¿¡æ¯"""109 # åå
¥æ°æ®åº110 sign_info = Sign.query.filter_by(openid=openid).first()111 if not sign_info:112 sign_info = Sign(openid, lastsigntime, totaldays, keepdays)113 sign_info.save()114 else:115 sign_info.lastsigntime = lastsigntime116 sign_info.totaldays = totaldays117 sign_info.keepdays = keepdays118 sign_info.update()119 return None120def get_today_sign_ranklist(today_timestamp):121 """è·åä»æ¥ç¾å°æè¡æ¦"""122 data = Sign.query.join(User, Sign.openid == User.openid) \123 .add_columns(User.nickname) \124 .filter(Sign.lastsigntime >= today_timestamp) \125 .order_by(Sign.lastsigntime).all()126 return data127def get_sign_keepdays_ranklist():128 """è·åç»ç¾æè¡æ¦"""129 data = Sign.query.join(User, Sign.openid == User.openid) \130 .add_columns(User.nickname) \131 .order_by(Sign.keepdays.desc(),132 Sign.totaldays.desc(),133 Sign.lastsigntime).limit(6).all()134 return data135def get_express_num(openid, num):136 """读åå¿«éä¿¡æ¯"""137 express_info = Express.query.filter_by(openid=openid, num=num).first()138 return express_info139def set_express_num(openid, num, com_code, lastupdate, ischeck):140 """åå
¥å¿«éä¿¡æ¯"""141 express_info = get_express_num(openid, num)142 if not express_info:143 express = Express(openid=openid,144 num=num,145 comcode=com_code,146 lastupdate=lastupdate,147 ischeck=ischeck)148 express.save()149 else:150 if express_info.lastupdate != lastupdate:151 express_info.lastupdate = lastupdate152 express_info.ischeck = ischeck153 express_info.update()154def get_all_uncheck_express():155 """读åæªç¾æ¶çå¿«éä¿¡æ¯"""156 express_info = Express.query.filter(Express.ischeck != 3).all()157 return express_info158def get_all_auth_info():159 """读åå
¨é¨ææè´¦å·ä¿¡æ¯"""160 auth_info = Auth.query.all()161 return auth_info162def get_user_student_info(openid):163 """读åç»å®çæå¡ç®¡çç³»ç»è´¦å·"""164 redis_prefix = "wechat:user:"165 user_info_cache = redis.hgetall(redis_prefix + openid)166 if 'studentid' in user_info_cache and 'studentpwd' in user_info_cache:167 return user_info_cache168 else:169 auth_info = Auth.query.filter_by(openid=openid).first()170 if auth_info and auth_info.studentid and auth_info.studentpwd:171 # åå
¥ç¼å172 redis.hmset(redis_prefix + openid, {173 "studentid": auth_info.studentid,174 "studentpwd": auth_info.studentpwd,175 })176 user_info_cache['studentid'] = auth_info.studentid177 user_info_cache['studentpwd'] = auth_info.studentpwd178 return user_info_cache179 else:180 return False181def get_user_library_info(openid):182 """读åç»å®çå¾ä¹¦é¦è´¦å·"""183 redis_prefix = "wechat:user:"184 user_info_cache = redis.hgetall(redis_prefix + openid)185 if 'libraryid' in user_info_cache and 'librarypwd' in user_info_cache:186 return user_info_cache187 else:188 auth_info = Auth.query.filter_by(openid=openid).first()189 if auth_info and auth_info.libraryid and auth_info.librarypwd:190 # åå
¥ç¼å191 redis.hmset(redis_prefix + openid, {192 "libraryid": auth_info.libraryid,193 "librarypwd": auth_info.librarypwd,194 })195 user_info_cache['libraryid'] = auth_info.libraryid196 user_info_cache['librarypwd'] = auth_info.librarypwd197 return user_info_cache198 else:199 return False200def set_user_student_info(openid, studentid, studentpwd):201 """åå
¥ç»å®çæå¡ç®¡çç³»ç»è´¦å·"""202 redis_prefix = "wechat:user:"203 auth_info = Auth.query.filter_by(openid=openid).first()204 if not auth_info:205 auth = Auth(openid=openid,206 studentid=studentid,207 studentpwd=studentpwd)208 auth.save()209 else:210 auth_info.studentid = studentid211 auth_info.studentpwd = studentpwd212 auth_info.update()213 # åå
¥ç¼å214 redis.hmset(redis_prefix + openid, {215 "studentid": studentid,216 "studentpwd": studentpwd217 })218def set_user_library_info(openid, libraryid, librarypwd):219 """åå
¥ç»å®çå书å¡è´¦å·"""220 redis_prefix = "wechat:user:"221 auth_info = Auth.query.filter_by(openid=openid).first()222 if not auth_info:223 auth = Auth(openid=openid,224 libraryid=libraryid,225 librarypwd=librarypwd)226 auth.save()227 else:228 auth_info.libraryid = libraryid229 auth_info.librarypwd = librarypwd230 auth_info.update()231 # åå
¥ç¼å232 redis.hmset(redis_prefix + openid, {233 "libraryid": libraryid,234 "librarypwd": librarypwd235 })236def set_user_realname_and_classname(openid, realname, classname):237 """åå
¥ç¨æ·ççå®å§ååç级"""238 redis_prefix = "wechat:user:"239 cache = redis.hgetall(redis_prefix + openid)240 realname_exists = redis.hexists(redis_prefix + openid, 'realname')241 if not realname_exists or cache['realname'] == 'None':242 user_info = User.query.filter_by(openid=openid).first()243 if user_info and not user_info.realname:244 user_info.realname = realname245 user_info.classname = classname246 user_info.update()247 # åå
¥ç¼å248 redis.hmset(redis_prefix + openid, {249 "realname": realname,250 "classname": classname...
BatchGenerator.py
Source:BatchGenerator.py
import numpy as np
import random as rd
import redis


class BatchGenerator():
    """Serve train/validation/test batches from sample sets stored in Redis.

    All keys are prefixed with ``self.redis_prefix`` (``"deepmm_"``). The
    class reads:

    * the set ``available_data`` holding one string per sample,
    * the string keys ``T_x``, ``T_y``, ``n_x`` and ``n_y`` (parsed as ints),
    * the sorted set ``dict`` holding the output vocabulary.

    A sample string is split on ``"-"`` into an input part and an output
    part; each part is split on ``";"`` into time steps.  Every input step
    is a string whose characters are parsed one by one as floats; every
    output step is a vocabulary entry that is one-hot encoded.

    On construction all samples are pooled back into ``available_data`` and
    then randomly re-split into ``available_{train,validation,test}_data``.
    Samples handed out by :meth:`prepare_batch` move to the matching
    ``used_*_data`` set until :meth:`redis_reset_dataset` is called.
    """

    _DATASETS = ("train", "validation", "test")

    def __init__(self, batch_size, validation_frac=0.05, test_frac=0.05,
                 *, host='localhost', port=6379, db=0):
        """Connect to Redis, pool all samples and split them randomly.

        Args:
            batch_size: Number of samples per batch (converted with ``int``).
            validation_frac: Fraction of the samples moved to validation.
            test_frac: Fraction of the samples moved to test.
            host: Redis host name (keyword-only, defaults to the previously
                hard-coded ``'localhost'``).
            port: Redis port (keyword-only, default ``6379``).
            db: Redis database index (keyword-only, default ``0``).

        Raises:
            ValueError: If ``batch_size`` is not a positive integer.
        """
        self.redis_prefix = "deepmm_"
        self.batch_size = int(batch_size)
        if self.batch_size <= 0:
            raise ValueError("`batch_size` should be a positive integer")
        self.r = redis.Redis(decode_responses=True,
                             host=host,
                             port=port, db=db)
        self.redis_clean()
        self.redis_load_dimensions()
        self.redis_create_dictionary()
        self.redis_split_train_val_test(validation_frac, test_frac)

    def _key(self, name):
        """Return the prefixed Redis key for ``name``."""
        return self.redis_prefix + name

    def _require_full_batch(self, dataset, sample_count):
        """Raise ``ValueError`` when ``dataset`` cannot fill a single batch."""
        if sample_count < self.batch_size:
            raise ValueError(
                "`%s` dataset has %d samples, fewer than one batch of %d"
                % (dataset, sample_count, self.batch_size))

    def redis_load_dimensions(self):
        """Load the sample count and the ``T_x``/``T_y``/``n_x``/``n_y`` sizes."""
        self.m = int(self.r.scard(self._key("available_data")))
        self.T_x = int(self.r.get(self._key("T_x")))
        self.T_y = int(self.r.get(self._key("T_y")))
        self.n_x = int(self.r.get(self._key("n_x")))
        # NOTE: overwritten by redis_create_dictionary() with the vocabulary size.
        self.n_y = int(self.r.get(self._key("n_y")))

    def encode_inst(self, inst):
        """Return the one-hot row stored for vocabulary entry ``inst``.

        Raises:
            KeyError: If ``inst`` is not in the vocabulary.
        """
        return self.dict[inst]

    def decode_inst(self, one_hot):
        """Return the vocabulary entry at the arg-max position of ``one_hot``.

        Raises:
            KeyError: If the arg-max index has no vocabulary entry.
        """
        try:
            return self.reverse_dict[int(np.argmax(one_hot))]
        except KeyError as e:
            raise KeyError("Key error: " + str(one_hot)) from e

    def redis_create_dictionary(self):
        """Build the entry -> one-hot and index -> entry maps from ``dict``."""
        vals = self.r.zrangebyscore(self._key("dict"), "-inf", "+inf")
        idm = np.identity(len(vals))
        self.dict = {entry: idm[i] for i, entry in enumerate(vals)}
        self.reverse_dict = dict(enumerate(vals))
        self.n_y = len(vals)

    def redis_split_train_val_test(self, validation_frac, test_frac):
        """Randomly move samples into the validation, test and train sets.

        ``int(self.m * frac)`` random members are popped from
        ``available_data`` for validation and test; whatever is left becomes
        the training set.  The resulting sizes are stored in ``m_train``,
        ``m_validation`` and ``m_test``.
        """
        source = self._key("available_data")
        train_key = self._key("available_train_data")
        validation_key = self._key("available_validation_data")
        test_key = self._key("available_test_data")

        validation = self.r.spop(source, int(self.m * validation_frac))
        # SADD rejects a call without members, which happens when the
        # fraction rounds down to zero samples or the pool is empty.
        if validation:
            self.r.sadd(validation_key, *validation)

        test = self.r.spop(source, int(self.m * test_frac))
        if test:
            self.r.sadd(test_key, *test)

        # RENAME fails on a missing key; an emptied set no longer exists.
        if self.r.exists(source):
            self.r.rename(source, train_key)

        self.m_train = int(self.r.scard(train_key))
        self.m_validation = int(self.r.scard(validation_key))
        self.m_test = int(self.r.scard(test_key))

    def redis_reset_dataset(self, dataset):
        """Return every used sample of ``dataset`` to its available set."""
        used_key = self._key("used_" + dataset + "_data")
        available_key = self._key("available_" + dataset + "_data")
        self.r.sunionstore(available_key, available_key, used_key)
        self.r.delete(used_key)

    def redis_clean(self):
        """Merge all split/used sets back into ``available_data``."""
        keys = [self._key("available_data"),
                self._key("available_train_data"),
                self._key("used_train_data"),
                self._key("available_validation_data"),
                self._key("used_validation_data"),
                self._key("available_test_data"),
                self._key("used_test_data")]
        self.r.sunionstore(keys[0], *keys)
        # Everything except the merged destination is removed in one call.
        self.r.delete(*keys[1:])

    def prepare_batch(self, dataset):
        """Pop up to ``batch_size`` samples of ``dataset`` and encode them.

        Args:
            dataset: ``"train"``, ``"validation"`` or ``"test"``.

        Returns:
            ``([X, X_decoder], Y)`` where ``X`` has shape
            ``(batch_size, T_x, n_x)`` and ``X_decoder`` and ``Y`` have shape
            ``(batch_size, T_y, n_y)``, all ``float32``.  ``Y`` is
            ``X_decoder`` shifted one step earlier along the time axis with
            the last step set to the ``"<END>"`` one-hot row.  Rows for which
            no sample was available stay zero in ``X`` and ``X_decoder``.

        Raises:
            ValueError: If ``dataset`` is not one of the three names.
            KeyError: If an output step or ``"<END>"`` is not in the
                vocabulary.
        """
        if dataset not in self._DATASETS:
            raise ValueError("`dataset` should be train, validation or test")

        redis_key_available = self._key("available_" + dataset + "_data")
        redis_key_used = self._key("used_" + dataset + "_data")
        vals = self.r.spop(redis_key_available, self.batch_size)

        X = np.zeros((self.batch_size, self.T_x, self.n_x), dtype=np.float32)
        X_decoder = np.zeros((self.batch_size, self.T_y, self.n_y),
                             dtype=np.float32)

        for seq_idx, val in enumerate(vals):
            # "<x steps>-<y steps>", steps separated by ";".
            parts = [part.split(";") for part in val.split("-")]
            for x_idx, step in enumerate(parts[0]):
                X[seq_idx, x_idx] = np.array(list(step), dtype=np.float32)
            for y_idx, inst in enumerate(parts[1]):
                X_decoder[seq_idx, y_idx] = self.encode_inst(inst)

        # SADD rejects a call without members (exhausted set).
        if vals:
            self.r.sadd(redis_key_used, *vals)

        Y = np.roll(X_decoder, -1, axis=1)
        Y[:, -1, :] = self.encode_inst("<END>")
        return [X, X_decoder], Y

    def generator_train(self):
        """Yield training batches forever, resetting the set after each pass.

        Raises:
            ValueError: On first iteration, if the training set holds fewer
                samples than one batch (the loop would otherwise spin
                forever without yielding).
        """
        self._require_full_batch("train", self.m_train)
        while True:
            for _ in range(self.m_train // self.batch_size):
                yield self.prepare_batch("train")
            self.redis_reset_dataset("train")

    def generator_validation(self):
        """Yield validation batches forever, resetting the set after each pass.

        Raises:
            ValueError: On first iteration, if the validation set holds fewer
                samples than one batch (the loop would otherwise spin
                forever without yielding).
        """
        self._require_full_batch("validation", self.m_validation)
        while True:
            for _ in range(self.m_validation // self.batch_size):
                yield self.prepare_batch("validation")
            self.redis_reset_dataset("validation")

    def generator_test(self):
        """Yield one pass of full test batches, then stop."""
        for _ in range(self.m_test // self.batch_size):
            yield self.prepare_batch("test")

# ... (the listing this snippet was taken from is truncated after this point)
token_bb_redis.py
Source:token_bb_redis.py
1from typing import List2from app.db.repositories.base_redis import BaseRedisRepository3from aioredis import Redis4from app.schemas.token_bb import TokenBB5from app.core.config import REDIS_PREFIX6class TokenBBRedisRepository(BaseRedisRepository):7 def __init__(self, redis: Redis) -> None:8 super().__init__(redis)9 async def set_token(self, *, token: TokenBB, expires_in: int):10 """11 Set token hash fields to multiple values.12 :param token:13 """14 await self._redis.hmset(f"{REDIS_PREFIX}:{token.id}", token.dict())15 await self._redis.expire(f"{REDIS_PREFIX}:{token.id}", expires_in) # need seconds16 async def len(self, key: str):17 """18 Get the number of fields in a given hash.19 :param key:20 :return: int:21 """22 return await self._redis.hlen(f"{REDIS_PREFIX}:{key}")23 async def get_all(self, key: str):24 """25 Get all the fields and values in a hash.26 :param key:27 :return: dict:28 """29 return await self._redis.hgetall(f"{REDIS_PREFIX}:{key}")30 async def get_token_by_id(self, id: str) -> List[TokenBB]:31 data = await self._redis.hgetall(f"{REDIS_PREFIX}:{id}")32 if not data:33 return None34 token = TokenBB(**data)35 return token36 async def remove_token_by_key(self, key: str):37 await self._redis.delete(f"{REDIS_PREFIX}:{key}")38 async def expire_token_by_key(self, key: str, seconds: int = 60): # 1min39 await self._redis.expire(f"{REDIS_PREFIX}:{key}", seconds)40 async def remove_tokens_by_id(self, id: str):41 tokens: List[TokenBB] = await self.get_tokens_by_id(id=id)42 for token in tokens:43 await self.remove_token_by_key(token.id)44 async def expire_tokens_by_id(self, id: str):45 tokens: List[TokenBB] = await self.get_tokens_by_id(id=id)46 for token in tokens:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It takes you from setting up the prerequisites and running your first automation test, through following best practices, to diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!