Best Python code snippet using autotest_python
db_utils.py
Source:db_utils.py
# ... (lines before this point in db_utils.py are elided in this excerpt;
# `os`, `ObjectId` and `get_collection` are expected to come from there or
# from the truncated remainder of the module)
from datetime import datetime

DATABASE_URI = os.environ.get('DATABASE_URI')


def find(space_id, collection_name, search_criteria, sort=None, user_id=None):
    """Return the documents of a collection that match ``search_criteria``.

    Args:
        space_id: Passed to ``get_dbname`` to pick the database.
        collection_name: Name of the collection to query.
        search_criteria: Filter dict; a string ``_id`` is converted to an
            ``ObjectId`` in place before querying.
        sort: Optional sort specification forwarded to the cursor's ``sort``.
        user_id: Accepted for signature parity with the other helpers; unused.

    Returns:
        A list of matching documents whose ``ObjectId`` ``_id`` values have
        been converted to strings.
    """
    collection = get_collection(get_dbname(space_id), collection_name)
    cursor = collection.find(declean_object(search_criteria))
    if sort is not None:
        cursor = cursor.sort(sort)
    return clean_array(list(cursor))


def upsert(space_id, collection_name, data, user_id=None):
    """Insert ``data`` when it carries no identifier, otherwise update it.

    ``data`` is mutated in place: audit fields are stamped on it, and on the
    update path ``id``/``_id`` is normalised to an ``ObjectId`` stored under
    ``_id`` (a non-None ``id`` key is removed).

    Args:
        space_id: Passed to ``get_dbname`` to pick the database.
        collection_name: Name of the target collection.
        data: Document dict; ``id`` takes precedence over ``_id``.
        user_id: Recorded as ``lastModifiedBy`` (and ``createdBy`` on insert).

    Returns:
        The stored document with a string ``_id``; ``None`` passes through
        when the lookup/update returns no document.
    """
    now = datetime.now()
    data['lastModifiedBy'] = user_id
    data['lastModifiedAt'] = now
    # NOTE(review): assumes get_collection returns an equivalent handle for
    # identical arguments, so one lookup serves both calls - confirm.
    collection = get_collection(get_dbname(space_id), collection_name)

    if data.get('id') is None and data.get('_id') is None:
        # No identifier supplied: treat as a brand-new record.
        data['createdBy'] = user_id
        data['createdAt'] = now
        response = collection.insert_one(data)
        return clean_object(collection.find_one({'_id': response.inserted_id}))

    if data.get('id') is None:
        data['_id'] = ObjectId(data.get('_id'))
    else:
        data['_id'] = ObjectId(data.get('id'))
        del data['id']
    updated_record = collection.find_one_and_update(
        {'_id': data.get('_id')},
        {'$set': data},
        new=True,
    )
    return clean_object(updated_record)


def insert(space_id, collection_name, data, user_id=None):
    """Insert ``data`` as a new document and return the stored record.

    ``data`` is mutated in place with the created/modified audit fields.

    Args:
        space_id: Passed to ``get_dbname`` to pick the database.
        collection_name: Name of the target collection.
        data: Document dict to insert.
        user_id: Recorded as both ``createdBy`` and ``lastModifiedBy``.

    Returns:
        The inserted document, re-read from the collection, with a string
        ``_id``.
    """
    now = datetime.now()
    data['lastModifiedBy'] = user_id
    data['lastModifiedAt'] = now
    data['createdBy'] = user_id
    data['createdAt'] = now
    collection = get_collection(get_dbname(space_id), collection_name)
    response = collection.insert_one(data)
    return clean_object(collection.find_one({'_id': response.inserted_id}))


def delete(space_id, collection_name, search_criteria, user_id=None):
    """Delete every document matching ``search_criteria``.

    Args:
        space_id: Passed to ``get_dbname`` to pick the database.
        collection_name: Name of the target collection.
        search_criteria: Filter dict; a string ``_id`` is converted to an
            ``ObjectId`` in place before deleting.
        user_id: Accepted for signature parity with the other helpers; unused.

    Returns:
        Whatever the collection's ``delete_many`` call returns.
    """
    search_criteria = declean_object(search_criteria)
    collection = get_collection(get_dbname(space_id), collection_name)
    return collection.delete_many(search_criteria)


def clean_object(data):
    """Convert an ``ObjectId`` ``_id`` on ``data`` to its string form.

    The dict is modified in place and returned; ``None`` and dicts without an
    ``ObjectId`` ``_id`` are returned unchanged.
    """
    if data is not None:
        raw_id = data.get('_id')
        if raw_id is not None and isinstance(raw_id, ObjectId):
            data['_id'] = str(raw_id)
    return data


def clean_array(data):
    """Apply ``clean_object`` to every item of a list, in place.

    Non-list values (including ``None``) are returned untouched.
    """
    if isinstance(data, list):
        for item in data:
            clean_object(item)
    return data


def declean_object(data):
    """Convert a string ``_id`` on ``data`` back to an ``ObjectId``.

    The dict is modified in place and returned; ``None`` and dicts whose
    ``_id`` is missing or not a string are returned unchanged.
    """
    if data is not None:
        raw_id = data.get('_id')
        if isinstance(raw_id, str):
            data['_id'] = ObjectId(raw_id)
    return data


def declean_array(data):
    """Apply ``declean_object`` to every item of a list, in place.

    The previous version called ``clean_object`` here, which made this
    function a duplicate of ``clean_array`` instead of its inverse.
    Non-list values (including ``None``) are returned untouched.
    """
    if isinstance(data, list):
        for item in data:
            declean_object(item)
    return data


# def get_dbname(space_id): ...  (definition truncated in this excerpt)
cleaner.py
Source:cleaner.py
import logging
import re

FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.DEBUG, format=FORMAT)

logger = logging.getLogger(__name__)


def is_valid_user(dirty_object: dict) -> dict:
    """Check whether the object's author appears in its banned-user list.

    Args:
        dirty_object: Record that may carry ``author`` and ``banned_users``.

    Returns:
        A dict with a boolean ``result`` and a human-readable ``reason``.
        The check passes when either key is absent.
    """
    logger.info("Checking User")
    if "author" not in dirty_object or "banned_users" not in dirty_object:
        return {"result": True, "reason": 'No author field or banned_users'}

    logger.info("Checking if author is banned users")
    if dirty_object['author'] in dirty_object['banned_users']:
        logger.error("User banned")
        return {"result": False, "reason": 'User Banned'}
    logger.info("User is not banned")
    return {"result": True, "reason": 'User is not banned'}


def is_valid_content(dirty_object: dict) -> dict:
    """Check the object's ``content`` against its ``banned_words`` patterns.

    Each banned word is passed to ``re.search`` as a pattern, so regex
    metacharacters in a banned word are interpreted as such.

    Args:
        dirty_object: Record that may carry ``content`` and ``banned_words``.

    Returns:
        A dict with a boolean ``result`` and a human-readable ``reason``.
        A missing or non-string ``content`` is treated as legitimate instead
        of raising, since other helpers in this module treat the field as
        optional.
    """
    logger.info("Checking content")
    content = dirty_object.get('content')
    if isinstance(content, str) and "banned_words" in dirty_object:
        for banned_word in dirty_object['banned_words']:
            if re.search(banned_word, content):
                logger.error("Content banned")
                return {"result": False, "reason": "Banned content found"}
    return {"result": True, "reason": "Content is legit"}


def delete_project_fields(dirty_object: dict) -> dict:
    """Remove project-level bookkeeping keys from ``dirty_object`` in place.

    Returns:
        The same dict object, minus any of the listed keys it contained.
    """
    project_fields = (
        "twitterUsers",
        "facebookUsers",
        "instagramUsers",
        "youtubeUsers",
        "banned_users",
        "banned_words",
        "keywords",
        "project_name",
        "project_slug",
        "createdAt",
        "index",
        "tags",
        "from_date",
        "to_date",
    )
    for field in project_fields:
        # pop with a default is a no-op for keys that are not present.
        dirty_object.pop(field, None)
    return dirty_object


def clean_objects(dirty_object: dict):
    """Blank out quote and whitespace-control characters in text fields.

    For each of ``content``, ``text`` and ``title`` present on the object,
    the value is rewritten in place (``clean_object`` aliases
    ``dirty_object``) and logged before and after.

    NOTE(review): the character class below was written like a Python list,
    so besides newline, tab and both quote characters it also matches commas
    and spaces - confirm whether replacing commas is intended.
    """
    clean_object = dirty_object
    for field in ['content', 'text', 'title']:
        if field in clean_object:
            logger.info("Old field")
            logger.info(clean_object[field])
            clean_object[field] = re.sub('["\n", "\t", "\'", "\""]', ' ', dirty_object[field])
            logger.info("New field")
            logger.info(clean_object[field])
    # ... (the remainder of clean_objects is truncated in this excerpt)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!