Python code snippet from bscMtdCore.py (the page header mislabelled it as "JavaScript code snippet using ng-mocks")
bscMtdCore.py
Source:bscMtdCore.py
# coding:utf-8
from . import bscCfg


class Mtd_BscBasic(bscCfg.BscUtility):
    """Common base of the method classes below; adds nothing to bscCfg.BscUtility."""
    pass


class Mtd_BscPython(Mtd_BscBasic):
    """Helpers that import Python modules by dotted name.

    NOTE(review): the MOD_* attributes come from bscCfg.BscUtility, which is not
    visible here; MOD_pkgutil / MOD_importlib are presumably the stdlib modules
    of the same name -- confirm.
    """

    @classmethod
    def _bsc_mtd__set_python_module_load_(cls, moduleName):
        """Import and return ``moduleName``; return None when no loader is found for it."""
        if cls.MOD_pkgutil.find_loader(moduleName):
            return cls.MOD_importlib.import_module(moduleName)
        return None

    @classmethod
    def _bsc_mtd__set_python_module_reload_(cls, moduleName):
        """Import and return ``moduleName``; return None when no loader is found for it.

        NOTE(review): despite the name this never reloads -- the body is the same
        as ``_bsc_mtd__set_python_module_load_`` and an already-imported module is
        returned as cached. Left unchanged because callers may rely on that;
        confirm whether a real reload is intended.
        """
        if cls.MOD_pkgutil.find_loader(moduleName):
            return cls.MOD_importlib.import_module(moduleName)
        return None


class Mtd_BscUtility(Mtd_BscBasic):
    """Classmethod helpers for system info, time tags, environment variables and OS paths.

    NOTE(review): MOD_*, MTD_* and DEF_* are inherited from bscCfg.BscUtility
    (not visible here). The docstrings below assume MOD_x is the stdlib module
    ``x`` and MTD_os_path is ``os.path`` -- confirm.
    """

    @classmethod
    def _getSystemUsername(cls):
        """Return the result of ``getpass.getuser()``."""
        return cls.MOD_getpass.getuser()

    @classmethod
    def _getSystemHostname(cls):
        """Return ``socket.getfqdn`` applied to the local host name."""
        return cls.MOD_socket.getfqdn(cls.MOD_socket.gethostname())

    @classmethod
    def _getSystemHost(cls):
        """Return ``socket.gethostbyname`` applied to the local host name."""
        return cls.MOD_socket.gethostbyname(cls.MOD_socket.gethostname())

    @classmethod
    def _bsc_mtd__os_path__set_directory_create_(cls, directoryString):
        """Create ``directoryString`` (with parents) when it does not exist yet.

        :param directoryString: directory path; an empty value means "nothing to create".
        :return: True when this call created the directory, False when the path
            already existed (or was empty).
        :raises OSError: when creation fails for a reason other than the
            directory already being there.
        """
        # An empty path is what dirname() yields for a bare file name; makedirs('')
        # would raise, so treat it as already existing.
        if not directoryString:
            return False
        if cls.MTD_os_path.exists(directoryString):
            return False
        try:
            cls.MOD_os.makedirs(directoryString)
        except OSError:
            # Another process may have created it between the check and makedirs.
            if cls.MTD_os_path.isdir(directoryString):
                return False
            raise
        return True

    @classmethod
    def _bsc_mtd__os_path__set_file_directory_create_(cls, fileString):
        """Make sure the directory part of ``fileString`` exists. Returns None."""
        directoryString = cls.MTD_os_path.dirname(fileString)
        cls._bsc_mtd__os_path__set_directory_create_(directoryString)

    @classmethod
    def _getSystemActiveTimestamp(cls):
        """Return the current ``time.time()`` value."""
        return cls.MOD_time.time()

    @classmethod
    def _timestampToDatetag(cls, timestamp):
        """Format ``timestamp`` (local time) with the pattern ``'%Y_%m%d'``."""
        return cls.MOD_time.strftime('%Y_%m%d', cls.MOD_time.localtime(timestamp))

    @classmethod
    def _getActiveDatetag(cls):
        """Return the date tag of the current time."""
        return cls._timestampToDatetag(cls._getSystemActiveTimestamp())

    @classmethod
    def _timestamp2timetag(cls, timestamp):
        """Format ``timestamp`` (local time) with ``cls.DEF_time_tag_format``."""
        return cls.MOD_time.strftime(
            cls.DEF_time_tag_format,
            cls.MOD_time.localtime(timestamp)
        )

    @classmethod
    def _getActiveTimetag(cls):
        """Return the time tag of the current time."""
        return cls._timestamp2timetag(cls._getSystemActiveTimestamp())

    @classmethod
    def _timestampToPrettify(cls, timestamp):
        """Format ``timestamp`` (local time) with ``cls.DEF_time_prettify_format``."""
        return cls.MOD_time.strftime(
            cls.DEF_time_prettify_format,
            cls.MOD_time.localtime(timestamp)
        )

    @classmethod
    def _getActivePrettifyTime(cls):
        """Return the prettified form of the current time."""
        return cls._timestampToPrettify(cls._getSystemActiveTimestamp())

    @classmethod
    def _string2list(cls, string, includes=None):
        """Normalise a string or a sequence of items to a new list.

        :param string: a ``str``/``unicode`` (wrapped in a one-item list) or a
            tuple/list (copied); any other type yields an empty list.
        :param includes: optional container; when truthy only the items found
            in it are kept.
        :return: list
        """
        if isinstance(string, (str, unicode)):
            candidates = [string]
        elif isinstance(string, (tuple, list)):
            candidates = list(string)
        else:
            return []
        if includes:
            return [i for i in candidates if i in includes]
        return candidates

    @classmethod
    def _isDevelop(cls):
        """Return True when the develop-enable environment variable equals 'true' (case-insensitive)."""
        value = cls._getOsEnvironRawWithKey(cls.DEF_util__environ_key__enable_develop, 'FALSE')
        return value.lower() == 'true'

    @classmethod
    def _isTraceEnable(cls):
        """Return True when the trace-enable environment variable equals 'true' (case-insensitive)."""
        value = cls._getOsEnvironRawWithKey(cls.DEF_util__environ_key__enable_trace, 'FALSE')
        return value.lower() == 'true'

    @classmethod
    def _getOsEnvironRawWithKey(cls, key, failobj=None):
        """Return ``os.environ.get(key, failobj)``."""
        return cls.MOD_os.environ.get(key, failobj)

    @classmethod
    def _getOsEnvironRawAsPath(cls, key, failobj=None):
        """Return the variable ``key`` with backslashes replaced by ``cls.DEF_bsc__pathsep``.

        When the variable is not set, return ``failobj`` unchanged if it is not
        None, otherwise ``''``.
        """
        raw = cls._getOsEnvironRawWithKey(key)
        if raw is not None:
            return raw.replace('\\', cls.DEF_bsc__pathsep)
        if failobj is not None:
            return failobj
        return ''

    @classmethod
    def _getOsEnvironRawAsList(cls, key, failobj=None):
        """Return the variable ``key`` split on ``os.pathsep``.

        When the variable is not set, return ``failobj`` unchanged if it is not
        None, otherwise ``[]``.
        """
        raw = cls._getOsEnvironRawWithKey(key)
        if raw is not None:
            return raw.split(cls.MOD_os.pathsep)
        if failobj is not None:
            return failobj
        return []

    @classmethod
    def _getOsEnvironRawAsPathList(cls, key, failobj=None):
        """Like ``_getOsEnvironRawAsList`` but with backslashes in every entry
        replaced by ``cls.DEF_bsc__pathsep``.
        """
        raw = cls._getOsEnvironRawWithKey(key)
        if raw is not None:
            return [
                i.replace('\\', cls.DEF_bsc__pathsep)
                for i in raw.split(cls.MOD_os.pathsep)
            ]
        if failobj is not None:
            return failobj
        return []

    @classmethod
    def _osPathToPythonStyle(cls, pathStr):
        """Return ``pathStr`` with backslashes replaced by ``cls.DEF_bsc__pathsep``."""
        return pathStr.replace('\\', cls.DEF_bsc__pathsep)

    @classmethod
    def _isOsDirectory(cls, pathStr):
        """Return ``os.path.isdir(pathStr)``."""
        return cls.MTD_os_path.isdir(pathStr)

    @classmethod
    def _isOsFile(cls, pathStr):
        """Return ``os.path.isfile(pathStr)``; False when ``pathStr`` is None."""
        if pathStr is None:
            return False
        return cls.MTD_os_path.isfile(pathStr)

    @classmethod
    def _isOsSameFile(cls, sourceFileString, targetFileString):
        """Return True when both paths are equal after ``os.path.normpath``.

        This is a textual comparison only; the file system is not consulted.
        """
        normpath = cls.MTD_os_path.normpath
        return normpath(sourceFileString) == normpath(targetFileString)

    @classmethod
    def _getOsFileBase(cls, fileString):
        """Return ``fileString`` without its extension (directory part kept)."""
        return cls.MTD_os_path.splitext(fileString)[0]

    @classmethod
    def _getOsFileName(cls, fileString):
        """Return the base name of ``fileString`` without its extension."""
        return cls.MTD_os_path.splitext(cls.MTD_os_path.basename(fileString))[0]

    @classmethod
    def _getOsFileDirname(cls, fileString):
        """Return ``os.path.dirname(fileString)``."""
        return cls.MTD_os_path.dirname(fileString)

    @classmethod
    def _getOsFileBasename(cls, fileString):
        """Return ``os.path.basename(fileString)``."""
        return cls.MTD_os_path.basename(fileString)

    @classmethod
    def _getOsFileExt(cls, fileString):
        """Return the extension of ``fileString`` as given by ``os.path.splitext``."""
        return cls.MTD_os_path.splitext(fileString)[1]

    @classmethod
    def _toOsFileStringReplaceFileName(cls, fileString, newFileBasenameString):
        """Return ``fileString`` with its name (not its directory or extension)
        replaced by ``newFileBasenameString``.

        The directory and the new name are joined with a literal ``'/'``.
        """
        osPath = cls._getOsFileDirname(fileString)
        osExt = cls._getOsFileExt(fileString)
        if not osPath:
            # A bare file name must stay relative; prefixing '/' would point it
            # at the file-system root.
            return u'{0}{1}'.format(newFileBasenameString, osExt)
        return u'{0}/{1}{2}'.format(osPath, newFileBasenameString, osExt)

    @classmethod
    def _isOsPathExist(cls, pathStr):
        """Return ``os.path.exists(pathStr)``."""
        return cls.MTD_os_path.exists(pathStr)

    @classmethod
    def _isOsDirectoryExist(cls, directoryString):
        """Return ``os.path.isdir(directoryString)``."""
        return cls.MTD_os_path.isdir(directoryString)

    @classmethod
    def _isOsFileExist(cls, fileString):
        """Return ``os.path.isfile(fileString)``."""
        return cls.MTD_os_path.isfile(fileString)

    @classmethod
    def _setOsPathOpen(cls, pathStr):
        """Open ``pathStr`` via ``os.startfile`` when the path exists; otherwise do nothing.

        NOTE(review): ``os.startfile`` is only available on Windows -- confirm
        this is never called on other platforms.
        """
        if cls._isOsPathExist(pathStr):
            # startfile is handed the path with '/' converted to os.sep.
            cls.MOD_os.startfile(
                pathStr.replace('/', cls.MOD_os.sep)
            )

    @classmethod
    def _setOsFileOpen(cls, pathStr):
        """Open ``pathStr`` via ``os.startfile`` when it is an existing file; otherwise do nothing.

        NOTE(review): ``os.startfile`` is only available on Windows -- confirm
        this is never called on other platforms.
        """
        if cls._isOsFileExist(pathStr):
            cls.MOD_os.startfile(
                pathStr.replace('/', cls.MOD_os.sep)
            )

    @classmethod
    def _setOsDirectoryOpen(cls, pathStr):
        """Open ``pathStr`` via ``os.startfile`` when the path exists; otherwise do nothing.

        The check is "path exists", not "is a directory", so this behaves
        exactly like ``_setOsPathOpen``.

        NOTE(review): ``os.startfile`` is only available on Windows -- confirm
        this is never called on other platforms.
        """
        if cls._isOsPathExist(pathStr):
            cls.MOD_os.startfile(
                pathStr.replace('/', cls.MOD_os.sep)
            )

@classmethod220 def _getOsFileMtimestamp(cls, fileString):221 if cls._isOsFileExist(fileString):222 return cls.MOD_os.stat(fileString).st_mtime223224 @classmethod225 def _getOsFileSize(cls, fileString):226 if cls._isOsFileExist(fileString):227 return cls.MTD_os_path.getsize(fileString)228 return 0229230 @classmethod231 def _isAbsOsPath(cls, pathStr):232 return cls.MTD_os_path.isabs(pathStr)233 234 @classmethod235 def _isOsFileTimeChanged(cls, sourceFileString, targetFileString):236 if cls._isOsFileExist(sourceFileString) and cls._isOsFileExist(targetFileString):237 if str(cls._getOsFileMtimestamp(sourceFileString)) != str(cls._getOsFileMtimestamp(targetFileString)):238 return True239 return False240 return False241242 @classmethod243 def _stringToHash(cls, text):244 md5Obj = cls.MOD_hashlib.md5()245 md5Obj.update(text)246 return str(md5Obj.hexdigest()).upper()247248 @classmethod249 def _getOsFileHash(cls, fileString):250 if cls._isOsFileExist(fileString):251 with open(fileString, u'rb') as f:252 raw = f.read()253 f.close()254 if raw:255 return cls._stringToHash(raw)256 return u'D41D8CD98F00B204E9800998ECF8427E'257258 @classmethod259 def _getOsFileHash_(cls, fileString):260 if cls._isOsFileExist(fileString):261 with open(fileString, u'rb') as f:262 md5Obj = cls.MOD_hashlib.md5()263 while True:264 d = f.read(8096)265 if not d:266 break267 md5Obj.update(d)268269 f.close()270 return str(md5Obj.hexdigest()).upper()271 return u'D41D8CD98F00B204E9800998ECF8427E'272273 @classmethod274 def _isOsFileHashChanged(cls, sourceFileString, targetFileString):275 if cls._isOsFileExist(sourceFileString) and cls._isOsFileExist(targetFileString):276 if cls._getOsFileHash(sourceFileString) != cls._getOsFileHash(targetFileString):277 return True278 return False279 return False280281 @classmethod282 def _setOsFileRename(cls, fileString, newFileBasenameString):283 if cls._isOsFileExist(fileString):284 newFileString = cls._toOsFileStringReplaceFileName(fileString, newFileBasenameString)285 
if cls._isOsSameFile(fileString, newFileString) is False:286 cls.MOD_os.rename(fileString, newFileString)287 288 @classmethod289 def _setOsFileRename_(cls, fileString, newFileString):290 if cls._isOsSameFile(fileString, newFileString) is False:291 cls.MOD_os.rename(fileString, newFileString)292293 @classmethod294 def _setOsFileCopy(cls, sourceFileString, targetFileString, force=True):295 cls._bsc_mtd__os_path__set_file_directory_create_(targetFileString)296 # Check Same File297 if not cls._isOsSameFile(sourceFileString, targetFileString):298 if force is True:299 cls.MOD_shutil.copy2(sourceFileString, targetFileString)300 elif force is False:301 try:302 cls.MOD_shutil.copy2(sourceFileString, targetFileString)303 except IOError:304 print sourceFileString, targetFileString305306 @classmethod307 def _setOsPathRemove(cls, pathStr):308 if cls.MTD_os_path.isfile(pathStr):309 cls.MOD_os.remove(pathStr)310 elif cls.MTD_os_path.isdir(pathStr):311 cls.MOD_os.removedirs(pathStr)312313 @classmethod314 def _setOsFileMove_(cls, fileString, targetPathString):315 basename = cls._getOsFileBasename(fileString)316 targetFileString = cls._toOsFilename(targetPathString, basename)317 cls._setOsFileMove(fileString, targetFileString)318319 @classmethod320 def _setOsFileMove(cls, fileString, targetFileString):321 if cls.MTD_os_path.isfile(fileString):322 cls._bsc_mtd__os_path__set_file_directory_create_(targetFileString)323 cls.MOD_shutil.move(fileString, targetFileString)324325 @classmethod326 def setOsDirectoryHide(cls, directoryString):327 if cls._isOsDirectoryExist(directoryString):328 if u'Windows' in cls.MOD_platform.system():329 command = u'attrib +h "' + directoryString + u'"'330 command = command.encode(cls.MOD_locale.getdefaultlocale()[1])331 cls.MOD_os.popen(command).close()332333 @classmethod334 def _osPathString2RelativeName(cls, rootString, fullpathName):335 return fullpathName[len(rootString) + 1:]336 337 @classmethod338 def _toOsFilename(cls, directoryString, 
basenameString):339 return cls.MTD_os_path.join(directoryString, basenameString).replace('\\', cls.DEF_bsc__pathsep)340341 @classmethod342 def _getPathnameListByOsDirectory(cls, rootString, extString, isFile, isFullpath, isAll):343 def extFilterFnc_(fullpathName_):344 if filterExtStringLis is not None:345 for i in filterExtStringLis:346 if fullpathName_.endswith(i):347 return True348 return False349 return True350351 def addFnc_(fullpathName_):352 if extFilterFnc_(fullpathName_) is True:353 if isFullpath is True:354 lis.append(fullpathName_)355 else:356 relativeName = cls._osPathString2RelativeName(rootString, fullpathName_)357 lis.append(relativeName)358359 def recursionFnc_(directoryString_):360 children = cls.MOD_os.listdir(directoryString_)361 if children:362 for i in children:363 fullpathName = cls._toOsFilename(directoryString_, i)364 if cls.MTD_os_path.isfile(fullpathName):365 addFnc_(fullpathName)366 else:367 if isFile is False:368 addFnc_(fullpathName)369370 if isAll is True:371 recursionFnc_(fullpathName)372373 lis = []374375 if extString is not None:376 filterExtStringLis = cls._string2list(extString)377 else:378 filterExtStringLis = None379380 if cls.MTD_os_path.exists(rootString):381 recursionFnc_(rootString)382383 return lis384385 @classmethod386 def _getOsFileTemporaryName(cls, fileString, timetag=None):387 if timetag is None:388 timetag = cls._getActiveTimetag()389390 temporaryDirectory = u'{}/{}'.format(cls.DEF_path_temporary_local, timetag)391392 temporaryFileString = cls._toOsFilename(temporaryDirectory, cls._getOsFileBasename(fileString))393 cls._bsc_mtd__os_path__set_directory_create_(temporaryDirectory)394 return temporaryFileString395396 @classmethod397 def _toOsFileJoinTimetag(cls, fileString, timetag=None, useMode=0):398 if timetag is None:399 timetag = cls._getActiveTimetag()400401 if useMode == 0:402 return (u'.{}'.format(timetag)).join(cls.MTD_os_path.splitext(fileString))403 elif useMode == 1:404 return 
u'{}/{}/{}'.format(cls._getOsFileDirname(fileString), timetag, cls._getOsFileBasename(fileString))405 return fileString406407 @classmethod408 def _setOsFileBackup(cls, fileString, backupFileString, timetag=None, useMode=0):409 backupFileString_ = cls._toOsFileJoinTimetag(backupFileString, timetag, useMode)410 cls._setOsFileCopy(fileString, backupFileString_)411412 @classmethod413 def _getOsFileMtimetag(cls, fileString):414 return cls._timestamp2timetag(cls._getOsFileMtimestamp(fileString))415416 @classmethod417 def _toOsFileInfoJsonFileString(cls, fileString):418 base = cls._getOsFileBase(fileString)419 return base + u'.info.json'420421 @classmethod422 def _infoDict(cls, fileString):423 return {424 cls.DEF_key_source: fileString,425 cls.DEF_key_timestamp: cls._getSystemActiveTimestamp(),426 cls.DEF_key_username: cls._getSystemUsername(),427 cls.DEF_key_hostname: cls._getSystemHostname(),428 cls.DEF_key_host: cls._getSystemHost()429 }430431 @classmethod432 def _toOsFileResultFileString(cls, fileString):433 base = cls._getOsFileBase(fileString)434 return base + u'.result.log'435436 @classmethod437 def _getDevelopRoot(cls):438 return cls._getOsEnvironRawAsPath(439 cls.DEF_util__environ_key__path_develop,440 cls.DEF_util__root__default_develop441 )442443 @classmethod444 def _getProductRoot(cls):445 return cls._getOsEnvironRawAsPath(cls.DEF_util__environ_key__path_product, cls.DEF_util__root__default_product)446447 @classmethod448 def _getPresetRoot(cls):449 return cls._getOsEnvironRawAsPath(cls.DEF_util__environ_key__path_preset, cls._getProductRoot())450451 @classmethod452 def _getToolkitRoot(cls):453 return cls._getOsEnvironRawAsPath(cls.DEF_util__environ_key__path_toolkit, cls._getProductRoot())454455 @classmethod456 def _getServerPath(cls):457 if cls._isDevelop():458 return cls._getDevelopRoot()459 return cls._getProductRoot()460461 @classmethod462 def _toPathString(cls, separator, *args):463 if isinstance(args[0], (list, tuple)):464 pathStringLis = args[0]465 
else:466 pathStringLis = list(args)467468 string = ''469 index = 0470 for i in pathStringLis:471 if i not in ['', None]:472 if index is 0:473 string = i474 else:475 string += u'{}{}'.format(separator, i)476 index += 1477 return string478479 @classmethod480 def _toOsPathString(cls, *args):481 return cls._toPathString(cls.DEF_bsc__pathsep, *args).replace('\\', cls.DEF_bsc__pathsep)482483 @classmethod484 def _bsc_mtd__set_python_module_load_(cls, moduleName):485 loader = cls.MOD_pkgutil.find_loader(moduleName)486 if loader:487 return cls.MOD_importlib.import_module(moduleName)488489 @classmethod490 def _bsc_mtd__set_python_module_reload_(cls, moduleName):491 loader = cls.MOD_pkgutil.find_loader(moduleName)492 if loader:493 return cls.MOD_importlib.import_module(moduleName)494495 @classmethod496 def _toHtmlLogFileString(cls, fileString):497 base = cls._getOsFileBase(fileString)498 return u'{}.log.html'.format(base)499500 @classmethod501 def _getQtProgressBar(cls, title, maxValue):502 module = cls._bsc_mtd__set_python_module_load_(u'LxGui.qt.qtCommands')503 if module is not None:504 return module.setProgressWindowShow(title, maxValue)505506 @classmethod507 def _setQtProgressBarUpdate(cls, progressBar, text=None):508 if progressBar is not None:509 progressBar.updateProgress(text)510511 @classmethod512 def _timetagToChnPrettify(cls, timetag, useMode=0):513 if timetag:514 if cls._getOsFileTimetag(timetag) is not None:515 year = int(timetag[:4])516 month = int(timetag[5:7])517 date = int(timetag[7:9])518 hour = int(timetag[10:12])519 minute = int(timetag[12:14])520 second = int(timetag[15:16])521 if year > 0:522 timetuple = cls.MOD_datetime.datetime(year=year, month=month, day=date, hour=hour, minute=minute, second=second).timetuple()523 return cls._timetupleToChnPrettify(timetuple, useMode)524 return u'{0}{0}å¹´{0}æ{0}æ¥{0}ç¹å'.format('??')525 return u'æ è®°å½'526527 @classmethod528 def _getOsFileTimetag(cls, backupFileString):529 lis = 
cls.MOD_re.findall(cls.DEF_time_tag_search_string, backupFileString)530 if lis:531 return lis[0]532533 @classmethod534 def _getOsFileBackupNameDict(cls, fileString):535 dic = {}536 if fileString:537 directoryName = cls._getOsFileDirname(fileString)538 if cls._isOsDirectoryExist(directoryName):539 backupName = cls._toOsFileJoinTimetag(fileString, cls.DEF_time_tag_search_string)540 stringLis = cls.MOD_glob.glob(backupName)541 if stringLis:542 for i in stringLis:543 dic[cls._getOsFileTimetag(i)] = i.replace('\\', cls.DEF_bsc__pathsep)544 return dic545546 @classmethod547 def _timestampToChnPrettify(cls, timestamp, useMode=0):548 if isinstance(timestamp, float):549 return cls._timetupleToChnPrettify(cls.MOD_time.localtime(timestamp), useMode)550 else:551 return u'æ è®°å½'552553 @classmethod554 def _timetupleToChnPrettify(cls, timetuple, useMode=0):555 year, month, date, hour, minute, second, week, dayCount, isDst = timetuple556 if useMode == 0:557 timetuple_ = cls.MOD_time.localtime(cls.MOD_time.time())558 year_, month_, date_, hour_, minute_, second_, week_, dayCount_, isDst_ = timetuple_559 #560 monday = date - week561 monday_ = date_ - week_562 if timetuple_[:1] == timetuple[:1]:563 dateString = u'{}æ{}æ¥'.format(str(month).zfill(2), str(date).zfill(2))564 weekString = u''565 subString = u''566 if timetuple_[:2] == timetuple[:2]:567 if monday_ == monday:568 dateString = ''569 weekString = u'{0}'.format(cls.DEF_time_week[int(week)][0])570 if date_ == date:571 subString = u'ï¼ä»å¤©ï¼'572 elif date_ == date + 1:573 subString = u'ï¼æ¨å¤©ï¼'574 #575 timeString = u'{}ç¹{}å'.format(str(hour).zfill(2), str(minute).zfill(2), str(second).zfill(2))576 #577 string = u'{}{}{} {}'.format(dateString, weekString, subString, timeString)578 return string579 else:580 return u'{}å¹´{}æ{}æ¥'.format(str(year).zfill(4), str(month).zfill(2), str(date).zfill(2))581 else:582 dateString = u'{}å¹´{}æ{}æ¥'.format(str(year).zfill(4), str(month).zfill(2), str(date).zfill(2))583 timeString = 
u'{}ç¹{}å{}ç§'.format(str(hour).zfill(2), str(minute).zfill(2), str(second).zfill(2))584 return u'{} {}'.format(dateString, timeString)585586 # Log587 @classmethod588 def _logDirectory(cls):589 return u'{}/.log'.format(cls._getServerPath())590591 @classmethod592 def _exceptionLogFile(cls):593 return u'{}/{}.exception.log'.format(594 cls._logDirectory(), cls._getActiveDatetag()595 )596597 @classmethod598 def _errorLogFile(cls):599 return u'{}/{}.error.log'.format(600 cls._logDirectory(), cls._getActiveDatetag()601 )602603 @classmethod604 def _resultLogFile(cls):605 return u'{}/{}.result.log'.format(606 cls._logDirectory(), cls._getActiveDatetag()607 )608609 @classmethod610 def _databaseLogFile(cls):611 return u'{}/{}.database.log'.format(612 cls._logDirectory(), cls._getActiveDatetag()613 )614615 @classmethod616 def _basicUniqueId(cls):617 return '4908BDB4-911F-3DCE-904E-96E4792E75F1'618619 @classmethod620 def _getUniqueId(cls):621 return str(cls.MOD_uuid.uuid1()).upper()622623 @classmethod624 def _stringToUniqueId(cls, string=None):625 if string is not None:626 basicUuid = cls._basicUniqueId()627 return str(cls.MOD_uuid.uuid3(cls.MOD_uuid.UUID(basicUuid), str(string))).upper()628 return cls._getUniqueId()629630 @classmethod631 def _stringsToUniqueId(cls, *args):632 def toOrderCodeString_(strings_):633 return ''.join([str(ord(i) + seq).zfill(4) for seq, i in enumerate(strings_)])634635 if len(args) > 1:636 strings = list(args)637 else:638 strings = cls._string2list(args[0])639 #640 subCode = toOrderCodeString_(strings)641 return cls._stringToUniqueId(subCode)642643 @classmethod644 def _setOsJsonWrite(cls, fileString, raw, indent=4, ensure_ascii=True):645 temporaryName = cls._getOsFileTemporaryName(fileString)646 with open(temporaryName, u'w') as j:647 cls.MOD_json.dump(648 raw,649 j,650 indent=indent,651 ensure_ascii=ensure_ascii652 )653654 cls._setOsFileCopy(temporaryName, fileString)655656 @classmethod657 def _setAddMessage(cls, text):658 print u' 
|{}'.format(cls._getActivePrettifyTime())659 print u'{}'.format(text)660661 @classmethod662 def _setAddResult(cls, text):663 cls._setAddMessage(664 u''' result |{}'''.format(text)665 )666667 @classmethod668 def _setAddWarning(cls, text):669 cls._setAddMessage(670 u'''warning |{}'''.format(text)671 )672673 @classmethod674 def _setAddError(cls, text):675 cls._setAddMessage(676 u''' error |{}'''.format(text)677 )678679680class Mtd_BscPath(Mtd_BscUtility):681 @classmethod682 def _toDagpathRemapList(cls, pathStr, pathsep):683 def addFnc_(lis_, item_):684 if not item_ in lis_:685 lis_.append(item_)686 #687 def getBranchFnc_(lis_, pathString_):688 if not pathString_ in lis:689 _strList = pathString_.split(pathsep)690 #691 _strCount = len(_strList)692 for _seq, _s in enumerate(_strList):693 if _s:694 if (_seq + 1) < _strCount:695 subPath = pathsep.join(_strList[:_seq + 1])696 addFnc_(lis_, subPath)697 #698 addFnc_(lis_, pathString_)699 #700 lis = []701 pathStrList = cls._string2list(pathStr)702 for i in pathStrList:703 # Debug add root704 if not i.startswith(pathsep):705 i = pathsep + i706 #707 getBranchFnc_(lis, i)708 return lis709710 @classmethod711 def _getDagpathRemapDict(cls, pathStr, pathsep):712 def addFnc_(item):713 if not item in lis:714 lis.append(item)715 #716 def getBranchFnc_(pathString_, pathDatumList):717 parent = pathDatumList[-2]718 parentPathString_ = pathsep.join(pathDatumList[:-1])719 nameString_ = pathDatumList[-1]720 addFnc_(((parent, parentPathString_), (nameString_, pathString_)))721 #722 def getRootFnc_(pathString_, pathDatumList):723 nameString_ = pathDatumList[-1]724 addFnc_(725 ((None, None), (nameString_, pathString_))726 )727 #728 def getMainFnc_():729 # Get Dict730 pathStringLis = cls._string2list(pathStr)731 if pathStringLis:732 for i in pathStringLis:733 pathDatumList = i.split(pathsep)734 isRoot = len(pathDatumList) == 2735 # Filter is Root736 if isRoot:737 getRootFnc_(i, pathDatumList)738 else:739 getBranchFnc_(i, pathDatumList)740 # 
Reduce Dict741 if lis:742 list2dictFnc_(dic, lis)743744 def list2dictFnc_(dic_, lis_):745 [dic_.setdefault(p, []).append(c) for p, c in lis_]746 #747 lis = []748 dic = cls.CLS_dic_order()749 #750 getMainFnc_()751 return dic752753 @classmethod754 def _setDicConvertToPathCreateDic(cls, dic, nodesep):755 def getBranchFnc_(parent):756 if parent in dic:757 parentPathString_ = parent758 if parent in dic_:759 parentPathString_ = dic_[parent]760 #761 children = dic[parent]762 if children:763 for child in children:764 childPath = parentPathString_ + pathsep + child765 dic_[child] = childPath766 getBranchFnc_(child)767768 pathsep = nodesep769 #770 dic_ = cls.CLS_dic_order()771 root = dic.keys()[0]772 dic_[root] = root773 getBranchFnc_(root)774 return dic_775776 @classmethod777 def _nodeString2namespace(cls, nodepathString, nodesep, namespacesep):778 if namespacesep in nodepathString:779 return namespacesep.join(nodepathString.split(nodesep)[-1].split(namespacesep)[:-1])780 return ''781782 @classmethod783 def _nodepathString2nodenameString(cls, nodepathString, nodesep, namespacesep):784 return nodepathString.split(nodesep)[-1].split(namespacesep)[-1]785786 @classmethod787 def _nodeString2nodenameWithNamespace(cls, nodepathString, nodesep):788 return nodepathString.split(nodesep)[-1]789790 @classmethod791 def _portString2portname(cls, portpathString, portsep):792 return portpathString.split(portsep)[-1]793794 @classmethod795 def _attrpathString2portpathString(cls, portpathString, portsep):796 return portsep.join(portpathString.split(portsep)[1:])797798 @classmethod799 def _portString2nodeString(cls, portpathString, portsep):800 return portpathString.split(portsep)[0]801802803class Mtd_BscDagpath(Mtd_BscUtility):804 @classmethod805 def _toDagpathRemapList_(cls, pathStr, pathsep):806 def addFnc_(item):807 if item:808 if not item in lis:809 lis.append(item)810811 def getBranchFnc_(pathString_):812 if not pathString_ in lis:813 stringLis = pathString_.split(pathsep)814 #815 
dataCount = len(stringLis)816 for seq, data in enumerate(stringLis):817 if data:818 if seq < dataCount:819 subPath = pathsep.join(stringLis[:seq])820 addFnc_(subPath)821 #822 addFnc_(pathString_)823824 lis = []825 _ = cls._string2list(pathStr)826 for i in _:827 getBranchFnc_(i)828 return lis829830 @classmethod831 def _getDagpathDict_(cls, pathStr, pathsep):832 def addFnc_(item):833 if not item in lis:834 lis.append(item)835836 def getBranchFnc_(pathString_, pathDatumList):837 parentPathString_ = pathsep.join(pathDatumList[:-1])838 addFnc_(839 (parentPathString_, pathString_)840 )841842 def getRootFnc_(pathString_):843 addFnc_(844 (None, pathString_)845 )846847 def getMainFnc_():848 # Get Dict849 pathStringLis = cls._string2list(pathStr)850 if pathStringLis:851 for i in pathStringLis:852 pathDatumList = i.split(pathsep)853 isRoot = len(pathDatumList) == 1854 # Filter is Root855 if isRoot:856 getRootFnc_(i)857 else:858 getBranchFnc_(i, pathDatumList)859 # Reduce Dict860 if lis:861 list2dictFnc_(dic, lis)862863 def list2dictFnc_(dic_, lis_):864 for p, c in lis_:865 if p is None:866 dic[c] = c867 else:868 if p in dic_:869 if isinstance(dic_[p], (str, unicode)):870 dic_[p] = []871 dic_.setdefault(p, []).append(c)872873 #874 lis = []875 dic = cls.CLS_dic_order()876 #877 getMainFnc_()878 return dic879880881class Mtd_BscFile(Mtd_BscUtility):882 @classmethod883 def isExist(cls, fileString):884 return cls._isOsFileExist(fileString)885886 @classmethod887 def createDirectory(cls, fileString):888 cls._bsc_mtd__os_path__set_file_directory_create_(fileString)889 890 @classmethod891 def name(cls, fileString):892 return cls._getOsFileName(fileString)893 894 @classmethod895 def dirname(cls, fileString):896 return cls._getOsFileDirname(fileString)897898 @classmethod899 def basename(cls, fileString):900 return cls._getOsFileBasename(fileString)901 902 @classmethod903 def base(cls, fileString):904 return cls._getOsFileBase(fileString)905 906 @classmethod907 def ext(cls, fileString):908 
return cls.MTD_os_path.splitext(fileString)[1]909910 @classmethod911 def isSame(cls, fileString, targetFileString):912 return cls._isOsSameFile(fileString, targetFileString)913914 @classmethod915 def copyTo(cls, fileString, targetFileString, force=True):916 if cls.isExist(fileString):917 cls._setOsFileCopy(fileString, targetFileString, force)918919 @classmethod920 def backupTo(cls, fileString, backupFileString, timetag=None):921 if cls.isExist(fileString):922 cls._setOsFileBackup(fileString, backupFileString, timetag)923924 @classmethod925 def renameDirnameTo(cls, fileString, newDirnameString):926 basenameString = cls.basename(fileString)927 targetTexture = cls._toOsFilename(newDirnameString, basenameString)928 return targetTexture929930 @classmethod931 def renameBasenameTo(cls, fileString, newBasenameString):932 cls._setOsFileRename(fileString, newBasenameString)933934 @classmethod935 def renameTo(cls, fileString, newFileString):936 cls._setOsFileRename_(fileString, newFileString)937 938 @classmethod939 def renameExtTo(cls, fileString, extString):940 return cls.base(fileString) + extString941942 @classmethod943 def remove(cls, fileString):944 cls._setOsPathRemove(fileString)945946 @classmethod947 def open(cls, fileString):948 cls._setOsFileOpen(fileString)949950 @classmethod951 def moveTo(cls, fileString, targetFileString):952 cls._setOsFileMove(fileString, targetFileString)953954 @classmethod955 def openDirectory(cls, fileString):956 if cls._isOsFileExist(fileString):957 directoryString = cls._getOsFileDirname(fileString)958 cls._setOsDirectoryOpen(directoryString)959960 @classmethod961 def openAsTemporary(cls, fileString, temporaryFileString):962 if cls._isOsFileExist(fileString):963 timestamp = str(cls._getOsFileMtimestamp(fileString))964 if cls._isOsFileExist(temporaryFileString):965 tempTimestamp = str(cls._getOsFileMtimestamp(temporaryFileString))966 else:967 tempTimestamp = None968969 if not timestamp == tempTimestamp:970 cls._setOsFileCopy(fileString, 
temporaryFileString)971 #972 cls._setOsFileOpen(temporaryFileString)973974 @classmethod975 def openAsBackup(cls, fileString):976 pass977978 @classmethod979 def isFileTimeChanged(cls, fileString, targetFileString):980 return cls._isOsFileTimeChanged(fileString, targetFileString)981982 @classmethod983 def mtimestamp(cls, fileString):984 return cls._getOsFileMtimestamp(fileString)985986 @classmethod987 def mtimetag(cls, fileString):988 return cls._getOsFileMtimetag(fileString)989990 @classmethod991 def mtimeChnPrettify(cls, fileString, useMode=0):992 return cls._timestampToChnPrettify(cls._getOsFileMtimestamp(fileString), useMode)993994 @classmethod995 def temporaryName(cls, fileString, timetag=None):996 return cls._getOsFileTemporaryName(fileString, timetag)997998 @classmethod999 def temporaryVedioName(cls, fileString):1000 tempDirectory = u'{}/vedio'.format(cls.DEF_path_temporary_local)1001 basenameString = cls._getOsFileBasename(fileString)1002 return cls._toOsFilename(tempDirectory, basenameString)10031004 @classmethod1005 def backupName(cls, fileString, timetag=None, useMode=0):1006 return cls._toOsFileJoinTimetag(fileString, timetag, useMode)10071008 @classmethod1009 def uniqueName(cls, fileString):1010 directoryString = cls._getOsFileDirname(fileString)1011 uniqueId = cls._stringToUniqueId(cls._getOsFileBasename(fileString))1012 return cls._toOsFilename(directoryString, uniqueId)10131014 @classmethod1015 def infoJsonName(cls, fileString):1016 return cls._toOsFileInfoJsonFileString(fileString)10171018 @classmethod1019 def resultName(cls, fileString):1020 return cls._toOsFileResultFileString(fileString)10211022 @classmethod1023 def backupNameDict(cls, fileString):1024 return cls._getOsFileBackupNameDict(fileString)10251026 @classmethod1027 def toJoinTimetag(cls, fileString, timetag=None, useMode=0):1028 return cls._toOsFileJoinTimetag(fileString, timetag, useMode)10291030 @classmethod1031 def findTimetag(cls, fileString):1032 return 
cls._getOsFileTimetag(fileString)10331034 @classmethod1035 def infoDict(cls, fileString):1036 return cls._infoDict(fileString)10371038 @classmethod1039 def productInfoDict(cls, fileString, stage=None, description=None, note=None):1040 dic = cls._infoDict(fileString)1041 dic[cls.DEF_key_stage] = stage1042 dic[cls.DEF_key_description] = description1043 dic[cls.DEF_key_note] = note1044 return dic10451046 @classmethod1047 def size(cls, fileString):1048 return cls._getOsFileSize(fileString)10491050 @classmethod1051 def seqLabel(cls, seq):1052 return ['', '_' + str(seq).zfill(4)][seq > 0]10531054 @classmethod1055 def subFilename(cls, fileString, labelString):1056 return labelString.join(cls.MTD_os_path.splitext(fileString))10571058 @classmethod1059 def reduceFilename(cls, fileString):1060 pathsep = cls.DEF_bsc__pathsep1061 return cls.MOD_re.sub('{0}|{1}'.format(pathsep * 2, pathsep * 3), pathsep, fileString)1062 1063 @classmethod1064 def toExtSplit(cls, fileString):1065 return cls.MTD_os_path.splitext(fileString)10661067 @classmethod1068 def raw2hash(cls, fileString):1069 cls._getOsFileHash(fileString)10701071 @classmethod1072 def collectionDatum(cls, fileString, targetDirectoryString, ignoreMtimeChanged=False, ignoreExists=False):1073 def getBranchFnc_(sourceFileString):1074 targetFileString = cls.renameDirnameTo(sourceFileString, targetDirectoryString)1075 #1076 enable = False1077 if cls.isExist(targetFileString):1078 if ignoreExists is True:1079 enable = False1080 else:1081 if ignoreMtimeChanged is True:1082 enable = True1083 else:1084 isMtimeChanged = cls._isOsFileTimeChanged(sourceFileString, targetFileString)1085 if isMtimeChanged:1086 enable = True1087 else:1088 enable = True1089 #1090 if enable is True:1091 lis.append((sourceFileString, targetFileString))1092 #1093 lis = []1094 #1095 osFileLis = cls._string2list(fileString)1096 if osFileLis:1097 [getBranchFnc_(i) for i in osFileLis]1098 return lis1099 1100 @classmethod1101 def composeBy(cls, directoryString, 
basenameString):1102 return cls._toOsFilename(directoryString, basenameString)11031104 @classmethod1105 def _getOsFileInfoDic(cls, osSourceFile, description=None, note=None):1106 return orderedDict(1107 [1108 (cls.DEF_key_info_timestamp, cls._getSystemActiveTimestamp()),1109 (cls.DEF_key_info_username, cls._getSystemUsername()),1110 #1111 (cls.DEF_key_info_host, cls._getSystemHost()),1112 (cls.DEF_key_info_hostname, cls._getSystemHostname()),1113 #1114 (cls.DEF_key_info_sourcefile, osSourceFile),1115 #1116 (cls.DEF_key_info_description, description),1117 (cls.DEF_key_info_note, note)1118 ]1119 )11201121 @classmethod1122 def _getOsFileBackupDatum(cls, fileString):1123 hashKey = cls._getOsFileHash(fileString)1124 dirname, filename, ext = cls._getOsFileDirname(fileString), cls._getOsFileName(fileString), cls._getOsFileExt(fileString)1125 #1126 targetFileString = cls.DEF_bsc__pathsep.join([cls._getOsFileDirname(fileString), cls.LynxiOsFolder_History, filename + ext, hashKey])1127 osVersionFile = cls.DEF_bsc__pathsep.join([cls._getOsFileDirname(fileString), cls.LynxiOsFolder_History, filename + cls.LynxiOsExtVAR_kit__window__version])1128 return targetFileString, osVersionFile11291130 @classmethod1131 def _setOsFileBackupTo(cls, sourceFileString, targetFileString):1132 cls._setOsFileCopy(sourceFileString, targetFileString)1133 #1134 info = cls._getOsFileInfoDic(sourceFileString)1135 infoFile = cls._toOsFileInfoJsonFileString(targetFileString)1136 cls._setOsJsonWrite(infoFile, info)1137 1138 @classmethod1139 def backup(cls, fileString):1140 if cls._isOsFileExist(fileString):1141 targetFileString, osVersionFile = cls._getOsFileBackupDatum(fileString)1142 if not cls._isOsFileExist(targetFileString):1143 cls._setOsFileBackupTo(fileString, targetFileString)1144 #1145 cls._setOsJsonWrite(1146 osVersionFile,1147 {1148 cls._getSystemActiveTimestamp(): cls._getOsFileBasename(targetFileString)1149 }1150 )115111521153class Mtd_BscSystem(Mtd_BscBasic):1154 VAR_bsc__system__name = 
None1155 VAR_bsc__system__name_dict = {1156 u'Windows': u'windows',1157 u'Linux': u'linux',1158 u'maya.exe': u'maya',1159 u'maya': u'maya',1160 u'houdini.exe': u'maya',1161 u'houdini': u'maya'1162 }1163 VAR_bsc__system__version_dict = {1164 u'32bit': u'x86',1165 u'64bit': u'x64'1166 }11671168 @classmethod1169 def _bsc__system_cls__get_is_active_(cls, appNameStr):1170 pass11711172 @classmethod1173 def isActive(cls):1174 return cls._bsc__system_cls__get_is_active_(1175 cls.VAR_bsc__system__name1176 )11771178 @classmethod1179 def name(cls):1180 return cls.VAR_bsc__system__name11811182 @classmethod1183 def _bsc__system_cls__get_full_version_str_(cls):1184 pass11851186 @classmethod1187 def fullVersion(cls):1188 return cls._bsc__system_cls__get_full_version_str_()11891190 @classmethod1191 def _bsc__system_cls__get_version_str_(cls):1192 pass11931194 @classmethod1195 def version(cls):1196 return cls._bsc__system_cls__get_version_str_()119711981199class Mtd_BscPlatform(Mtd_BscSystem):1200 @classmethod1201 def _bsc__system_cls__get_is_active_(cls, appNameStr):1202 key = cls.MOD_platform.system()1203 return cls.VAR_bsc__system__name_dict[key] == cls.VAR_bsc__system__name12041205 @classmethod1206 def _bsc__system_cls__get_full_version_str_(cls):1207 _ = u'.'.join(list(cls.MOD_platform.architecture()))1208 if _ in cls.VAR_bsc__system__version_dict:1209 return cls.VAR_bsc__system__version_dict[_]1210 return _12111212 @classmethod1213 def _bsc__system_cls__get_version_str_(cls):1214 _ = str(cls.MOD_platform.architecture()[0])1215 if _ in cls.VAR_bsc__system__version_dict:1216 return cls.VAR_bsc__system__version_dict[_]1217 return _121812191220class Mtd_BscApplication(Mtd_BscSystem):1221 VAR_bsc__system__name = None12221223 @classmethod1224 def _bsc__system_cls__get_is_active_(cls, appNameStr):1225 data = cls.MTD_os_path.basename(cls.MOD_sys.argv[0])1226 if data.lower() == u'{}.exe'.format(appNameStr):1227 return True1228 elif data.lower() == cls.VAR_bsc__system__name:1229 return 
True1230 return False123112321233class _EnvironString(str):1234 def __init__(self, value):1235 self._value = value12361237 self._key = u''1238 self._parent = None12391240 def _add(self, value):1241 if self._value:1242 lis = [i.lstrip().rstrip() for i in self._value.split(Mtd_BscUtility.MOD_os.pathsep)]1243 lowerLis = [i.lstrip().rstrip().lower() for i in self._value.lower().split(Mtd_BscUtility.MOD_os.pathsep)]1244 if value.lower() not in lowerLis:1245 lis.append(value)1246 self._value = Mtd_BscUtility.MOD_os.pathsep.join(lis)1247 else:1248 self._value = value12491250 def _sub(self, value):1251 if self._value:1252 lis = [i.lstrip().rstrip() for i in self._value.split(Mtd_BscUtility.MOD_os.pathsep)]1253 lowerLis = [i.lstrip().rstrip().lower() for i in self._value.lower().split(Mtd_BscUtility.MOD_os.pathsep)]1254 if value.lower() in lowerLis:1255 i = lowerLis.index(value.lower())1256 lis.remove(lis[i])1257 self._value = Mtd_BscUtility.MOD_os.pathsep.join(lis)12581259 def _update(self):1260 Mtd_BscUtility.MOD_os.environ[self._key] = self._value12611262 str_ = _EnvironString(self._value)1263 str_.key = self._key1264 str_.parent = self._parent12651266 self.parent.__dict__[self._key] = str_1267 return str_12681269 @property1270 def parent(self):1271 return self._parent12721273 @parent.setter1274 def parent(self, parent):1275 self._parent = parent12761277 @property1278 def key(self):1279 return self._key12801281 @key.setter1282 def key(self, key):1283 self._key = key12841285 Mtd_BscUtility.MOD_os.environ[self._key] = self._value12861287 def __iadd__(self, value):1288 if isinstance(value, list) or isinstance(value, tuple):1289 [self._add(i) for i in list(value)]1290 else:1291 self._add(value)12921293 return self._update()12941295 def __isub__(self, value):1296 if isinstance(value, list) or isinstance(value, tuple):1297 [self._sub(i) for i in list(value)]1298 else:1299 self._sub(value)13001301 return self._update()13021303 def append(self, value):1304 
self._add(value)13051306 def remove(self, value):1307 self._sub(value)13081309 def __str__(self):1310 # copy list1311 lis = [i.replace('\\', '/') for i in self._value.split(Mtd_BscUtility.MOD_os.pathsep)]1312 lis.sort()1313 return '\r\n'.join(lis)131413151316class Environ(Mtd_BscUtility):1317 def __getattr__(self, key):1318 self._get(key)13191320 def __setattr__(self, key, value):1321 key = key.upper()13221323 str_ = _EnvironString(value)1324 str_.key = key1325 str_.parent = self13261327 self.__dict__[key] = str_13281329 def _get(self, key):1330 key = key.upper()13311332 value = self.MOD_os.environ.get(key, '')1333 if not key in self.__dict__:1334 str_ = _EnvironString(value)1335 str_.key = key1336 str_.parent = self13371338 self.__dict__[key] = str_1339 return str_13401341 @classmethod1342 def isExist(cls, key, value):1343 value_ = cls.MOD_os.environ.get(key)1344 if value_ is not None:1345 lowerLis = [i.lstrip().rstrip().lower() for i in value_.split(cls.MOD_os.pathsep)]1346 return value.lower() in lowerLis1347 return False134813491350class SystemPath(Mtd_BscUtility):1351 def __init__(self):1352 pass1353 @classmethod1354 def isExist(cls, pathStr):1355 pathLowerLis = [i.replace('\\', '/').lower() for i in cls.MOD_sys.path]1356 if pathStr.lower() in pathLowerLis:1357 return True1358 return False13591360 @classmethod1361 def add(cls, pathStr):1362 if cls.isExist(pathStr) is False:1363 cls.MOD_sys.path.insert(0, pathStr)13641365 @classmethod1366 def remove(cls, pathStr):1367 if cls.isExist(pathStr) is True:1368 cls.MOD_sys.path.remove(pathStr)13691370 def __iadd__(self, other):1371 if isinstance(other, tuple) or isinstance(other, list):1372 [self.add(i) for i in other]1373 elif isinstance(other, str) or isinstance(other, unicode):1374 self.add(other)13751376 return self13771378 def __radd__(self, other):1379 if isinstance(other, tuple) or isinstance(other, list):1380 [self.remove(i) for i in other]1381 elif isinstance(other, str) or isinstance(other, unicode):1382 
self.remove(other)13831384 return self13851386 def __str__(self):1387 # copy list1388 lis = [i.replace('\\', '/') for i in self.MOD_sys.path]1389 lis.sort()1390 return '\r\n'.join(lis)139113921393def orderedDict(*args):
...
validation.py
Source:validation.py
1from django.core.exceptions import ImproperlyConfigured2from django.db import models3from django.db.models.fields import FieldDoesNotExist4from django.forms.models import (BaseModelForm, BaseModelFormSet, fields_for_model,5 _get_foreign_key)6from django.contrib.admin.util import get_fields_from_path, NotRelationField7from django.contrib.admin.options import (flatten_fieldsets, BaseModelAdmin,8 HORIZONTAL, VERTICAL)9__all__ = ['validate']10def validate(cls, model):11 """12 Does basic ModelAdmin option validation. Calls custom validation13 classmethod in the end if it is provided in cls. The signature of the14 custom validation classmethod should be: def validate(cls, model).15 """16 # Before we can introspect models, they need to be fully loaded so that17 # inter-relations are set up correctly. We force that here.18 models.get_apps()19 opts = model._meta20 validate_base(cls, model)21 # list_display22 if hasattr(cls, 'list_display'):23 check_isseq(cls, 'list_display', cls.list_display)24 for idx, field in enumerate(cls.list_display):25 if not callable(field):26 if not hasattr(cls, field):27 if not hasattr(model, field):28 try:29 opts.get_field(field)30 except models.FieldDoesNotExist:31 raise ImproperlyConfigured("%s.list_display[%d], %r is not a callable or an attribute of %r or found in the model %r."32 % (cls.__name__, idx, field, cls.__name__, model._meta.object_name))33 else:34 # getattr(model, field) could be an X_RelatedObjectsDescriptor35 f = fetch_attr(cls, model, opts, "list_display[%d]" % idx, field)36 if isinstance(f, models.ManyToManyField):37 raise ImproperlyConfigured("'%s.list_display[%d]', '%s' is a ManyToManyField which is not supported."38 % (cls.__name__, idx, field))39 # list_display_links40 if hasattr(cls, 'list_display_links'):41 check_isseq(cls, 'list_display_links', cls.list_display_links)42 for idx, field in enumerate(cls.list_display_links):43 if field not in cls.list_display:44 raise ImproperlyConfigured("'%s.list_display_links[%d]' "45 
"refers to '%s' which is not defined in 'list_display'."46 % (cls.__name__, idx, field))47 # list_filter48 if hasattr(cls, 'list_filter'):49 check_isseq(cls, 'list_filter', cls.list_filter)50 for idx, fpath in enumerate(cls.list_filter):51 try:52 get_fields_from_path(model, fpath)53 except (NotRelationField, FieldDoesNotExist), e:54 raise ImproperlyConfigured(55 "'%s.list_filter[%d]' refers to '%s' which does not refer to a Field." % (56 cls.__name__, idx, fpath57 )58 )59 # list_per_page = 10060 if hasattr(cls, 'list_per_page') and not isinstance(cls.list_per_page, int):61 raise ImproperlyConfigured("'%s.list_per_page' should be a integer."62 % cls.__name__)63 # list_editable64 if hasattr(cls, 'list_editable') and cls.list_editable:65 check_isseq(cls, 'list_editable', cls.list_editable)66 for idx, field_name in enumerate(cls.list_editable):67 try:68 field = opts.get_field_by_name(field_name)[0]69 except models.FieldDoesNotExist:70 raise ImproperlyConfigured("'%s.list_editable[%d]' refers to a "71 "field, '%s', not defined on %s."72 % (cls.__name__, idx, field_name, model.__name__))73 if field_name not in cls.list_display:74 raise ImproperlyConfigured("'%s.list_editable[%d]' refers to "75 "'%s' which is not defined in 'list_display'."76 % (cls.__name__, idx, field_name))77 if field_name in cls.list_display_links:78 raise ImproperlyConfigured("'%s' cannot be in both '%s.list_editable'"79 " and '%s.list_display_links'"80 % (field_name, cls.__name__, cls.__name__))81 if not cls.list_display_links and cls.list_display[0] in cls.list_editable:82 raise ImproperlyConfigured("'%s.list_editable[%d]' refers to"83 " the first field in list_display, '%s', which can't be"84 " used unless list_display_links is set."85 % (cls.__name__, idx, cls.list_display[0]))86 if not field.editable:87 raise ImproperlyConfigured("'%s.list_editable[%d]' refers to a "88 "field, '%s', which isn't editable through the admin."89 % (cls.__name__, idx, field_name))90 # search_fields = ()91 if 
hasattr(cls, 'search_fields'):92 check_isseq(cls, 'search_fields', cls.search_fields)93 # date_hierarchy = None94 if cls.date_hierarchy:95 f = get_field(cls, model, opts, 'date_hierarchy', cls.date_hierarchy)96 if not isinstance(f, (models.DateField, models.DateTimeField)):97 raise ImproperlyConfigured("'%s.date_hierarchy is "98 "neither an instance of DateField nor DateTimeField."99 % cls.__name__)100 # ordering = None101 if cls.ordering:102 check_isseq(cls, 'ordering', cls.ordering)103 for idx, field in enumerate(cls.ordering):104 if field == '?' and len(cls.ordering) != 1:105 raise ImproperlyConfigured("'%s.ordering' has the random "106 "ordering marker '?', but contains other fields as "107 "well. Please either remove '?' or the other fields."108 % cls.__name__)109 if field == '?':110 continue111 if field.startswith('-'):112 field = field[1:]113 # Skip ordering in the format field1__field2 (FIXME: checking114 # this format would be nice, but it's a little fiddly).115 if '__' in field:116 continue117 get_field(cls, model, opts, 'ordering[%d]' % idx, field)118 if hasattr(cls, "readonly_fields"):119 check_readonly_fields(cls, model, opts)120 # list_select_related = False121 # save_as = False122 # save_on_top = False123 for attr in ('list_select_related', 'save_as', 'save_on_top'):124 if not isinstance(getattr(cls, attr), bool):125 raise ImproperlyConfigured("'%s.%s' should be a boolean."126 % (cls.__name__, attr))127 # inlines = []128 if hasattr(cls, 'inlines'):129 check_isseq(cls, 'inlines', cls.inlines)130 for idx, inline in enumerate(cls.inlines):131 if not issubclass(inline, BaseModelAdmin):132 raise ImproperlyConfigured("'%s.inlines[%d]' does not inherit "133 "from BaseModelAdmin." % (cls.__name__, idx))134 if not inline.model:135 raise ImproperlyConfigured("'model' is a required attribute "136 "of '%s.inlines[%d]'." 
% (cls.__name__, idx))137 if not issubclass(inline.model, models.Model):138 raise ImproperlyConfigured("'%s.inlines[%d].model' does not "139 "inherit from models.Model." % (cls.__name__, idx))140 validate_base(inline, inline.model)141 validate_inline(inline, cls, model)142def validate_inline(cls, parent, parent_model):143 # model is already verified to exist and be a Model144 if cls.fk_name: # default value is None145 f = get_field(cls, cls.model, cls.model._meta, 'fk_name', cls.fk_name)146 if not isinstance(f, models.ForeignKey):147 raise ImproperlyConfigured("'%s.fk_name is not an instance of "148 "models.ForeignKey." % cls.__name__)149 fk = _get_foreign_key(parent_model, cls.model, fk_name=cls.fk_name, can_fail=True)150 # extra = 3151 if not isinstance(cls.extra, int):152 raise ImproperlyConfigured("'%s.extra' should be a integer."153 % cls.__name__)154 # max_num = None155 max_num = getattr(cls, 'max_num', None)156 if max_num is not None and not isinstance(max_num, int):157 raise ImproperlyConfigured("'%s.max_num' should be an integer or None (default)."158 % cls.__name__)159 # formset160 if hasattr(cls, 'formset') and not issubclass(cls.formset, BaseModelFormSet):161 raise ImproperlyConfigured("'%s.formset' does not inherit from "162 "BaseModelFormSet." % cls.__name__)163 # exclude164 if hasattr(cls, 'exclude') and cls.exclude:165 if fk and fk.name in cls.exclude:166 raise ImproperlyConfigured("%s cannot exclude the field "167 "'%s' - this is the foreign key to the parent model "168 "%s." 
% (cls.__name__, fk.name, parent_model.__name__))169 if hasattr(cls, "readonly_fields"):170 check_readonly_fields(cls, cls.model, cls.model._meta)171def validate_base(cls, model):172 opts = model._meta173 # raw_id_fields174 if hasattr(cls, 'raw_id_fields'):175 check_isseq(cls, 'raw_id_fields', cls.raw_id_fields)176 for idx, field in enumerate(cls.raw_id_fields):177 f = get_field(cls, model, opts, 'raw_id_fields', field)178 if not isinstance(f, (models.ForeignKey, models.ManyToManyField)):179 raise ImproperlyConfigured("'%s.raw_id_fields[%d]', '%s' must "180 "be either a ForeignKey or ManyToManyField."181 % (cls.__name__, idx, field))182 # fields183 if cls.fields: # default value is None184 check_isseq(cls, 'fields', cls.fields)185 for field in cls.fields:186 if field in cls.readonly_fields:187 # Stuff can be put in fields that isn't actually a model field188 # if it's in readonly_fields, readonly_fields will handle the189 # validation of such things.190 continue191 check_formfield(cls, model, opts, 'fields', field)192 try:193 f = opts.get_field(field)194 except models.FieldDoesNotExist:195 # If we can't find a field on the model that matches,196 # it could be an extra field on the form.197 continue198 if isinstance(f, models.ManyToManyField) and not f.rel.through._meta.auto_created:199 raise ImproperlyConfigured("'%s.fields' can't include the ManyToManyField "200 "field '%s' because '%s' manually specifies "201 "a 'through' model." % (cls.__name__, field, field))202 if cls.fieldsets:203 raise ImproperlyConfigured('Both fieldsets and fields are specified in %s.' 
% cls.__name__)204 if len(cls.fields) > len(set(cls.fields)):205 raise ImproperlyConfigured('There are duplicate field(s) in %s.fields' % cls.__name__)206 # fieldsets207 if cls.fieldsets: # default value is None208 check_isseq(cls, 'fieldsets', cls.fieldsets)209 for idx, fieldset in enumerate(cls.fieldsets):210 check_isseq(cls, 'fieldsets[%d]' % idx, fieldset)211 if len(fieldset) != 2:212 raise ImproperlyConfigured("'%s.fieldsets[%d]' does not "213 "have exactly two elements." % (cls.__name__, idx))214 check_isdict(cls, 'fieldsets[%d][1]' % idx, fieldset[1])215 if 'fields' not in fieldset[1]:216 raise ImproperlyConfigured("'fields' key is required in "217 "%s.fieldsets[%d][1] field options dict."218 % (cls.__name__, idx))219 for fields in fieldset[1]['fields']:220 # The entry in fields might be a tuple. If it is a standalone221 # field, make it into a tuple to make processing easier.222 if type(fields) != tuple:223 fields = (fields,)224 for field in fields:225 if field in cls.readonly_fields:226 # Stuff can be put in fields that isn't actually a227 # model field if it's in readonly_fields,228 # readonly_fields will handle the validation of such229 # things.230 continue231 check_formfield(cls, model, opts, "fieldsets[%d][1]['fields']" % idx, field)232 try:233 f = opts.get_field(field)234 if isinstance(f, models.ManyToManyField) and not f.rel.through._meta.auto_created:235 raise ImproperlyConfigured("'%s.fieldsets[%d][1]['fields']' "236 "can't include the ManyToManyField field '%s' because "237 "'%s' manually specifies a 'through' model." 
% (238 cls.__name__, idx, field, field))239 except models.FieldDoesNotExist:240 # If we can't find a field on the model that matches,241 # it could be an extra field on the form.242 pass243 flattened_fieldsets = flatten_fieldsets(cls.fieldsets)244 if len(flattened_fieldsets) > len(set(flattened_fieldsets)):245 raise ImproperlyConfigured('There are duplicate field(s) in %s.fieldsets' % cls.__name__)246 # exclude247 if cls.exclude: # default value is None248 check_isseq(cls, 'exclude', cls.exclude)249 for field in cls.exclude:250 check_formfield(cls, model, opts, 'exclude', field)251 try:252 f = opts.get_field(field)253 except models.FieldDoesNotExist:254 # If we can't find a field on the model that matches,255 # it could be an extra field on the form.256 continue257 if len(cls.exclude) > len(set(cls.exclude)):258 raise ImproperlyConfigured('There are duplicate field(s) in %s.exclude' % cls.__name__)259 # form260 if hasattr(cls, 'form') and not issubclass(cls.form, BaseModelForm):261 raise ImproperlyConfigured("%s.form does not inherit from "262 "BaseModelForm." % cls.__name__)263 # filter_vertical264 if hasattr(cls, 'filter_vertical'):265 check_isseq(cls, 'filter_vertical', cls.filter_vertical)266 for idx, field in enumerate(cls.filter_vertical):267 f = get_field(cls, model, opts, 'filter_vertical', field)268 if not isinstance(f, models.ManyToManyField):269 raise ImproperlyConfigured("'%s.filter_vertical[%d]' must be "270 "a ManyToManyField." % (cls.__name__, idx))271 # filter_horizontal272 if hasattr(cls, 'filter_horizontal'):273 check_isseq(cls, 'filter_horizontal', cls.filter_horizontal)274 for idx, field in enumerate(cls.filter_horizontal):275 f = get_field(cls, model, opts, 'filter_horizontal', field)276 if not isinstance(f, models.ManyToManyField):277 raise ImproperlyConfigured("'%s.filter_horizontal[%d]' must be "278 "a ManyToManyField." 
% (cls.__name__, idx))279 # radio_fields280 if hasattr(cls, 'radio_fields'):281 check_isdict(cls, 'radio_fields', cls.radio_fields)282 for field, val in cls.radio_fields.items():283 f = get_field(cls, model, opts, 'radio_fields', field)284 if not (isinstance(f, models.ForeignKey) or f.choices):285 raise ImproperlyConfigured("'%s.radio_fields['%s']' "286 "is neither an instance of ForeignKey nor does "287 "have choices set." % (cls.__name__, field))288 if not val in (HORIZONTAL, VERTICAL):289 raise ImproperlyConfigured("'%s.radio_fields['%s']' "290 "is neither admin.HORIZONTAL nor admin.VERTICAL."291 % (cls.__name__, field))292 # prepopulated_fields293 if hasattr(cls, 'prepopulated_fields'):294 check_isdict(cls, 'prepopulated_fields', cls.prepopulated_fields)295 for field, val in cls.prepopulated_fields.items():296 f = get_field(cls, model, opts, 'prepopulated_fields', field)297 if isinstance(f, (models.DateTimeField, models.ForeignKey,298 models.ManyToManyField)):299 raise ImproperlyConfigured("'%s.prepopulated_fields['%s']' "300 "is either a DateTimeField, ForeignKey or "301 "ManyToManyField. This isn't allowed."302 % (cls.__name__, field))303 check_isseq(cls, "prepopulated_fields['%s']" % field, val)304 for idx, f in enumerate(val):305 get_field(cls, model, opts, "prepopulated_fields['%s'][%d]" % (field, idx), f)306def check_isseq(cls, label, obj):307 if not isinstance(obj, (list, tuple)):308 raise ImproperlyConfigured("'%s.%s' must be a list or tuple." % (cls.__name__, label))309def check_isdict(cls, label, obj):310 if not isinstance(obj, dict):311 raise ImproperlyConfigured("'%s.%s' must be a dictionary." 
% (cls.__name__, label))312def get_field(cls, model, opts, label, field):313 try:314 return opts.get_field(field)315 except models.FieldDoesNotExist:316 raise ImproperlyConfigured("'%s.%s' refers to field '%s' that is missing from model '%s'."317 % (cls.__name__, label, field, model.__name__))318def check_formfield(cls, model, opts, label, field):319 if getattr(cls.form, 'base_fields', None):320 try:321 cls.form.base_fields[field]322 except KeyError:323 raise ImproperlyConfigured("'%s.%s' refers to field '%s' that "324 "is missing from the form." % (cls.__name__, label, field))325 else:326 fields = fields_for_model(model)327 try:328 fields[field]329 except KeyError:330 raise ImproperlyConfigured("'%s.%s' refers to field '%s' that "331 "is missing from the form." % (cls.__name__, label, field))332def fetch_attr(cls, model, opts, label, field):333 try:334 return opts.get_field(field)335 except models.FieldDoesNotExist:336 pass337 try:338 return getattr(model, field)339 except AttributeError:340 raise ImproperlyConfigured("'%s.%s' refers to '%s' that is neither a field, method or property of model '%s'."341 % (cls.__name__, label, field, model.__name__))342def check_readonly_fields(cls, model, opts):343 check_isseq(cls, "readonly_fields", cls.readonly_fields)344 for idx, field in enumerate(cls.readonly_fields):345 if not callable(field):346 if not hasattr(cls, field):347 if not hasattr(model, field):348 try:349 opts.get_field(field)350 except models.FieldDoesNotExist:351 raise ImproperlyConfigured("%s.readonly_fields[%d], %r is not a callable or an attribute of %r or found in the model %r."...
base_thing.py
Source:base_thing.py
import logging

logger = logging.getLogger(__name__)


class BaseThing(object):
    """ A base class for AWS-IOT things

    This class holds state info for a device; the variables in state info control the device's behavior.
    The expected steps to use this class are:
      * instantiate the class
      * update the shadow state: this invokes any actions the device should perform on a state change
      * read the reported state: this adds conditions (temperature, battery voltage, etc) to the reported_state

    The inherited class should:
      * Implement a property to return an ID. The ID will be used to get the AWS-IOT shadow
      * Provide a function to return a timestamp. Getting the current date/time can be device specific
      * Register additional operations/functions by adding them to the _operations and _test_operations dictionaries
    """

    def __init__(self):
        """
        This method can be overridden by a child class to add more device specific operations and hardware
        initialization. super().__init__ should be called early in the child class to set up the data structures.
        """
        # the following dictionaries are functions that are called when a desired state variable changes.
        # functions can be added by the child class __init__ function for device specific operations
        self._operations = {'test': self._dispatch_test, 'test_param': self._dispatch_test}
        self._test_operations = {'none': self._test_none}
        # the following dictionary identifies functions to measure conditions
        # functions can be added by the child class __init__ function for device specific operations
        # The following example reports free memory if it changes by 2K or every hour, which ever comes first
        # self._conditions['freeMemory'] = {'get': self.get_mem_free, 'threshold' : 2048, 'interval': 3600}
        self._conditions = {}
        self._shadow_state = {}  # holds the shadow state obtained from AWS-IOT; read (not modified) by the controller
        self._reported_state = {}  # holds the state to be posted to the shadow; written by the controller
        # _current_state should reflect the device's REAL state.
        #   * _current_state parameters start with defaults, which get over-written by the values read from
        #     persistent store
        #   * not all shadow_state parameters need to be persisted
        #   * sleep is persisted because it is used even if unable to get the desired values from AWS
        #   * the sleep default of 0 allows recovery if the network is not reachable when the device is initialized
        #   * test and test_param are persisted so they can be compared to the new desired state
        self._current_state = {'params': {'sleep': 0, 'test': 'none', 'test_param': 0}, 'history': []}
        self._restored_state = self._restore_state()
        if len(self._restored_state) > 0:
            # only the parameters are copied; the restored history stays in _restored_state
            for key in self._restored_state['params']:
                self._current_state['params'][key] = self._restored_state['params'][key]
        self._has_history = 'history' in self._restored_state and len(self._restored_state['history']) > 0
        self._timestamp = None

    # @property and prop.setter decorators were not used because I couldn't get the call to super().prop to work
    # in the derived class. Hence for reported & shadow state, the prop = property(get, set) mechanism was used.

    def _reported_state_get(self):
        """
        Adds conditions to the reported state and returns the reported state dictionary.

        Note: Desired value changes are added to _reported_state in the shadow_state setter.
        This method can be overridden by a child class to report additional device specific state;
        super()._reported_state_get should be called by the child class to return the base_class reported state.
        Since getting conditions may read hardware, e.g. I2C, then call as few times as possible to save power.
        """
        for condition in self._conditions:
            settings = self._conditions[condition]
            if 'get' in settings:
                logger.debug("Checking condition: %s", condition)
                # if condition already in reported state, then check threshold crossing & update interval
                if 'state' in self._shadow_state and 'reported' in self._shadow_state['state']\
                        and condition in self._shadow_state['state']['reported']:
                    if 'threshold' in settings:
                        current_value = settings['get']()
                        delta = self._shadow_state['state']['reported'][condition] - current_value
                        if abs(delta) > settings['threshold']:
                            self._reported_state[condition] = current_value
                            logger.debug("Updating %s due to threshold %d; delta: %d", condition,
                                         settings['threshold'], delta)
                    # the interval is only checked when the threshold did not already trigger a report
                    if condition not in self._reported_state and \
                            'interval' in settings and \
                            'metadata' in self._shadow_state:
                        if self._timestamp is not None:
                            previous_report_timestamp = \
                                self._shadow_state['metadata']['reported'][condition]['timestamp']
                            delta_time = self._timestamp - previous_report_timestamp
                            if delta_time > settings['interval']:
                                logger.debug("Updating %s due to interval %s; delta_time: %s", condition,
                                             settings['interval'], delta_time)
                                self._reported_state[condition] = settings['get']()
                        else:
                            logger.warning("no cls._timestamp when evaluating condition interval")
                else:
                    # never reported before: always report the current value
                    self._reported_state[condition] = settings['get']()
            else:
                logger.warning("no get function for condition: %s", condition)
        return self._reported_state

    reported_state = property(_reported_state_get)

    def _shadow_state_get(self):
        """
        This method is provided for completeness, and is not used in normal operation
        """
        return self._shadow_state

    def _shadow_state_set(self, shadow_state):
        """ Compares shadow state received from AWS-IOT to the current state. If a variable (current vs desired)
        doesn't match, a function from the _operations dictionary will be called. After the operation is complete,
        the updated current state and command history is persisted. The reported state is also updated.

        This method can be overridden by a child class to perform additional state checks before calling
        super()._shadow_state_set

        shadow_state must contain shadow_state['state']['desired']; 'metadata' and 'reported' are optional.
        """
        self._shadow_state = shadow_state
        desired = shadow_state['state']['desired']
        history_updated = False
        # the sleep parameter does not trigger an operation; its updated so it will be persisted
        if 'sleep' in desired:
            self._current_state['params']['sleep'] = desired['sleep']
        # parameters that trigger an operation, such as test, are updated in the function that processes them.
        if self._has_history and self._restored_state['history'][0]['done'] != 1:
            # The operation didn't finish, so update the status to be reflected in an updated report.
            # _current_state['history'] starts out empty (only 'params' are restored in __init__), so the
            # failed entry is inserted as a copy of the restored one rather than indexed in place, which
            # raised IndexError. The restored entry itself is left untouched and is still appended below.
            previous = self._restored_state['history'][0]
            failed = dict(previous)
            failed['done'] = 1
            failed['status'] = "Error: state change: {}: {} failed before completion"\
                .format(previous['op'], previous['value'])
            self._current_state['history'].insert(0, failed)
            history_updated = True
        else:
            for key in self._operations:
                desired_unequal_current = False
                if key in desired:
                    desired_unequal_current = desired[key] != self._current_state['params'][key]
                    # if the persisted state is unknown, the operation is not performed
                    if self._current_state['params'][key] == "unknown":
                        desired_unequal_current = False
                # check the metadata timestamp to prevent an operation being performed twice
                timestamp_changed = True
                desired_timestamp = 0
                if self._has_history and key == self._restored_state['history'][0]['op']:
                    if 'metadata' in shadow_state and key in shadow_state['metadata']['desired']:
                        desired_timestamp = shadow_state['metadata']['desired'][key]['timestamp']
                        if desired_timestamp == self._restored_state['history'][0]['timestamp']:
                            timestamp_changed = False
                if desired_unequal_current and timestamp_changed:
                    # the entry is recorded with done == 0 before the operation runs
                    self._current_state['history'].insert(0, {'done': 0,
                                                              'op': key,
                                                              'value': desired[key],
                                                              'timestamp': desired_timestamp})
                    logger.debug("Performing operation: %s", key)
                    self._current_state['history'][0]['status'] = self._operations[key]()
                    logger.info("Operation '%s' status: %s", key, self._current_state['history'][0]['status'])
                    self._current_state['history'][0]['done'] = 1
                    history_updated = True
                    # dispatch only one parameter per update
                    break
        # persist parameter changes, if any
        state_change = False
        if len(self._restored_state) == 0 or history_updated:
            state_change = True
        else:
            for key in self._current_state['params']:
                if key not in self._restored_state['params'] or \
                        self._current_state['params'][key] != self._restored_state['params'][key]:
                    state_change = True
                    break
        if state_change:
            # maintain a history of 2 operations by appending the restored (previous) operation
            if self._has_history:
                self._current_state['history'].append(self._restored_state['history'][0])
            self._persist_state()
        # generate reported state
        if history_updated:
            self._reported_state['status'] = self._current_state['history'][0]['status']
        for key, value in desired.items():
            # Note: On start-up, there is no reported dictionary
            # This routine only reports changed values in order to not have the shadow version keep incrementing
            if key in self._current_state['params']:
                # persisted keys report current_state
                if 'reported' not in shadow_state['state'] or \
                        key not in shadow_state['state']['reported'] or \
                        shadow_state['state']['reported'][key] != self._current_state['params'][key]:
                    self._reported_state[key] = self._current_state['params'][key]
            else:
                # non-persisted keys report/echo shadow_state
                if 'reported' not in shadow_state['state'] or \
                        key not in shadow_state['state']['reported'] or \
                        shadow_state['state']['reported'][key] != value:
                    self._reported_state[key] = value

    shadow_state = property(_shadow_state_get, _shadow_state_set)

    def connect(self):
        """ This method can be overridden by a child class to invoke device specific networking code to connect to
        the network.

        The base implementation does nothing and returns the tuple (True, None).
        """
        return True, None

    def sleep(self, msg=None):
        """ This method can be overridden by a child class to invoke device specific power saving modes.

        Logs msg as a warning when given, then sleeps for the number of seconds in the current 'sleep' parameter.
        """
        # imported here so the module can be loaded where utime is not available
        from utime import sleep
        if msg is not None:
            logger.warning("%s", msg)
        logger.info(" ... Going to sleep for %s seconds.", self._current_state['params']['sleep'])
        sleep(self._current_state['params']['sleep'])

    def get_aws_iot_cfg(self):
        """ This method can be overridden by a child class to retrieve from a device specific persistent store.

        The base implementation returns the parsed JSON content of "aws_iot_cfg.txt", or None if it can't be opened.
        """
        return self._get_cfg_info("aws_iot_cfg.txt")

    def get_aws_credentials(self):
        """ This method SHOULD be overridden by a child class to retrieve from a SECURE persistent store.

        The base implementation returns the parsed JSON content of "aws_credentials.txt", or None if it can't be
        opened.
        """
        return self._get_cfg_info("aws_credentials.txt")

    def get_private_key(self):
        """ This method SHOULD be overridden by a child class to retrieve from a SECURE persistent store.

        The base implementation returns the text content of "private.key", or None if it can't be opened.
        """
        return self._get_cfg_info_txt("private.key")

    def get_certificate(self):
        """ This method SHOULD be overridden by a child class to retrieve from a SECURE persistent store.

        The base implementation returns the text content of "certificate.pem", or None if it can't be opened.
        """
        return self._get_cfg_info_txt("certificate.pem")

    def _restore_state(self):
        """ This method must be overridden by a child class to read from device specific persistent storage

        Returns a dictionary; __init__ and the shadow_state setter read its 'params' and (optional) 'history'
        entries. The base implementation logs an error and returns an empty dictionary.
        """
        logger.error("_restore_state should be overridden by the child.")
        return {}

    def _persist_state(self):
        """ This method must be overridden by a child class to write to device specific persistent storage

        The base implementation only logs an error.
        """
        logger.error("_persist_state should be overridden by the child.")

    def _dispatch_test(self):
        """ calls one of the test functions based on the string in the shadow_state 'test' variable

        Inputs: shadow_state (read from self._shadow_state)
        Returns: status string received from the test
        """
        desired = self._shadow_state['state']['desired']
        # test_param falls back to 0 whenever the desired state does not supply one
        self._current_state['params']['test_param'] = 0
        if 'test_param' in desired:
            self._current_state['params']['test_param'] = desired['test_param']
        self._current_state['params']['test'] = desired['test']
        if self._current_state['params']['test'] in self._test_operations:
            status = self._test_operations[self._current_state['params']['test']]()
        else:
            status = "Unrecognized test: " + self._current_state['params']['test']
        return status

    def _test_none(self):
        """ a dummy test, but it can be used to verify tests are being dispatched
        """
        return "pass: test 'none'"

    def _get_cfg_info(self, filename):
        """ Loads and returns the JSON content of filename; logs an error and returns None on OSError.
        """
        # imported here so the module can be loaded where ujson is not available
        import ujson
        try:
            with open(filename) as f:
                cfg_info = ujson.load(f)
            return cfg_info
        except OSError as e:
            e_str = str(e)
            logger.error("In get_cfg_info from filename: %s Exception: %s", filename, e_str)
            return None

    def _get_cfg_info_txt(self, filename):
        """ Reads and returns the text content of filename; logs an error and returns None on OSError.
        """
        try:
            with open(filename) as f:
                cfg_info = f.read()
            return cfg_info
        except OSError as e:
            e_str = str(e)
            logger.error("In get_cfg_info from filename: %s Exception: %s", filename, e_str)
            return None
fixtures.py
Source:fixtures.py
1# testing/fixtures.py2# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors3# <see AUTHORS file>4#5# This module is part of SQLAlchemy and is released under6# the MIT License: http://www.opensource.org/licenses/mit-license.php7from . import config8from . import assertions, schema9from .util import adict10from .. import util11from .engines import drop_all_tables12from .entities import BasicEntity, ComparableEntity13import sys14import sqlalchemy as sa15from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta16# whether or not we use unittest changes things dramatically,17# as far as how py.test collection works.18class TestBase(object):19 # A sequence of database names to always run, regardless of the20 # constraints below.21 __whitelist__ = ()22 # A sequence of requirement names matching testing.requires decorators23 __requires__ = ()24 # A sequence of dialect names to exclude from the test class.25 __unsupported_on__ = ()26 # If present, test class is only runnable for the *single* specified27 # dialect. If you need multiple, use __unsupported_on__ and invert.28 __only_on__ = None29 # A sequence of no-arg callables. 
If any are True, the entire testcase is30 # skipped.31 __skip_if__ = None32 def assert_(self, val, msg=None):33 assert val, msg34 # apparently a handful of tests are doing this....OK35 def setup(self):36 if hasattr(self, "setUp"):37 self.setUp()38 def teardown(self):39 if hasattr(self, "tearDown"):40 self.tearDown()41class TablesTest(TestBase):42 # 'once', None43 run_setup_bind = 'once'44 # 'once', 'each', None45 run_define_tables = 'once'46 # 'once', 'each', None47 run_create_tables = 'once'48 # 'once', 'each', None49 run_inserts = 'each'50 # 'each', None51 run_deletes = 'each'52 # 'once', None53 run_dispose_bind = None54 bind = None55 metadata = None56 tables = None57 other = None58 @classmethod59 def setup_class(cls):60 cls._init_class()61 cls._setup_once_tables()62 cls._setup_once_inserts()63 @classmethod64 def _init_class(cls):65 if cls.run_define_tables == 'each':66 if cls.run_create_tables == 'once':67 cls.run_create_tables = 'each'68 assert cls.run_inserts in ('each', None)69 cls.other = adict()70 cls.tables = adict()71 cls.bind = cls.setup_bind()72 cls.metadata = sa.MetaData()73 cls.metadata.bind = cls.bind74 @classmethod75 def _setup_once_inserts(cls):76 if cls.run_inserts == 'once':77 cls._load_fixtures()78 cls.insert_data()79 @classmethod80 def _setup_once_tables(cls):81 if cls.run_define_tables == 'once':82 cls.define_tables(cls.metadata)83 if cls.run_create_tables == 'once':84 cls.metadata.create_all(cls.bind)85 cls.tables.update(cls.metadata.tables)86 def _setup_each_tables(self):87 if self.run_define_tables == 'each':88 self.tables.clear()89 if self.run_create_tables == 'each':90 drop_all_tables(self.metadata, self.bind)91 self.metadata.clear()92 self.define_tables(self.metadata)93 if self.run_create_tables == 'each':94 self.metadata.create_all(self.bind)95 self.tables.update(self.metadata.tables)96 elif self.run_create_tables == 'each':97 drop_all_tables(self.metadata, self.bind)98 self.metadata.create_all(self.bind)99 def 
_setup_each_inserts(self):100 if self.run_inserts == 'each':101 self._load_fixtures()102 self.insert_data()103 def _teardown_each_tables(self):104 # no need to run deletes if tables are recreated on setup105 if self.run_define_tables != 'each' and self.run_deletes == 'each':106 with self.bind.connect() as conn:107 for table in reversed(self.metadata.sorted_tables):108 try:109 conn.execute(table.delete())110 except sa.exc.DBAPIError as ex:111 util.print_(112 ("Error emptying table %s: %r" % (table, ex)),113 file=sys.stderr)114 def setup(self):115 self._setup_each_tables()116 self._setup_each_inserts()117 def teardown(self):118 self._teardown_each_tables()119 @classmethod120 def _teardown_once_metadata_bind(cls):121 if cls.run_create_tables:122 drop_all_tables(cls.metadata, cls.bind)123 if cls.run_dispose_bind == 'once':124 cls.dispose_bind(cls.bind)125 cls.metadata.bind = None126 if cls.run_setup_bind is not None:127 cls.bind = None128 @classmethod129 def teardown_class(cls):130 cls._teardown_once_metadata_bind()131 @classmethod132 def setup_bind(cls):133 return config.db134 @classmethod135 def dispose_bind(cls, bind):136 if hasattr(bind, 'dispose'):137 bind.dispose()138 elif hasattr(bind, 'close'):139 bind.close()140 @classmethod141 def define_tables(cls, metadata):142 pass143 @classmethod144 def fixtures(cls):145 return {}146 @classmethod147 def insert_data(cls):148 pass149 def sql_count_(self, count, fn):150 self.assert_sql_count(self.bind, fn, count)151 def sql_eq_(self, callable_, statements):152 self.assert_sql(self.bind, callable_, statements)153 @classmethod154 def _load_fixtures(cls):155 """Insert rows as represented by the fixtures() method."""156 headers, rows = {}, {}157 for table, data in cls.fixtures().items():158 if len(data) < 2:159 continue160 if isinstance(table, util.string_types):161 table = cls.tables[table]162 headers[table] = data[0]163 rows[table] = data[1:]164 for table in cls.metadata.sorted_tables:165 if table not in headers:166 
continue167 cls.bind.execute(168 table.insert(),169 [dict(zip(headers[table], column_values))170 for column_values in rows[table]])171from sqlalchemy import event172class RemovesEvents(object):173 @util.memoized_property174 def _event_fns(self):175 return set()176 def event_listen(self, target, name, fn, **kw):177 self._event_fns.add((target, name, fn))178 event.listen(target, name, fn, **kw)179 def teardown(self):180 for key in self._event_fns:181 event.remove(*key)182 super_ = super(RemovesEvents, self)183 if hasattr(super_, "teardown"):184 super_.teardown()185class _ORMTest(object):186 @classmethod187 def teardown_class(cls):188 sa.orm.session.Session.close_all()189 sa.orm.clear_mappers()190class ORMTest(_ORMTest, TestBase):191 pass192class MappedTest(_ORMTest, TablesTest, assertions.AssertsExecutionResults):193 # 'once', 'each', None194 run_setup_classes = 'once'195 # 'once', 'each', None196 run_setup_mappers = 'each'197 classes = None198 @classmethod199 def setup_class(cls):200 cls._init_class()201 if cls.classes is None:202 cls.classes = adict()203 cls._setup_once_tables()204 cls._setup_once_classes()205 cls._setup_once_mappers()206 cls._setup_once_inserts()207 @classmethod208 def teardown_class(cls):209 cls._teardown_once_class()210 cls._teardown_once_metadata_bind()211 def setup(self):212 self._setup_each_tables()213 self._setup_each_classes()214 self._setup_each_mappers()215 self._setup_each_inserts()216 def teardown(self):217 sa.orm.session.Session.close_all()218 self._teardown_each_mappers()219 self._teardown_each_classes()220 self._teardown_each_tables()221 @classmethod222 def _teardown_once_class(cls):223 cls.classes.clear()224 _ORMTest.teardown_class()225 @classmethod226 def _setup_once_classes(cls):227 if cls.run_setup_classes == 'once':228 cls._with_register_classes(cls.setup_classes)229 @classmethod230 def _setup_once_mappers(cls):231 if cls.run_setup_mappers == 'once':232 cls._with_register_classes(cls.setup_mappers)233 def 
_setup_each_mappers(self):234 if self.run_setup_mappers == 'each':235 self._with_register_classes(self.setup_mappers)236 def _setup_each_classes(self):237 if self.run_setup_classes == 'each':238 self._with_register_classes(self.setup_classes)239 @classmethod240 def _with_register_classes(cls, fn):241 """Run a setup method, framing the operation with a Base class242 that will catch new subclasses to be established within243 the "classes" registry.244 """245 cls_registry = cls.classes246 class FindFixture(type):247 def __init__(cls, classname, bases, dict_):248 cls_registry[classname] = cls249 return type.__init__(cls, classname, bases, dict_)250 class _Base(util.with_metaclass(FindFixture, object)):251 pass252 class Basic(BasicEntity, _Base):253 pass254 class Comparable(ComparableEntity, _Base):255 pass256 cls.Basic = Basic257 cls.Comparable = Comparable258 fn()259 def _teardown_each_mappers(self):260 # some tests create mappers in the test bodies261 # and will define setup_mappers as None -262 # clear mappers in any case263 if self.run_setup_mappers != 'once':264 sa.orm.clear_mappers()265 def _teardown_each_classes(self):266 if self.run_setup_classes != 'once':267 self.classes.clear()268 @classmethod269 def setup_classes(cls):270 pass271 @classmethod272 def setup_mappers(cls):273 pass274class DeclarativeMappedTest(MappedTest):275 run_setup_classes = 'once'276 run_setup_mappers = 'once'277 @classmethod278 def _setup_once_tables(cls):279 pass280 @classmethod281 def _with_register_classes(cls, fn):282 cls_registry = cls.classes283 class FindFixtureDeclarative(DeclarativeMeta):284 def __init__(cls, classname, bases, dict_):285 cls_registry[classname] = cls286 return DeclarativeMeta.__init__(287 cls, classname, bases, dict_)288 class DeclarativeBasic(object):289 __table_cls__ = schema.Table290 _DeclBase = declarative_base(metadata=cls.metadata,291 metaclass=FindFixtureDeclarative,292 cls=DeclarativeBasic)293 cls.DeclarativeBasic = _DeclBase294 fn()295 if 
cls.metadata.tables and cls.run_create_tables:...
db.py
Source:db.py
1# coding: utf-82"""3æ°æ®åºæ¥å£4"""5from pymongo import MongoClient6from utils import *7class DB:8 """9 å°½å¯è½ä¸å¯¹ doc ä½å¤çï¼ç´æ¥è¿å query çç»æãç± manager ä½è¿ä¸æ¥å¤çã10 """11 db = MongoClient('127.0.0.1', 27017).zhihu_data12 @classmethod13 def find_user(cls):14 pass15 @classmethod16 def get_questions(cls, tid):17 return cls.db[q_col(tid)].find()18 @classmethod19 def find_latest_question(cls, tid):20 # TODO: so ask: å¦ä½å»ºç´¢å¼?21 cursor = cls.db[q_col(tid)].find().sort('time', -1)22 if cursor.count() > 0:23 return cursor[0]24 else:25 return None26 @classmethod27 def save_question(cls, tid, url, qid, time, asker, title):28 cls.db[q_col(tid)].insert({29 'topic': str(tid),30 'url': url,31 'qid': str(qid),32 'time': time,33 'asker': asker,34 'title': title,35 'follower': [],36 'active': True37 })38 @classmethod39 def add_question_follower(cls, tid, qid, new_followers):40 cls.db[q_col(tid)].update({'qid': str(qid)}, {41 '$push': {42 'follower': {43 '$each': list(new_followers)44 }45 }46 })47 @classmethod48 def get_question_follower(cls, tid, qid, limit=None):49 if limit is None:50 return cls.db[q_col(tid)].\51 find_one({'qid': str(qid)}, {'follower': 1, '_id':0})['follower']52 else:53 limit *= -154 return cls.db[q_col(tid)]. 
\55 find_one({'qid': str(qid)},56 {'follower': {'$slice': limit}, '_id':0})['follower']57 @classmethod58 def get_question_follower_num(cls, tid, qid):59 cursor = cls.db[q_col(tid)].aggregate([60 {'$match': {'qid': str(qid)}},61 {62 '$project': {63 'follower_count': {'$size': "$follower"}64 }65 }66 ])67 return list(cursor)[0]['follower_count']68 @classmethod69 def set_question_inactive(cls, tid, qid):70 cls.db[q_col(tid)].update_one(71 {'qid': str(qid)},72 {73 '$set': {74 'active': False75 }76 }77 )78 @classmethod79 def save_answer(cls, tid, aid, url, qid, time, answerer, upvoters=None,80 commenters=None, collectors=None):81 upvoters = [] if upvoters is None else upvoters82 commenters = [] if commenters is None else commenters83 collectors = [] if collectors is None else collectors84 cls.db[a_col(tid)].insert({85 'topic': str(tid),86 'aid': str(aid),87 'url': url,88 'qid': str(qid),89 'time': time,90 'answerer': answerer,91 'upvoters': upvoters,92 'commenters': commenters,93 'collectors': collectors94 })95 @classmethod96 def get_question(cls, tid, qid):97 return cls.db[q_col(tid)].find_one({'qid': str(qid)}) # None or dict98 @classmethod99 def get_all_questions(cls, *args):100 result = []101 if args:102 fields = {arg: 1 for arg in args}103 else:104 fields = {'_id': 0} # include all fields105 for collection_name in cls.db.collection_names():106 if is_q_col(collection_name):107 result.extend(108 list(cls.db[collection_name].find({}, fields))109 )110 return result111 @classmethod112 def get_question_attrs(cls, tid, qid, *args):113 fields = {arg: 1 for arg in args}114 return cls.db[q_col(tid)].find_one({'qid': str(qid)}, fields)115 @classmethod116 def remove_question(cls, tid, qid):117 cls.db[q_col(tid)].remove({'qid': str(qid)})118 @classmethod119 def answer_exists(cls, tid, aid):120 return cls.db[a_col(tid)].find({'aid': str(aid)}, {'_id': 1}) \121 .limit(1).count() > 0122 @classmethod123 def get_one_answer(cls, tid, aid):124 return cls.db[a_col(tid)].find_one({'aid': 
str(aid)})125 @classmethod126 def get_answer_affected_user_with_limit(cls, tid, aid, limit=5):127 return cls._get_answer_affected_user(128 tid, aid, ['commenters', 'upvoters', 'collectors'], limit=limit)129 @classmethod130 def get_upvoters(cls, tid, aid, limit=None):131 return cls._get_answer_affected_user(tid, aid, ['upvoters'], limit)132 @classmethod133 def get_commenters(cls, tid, aid, limit=None):134 return cls._get_answer_affected_user(tid, aid, ['commenters'], limit)135 @classmethod136 def get_collectors(cls, tid, aid, limit=None):137 return cls._get_answer_affected_user(tid, aid, ['collectors'], limit)138 @classmethod139 def _get_answer_affected_user(cls, tid, aid, fields, limit=None):140 if limit is None:141 fields = {field: 1 for field in fields}142 return cls.db[a_col(tid)].find_one({'aid': str(aid)}, fields)143 else:144 limit *= -1145 fields = {field: {'$slice': limit} for field in fields}146 return cls.db[a_col(tid)].find_one({'aid': str(aid)}, fields)147 @classmethod148 def add_upvoters(cls, tid, aid, new_upvoters):149 cls.db[a_col(tid)].update({'aid': str(aid)}, {150 '$push': {151 'upvoters': {152 '$each': list(new_upvoters)153 }154 }155 })156 @classmethod157 def add_commenters(cls, tid, aid, new_commenters):158 # pymongoä¸è¯å«deque,åªè½è½¬ä¸ºlist159 cls.db[a_col(tid)].update({'aid': str(aid)}, {160 '$push': {161 'commenters': {162 '$each': list(new_commenters)163 }164 }165 })166 @classmethod167 def add_collectors(cls, tid, aid, new_collectors):168 cls.db[a_col(tid)].update({'aid': str(aid)}, {169 '$push': {170 'collectors': {171 '$each': list(new_collectors)172 }173 }174 })175 @classmethod176 def remove_answer(cls, tid, aid):177 cls.db[a_col(tid)].remove({'aid': str(aid)})178 @classmethod179 def get_question_answerer(cls, tid, qid):180 return cls.db[a_col(tid)].find(181 {'qid': str(qid)},182 {'answerer': 1, '_id': 0}183 )184 @classmethod185 def get_question_answer_attrs(cls, tid, qid, *args):186 fields = {arg: 1 for arg in args}187 return 
cls.db[a_col(tid)].find({'qid': str(qid)}, fields)188 @classmethod189 def drop_all_collections(cls):190 for collection_name in cls.db.collection_names():191 if 'system' not in collection_name:192 cls.db[collection_name].drop()193 @classmethod194 def drop_qa_collections(cls):195 for collection_name in cls.db.collection_names():196 if 'system' not in collection_name and collection_name != 'user':197 cls.db[collection_name].drop()198 @classmethod199 def get_answer_affecter_num(cls, tid, aid):200 cursor = cls.db[a_col(tid)].aggregate([201 {'$match': {'aid': str(aid)}},202 {203 '$project': {204 'up_count': {'$size': "$upvoters"},205 'com_count': {'$size': "$commenters"},206 'col_count': {'$size': "$collectors"},207 }208 }209 ])...
COW.py
Source:COW.py
#
# This is a copy on write dictionary and set which abuses classes to try and be nice and fast.
#
# Copyright (C) 2006 Tim Ansell
#
# Please Note:
#  Be careful when using mutable types (ie Dict and Lists) - operations involving these are SLOW.
#  Assign a file to __warn__ to get warnings about slow operations.
#

import copy

# Values of these types are stored under their plain key; anything else is
# stored under ``key + MUTABLE`` and copied lazily on first writable access.
ImmutableTypes = (
    bool,
    complex,
    float,
    int,
    tuple,
    frozenset,
    str
)

MUTABLE = "__mutable__"


class COWMeta(type):
    """Common base metaclass; used to recognise nested COW containers."""
    pass


class COWDictMeta(COWMeta):
    """Metaclass that makes a *class* behave like a copy-on-write dict.

    Every "copy" is a new subclass, so unmodified keys are resolved through
    ordinary attribute inheritance and only modified keys live on the copy.
    """
    __warn__ = False
    __hasmutable__ = False
    # Sentinel stored in place of a deleted key.
    __marker__ = tuple()

    def __str__(cls):
        # FIXME: I have magic numbers!
        return "<COWDict Level: %i Current Keys: %i>" % (cls.__count__, len(cls.__dict__) - 3)
    __repr__ = __str__

    def cow(cls):
        """Return a new copy-on-write child of this container."""
        class C(cls):
            __count__ = cls.__count__ + 1
        return C
    copy = cow
    __call__ = cow

    def __setitem__(cls, key, value):
        """Store ``value``; non-immutable values go under the mutable key."""
        if value is not None and not isinstance(value, ImmutableTypes):
            if not isinstance(value, COWMeta):
                cls.__hasmutable__ = True
            key += MUTABLE
        setattr(cls, key, value)

    def __getmutable__(cls, key, readonly=False):
        """Fetch a mutable value, copying it into this level unless readonly."""
        nkey = key + MUTABLE
        try:
            # Already owned by this level: no copy needed.
            return cls.__dict__[nkey]
        except KeyError:
            pass

        # Inherited from a parent level (raises AttributeError if absent).
        value = getattr(cls, nkey)
        if readonly:
            return value

        if cls.__warn__ is not False and not isinstance(value, COWMeta):
            print("Warning: Doing a copy because %s is a mutable type." % key, file=cls.__warn__)
        try:
            value = value.copy()
        except AttributeError:
            value = copy.copy(value)
        setattr(cls, nkey, value)
        return value

    # Sentinel meaning "no default supplied" (distinct from __marker__).
    __getmarker__ = []

    def __getreadonly__(cls, key, default=__getmarker__):
        """\
        Get a value (even if mutable) which you promise not to change.
        """
        return cls.__getitem__(key, default, True)

    def __getitem__(cls, key, default=__getmarker__, readonly=False):
        """Look up ``key``; return ``default`` if given, else raise KeyError."""
        try:
            try:
                value = getattr(cls, key)
            except AttributeError:
                value = cls.__getmutable__(key, readonly)

            # This is for values which have been deleted
            if value is cls.__marker__:
                raise AttributeError("key %s does not exist." % key)

            return value
        except AttributeError as e:
            if default is not cls.__getmarker__:
                return default

            raise KeyError(str(e))

    def __delitem__(cls, key):
        """Mark ``key`` as deleted at this level by storing the marker."""
        cls.__setitem__(key, cls.__marker__)

    def __revertitem__(cls, key):
        """Drop this level's override of ``key`` so the parent's value shows."""
        if key not in cls.__dict__:
            key += MUTABLE
        delattr(cls, key)

    def __contains__(cls, key):
        return cls.has_key(key)

    def has_key(cls, key):
        """Return True if ``key`` exists and has not been deleted."""
        value = cls.__getreadonly__(key, cls.__marker__)
        if value is cls.__marker__:
            return False
        return True

    def iter(cls, type, readonly=False):
        """Yield keys, values or items depending on ``type``.

        ``type`` is one of "keys", "values" or "items".
        """
        for key in dir(cls):
            if key.startswith("__"):
                continue

            if key.endswith(MUTABLE):
                key = key[:-len(MUTABLE)]

            if type == "keys":
                yield key

            try:
                if readonly:
                    value = cls.__getreadonly__(key)
                else:
                    value = cls[key]
            except KeyError:
                # Deleted at this level.
                continue

            if type == "values":
                yield value
            if type == "items":
                yield (key, value)
        return

    def iterkeys(cls):
        return cls.iter("keys")

    def itervalues(cls, readonly=False):
        if cls.__warn__ is not False and cls.__hasmutable__ and readonly is False:
            print("Warning: If you arn't going to change any of the values call with True.", file=cls.__warn__)
        return cls.iter("values", readonly)

    def iteritems(cls, readonly=False):
        if cls.__warn__ is not False and cls.__hasmutable__ and readonly is False:
            print("Warning: If you arn't going to change any of the values call with True.", file=cls.__warn__)
        return cls.iter("items", readonly)


class COWSetMeta(COWDictMeta):
    """Copy-on-write set: members are stored keyed by ``repr(hash(value))``."""

    def __str__(cls):
        # FIXME: I have magic numbers!
        return "<COWSet Level: %i Current Keys: %i>" % (cls.__count__, len(cls.__dict__) - 3)
    __repr__ = __str__

    def cow(cls):
        class C(cls):
            __count__ = cls.__count__ + 1
        return C

    def add(cls, value):
        COWDictMeta.__setitem__(cls, repr(hash(value)), value)

    def remove(cls, value):
        COWDictMeta.__delitem__(cls, repr(hash(value)))

    def __in__(cls, value):
        """Return True if ``value`` is a (non-deleted) member of the set.

        Note: Python never calls ``__in__`` implicitly; it is kept as an
        explicit membership helper for existing callers.
        """
        # The previous ``... in COWDictMeta`` raised TypeError because a
        # plain metaclass object is not iterable; look the key up instead.
        return COWDictMeta.has_key(cls, repr(hash(value)))

    def iterkeys(cls):
        raise TypeError("sets don't have keys")

    def iteritems(cls):
        raise TypeError("sets don't have 'items'")


# These are the actual classes you use!
class COWDictBase(object, metaclass=COWDictMeta):
    __count__ = 0


class COWSetBase(object, metaclass=COWSetMeta):
    __count__ = 0


if __name__ == "__main__":
    import sys
    COWDictBase.__warn__ = sys.stderr
    a = COWDictBase()
    print("a", a)

    a['a'] = 'a'
    a['b'] = 'b'
    a['dict'] = {}

    b = a.copy()
    print("b", b)
    b['c'] = 'b'

    print()

    print("a", a)
    for x in a.iteritems():
        print(x)
    print("--")
    print("b", b)
    for x in b.iteritems():
        print(x)
    print()

    b['dict']['a'] = 'b'
    b['a'] = 'c'

    print("a", a)
    for x in a.iteritems():
        print(x)
    print("--")
    print("b", b)
    for x in b.iteritems():
        print(x)
    print()

    try:
        b['dict2']
    except KeyError:
        print("Okay!")

    a['set'] = COWSetBase()
    a['set'].add("o1")
    a['set'].add("o1")
    a['set'].add("o2")

    print("a", a)
    for x in a['set'].itervalues():
        print(x)
    print("--")
    print("b", b)
    for x in b['set'].itervalues():
        print(x)
    print()

    b['set'].add('o3')

    print("a", a)
    for x in a['set'].itervalues():
        print(x)
    print("--")
    print("b", b)
    for x in b['set'].itervalues():
        print(x)
    print()

    a['set2'] = set()
    a['set2'].add("o1")
    a['set2'].add("o1")
    a['set2'].add("o2")

    print("a", a)
    for x in a.iteritems():
        print(x)
    print("--")
    print("b", b)
    for x in b.iteritems(readonly=True):
        print(x)
    print()

    del b['b']
    try:
        print(b['b'])
    except KeyError:
        print("Yay! deleted key raises error")

    if 'b' in b:
        print("Boo!")
    else:
        print("Yay - has_key with delete works!")

    print("a", a)
    for x in a.iteritems():
        print(x)
    print("--")
    print("b", b)
    for x in b.iteritems(readonly=True):
        print(x)
    print()

    b.__revertitem__('b')

    print("a", a)
    for x in a.iteritems():
        print(x)
    print("--")
    print("b", b)
    for x in b.iteritems(readonly=True):
        print(x)
    print()

    b.__revertitem__('dict')
    print("a", a)
    for x in a.iteritems():
        print(x)
    print("--")
    print("b", b)
    for x in b.iteritems(readonly=True):
        print(x)
testcase.py
Source:testcase.py
import threading

import pytest
from tornado import ioloop, web

from dummyserver.server import (
    SocketServerThread,
    run_tornado_app,
    run_loop_in_thread,
    DEFAULT_CERTS,
    HAS_IPV6,
)
from dummyserver.handlers import TestingApp
from dummyserver.proxy import ProxyHandler


def consume_socket(sock, chunks=65536):
    """Read from ``sock`` until the blank line that ends an HTTP header block.

    :param sock: object exposing ``recv(bufsize)`` that returns bytes.
    :param chunks: maximum number of bytes requested per ``recv`` call.
    :returns: ``bytearray`` holding everything received.

    Reading also stops when the peer closes the connection (``recv`` returns
    ``b""``); otherwise the loop would spin forever on a closed socket.
    """
    consumed = bytearray()
    while True:
        b = sock.recv(chunks)
        if not b:
            # Peer closed the connection before sending the terminator.
            break
        consumed += b
        # Test the accumulated data, not just the last chunk, so a
        # terminator split across two recv() calls is still detected.
        if consumed.endswith(b"\r\n\r\n"):
            break
    return consumed


class SocketDummyServerTestCase(object):
    """
    A simple socket-based server is created for this class that is good for
    exactly one request.
    """

    scheme = "http"
    host = "localhost"

    @classmethod
    def _start_server(cls, socket_handler):
        """Start the socket server thread and record the port it listens on."""
        ready_event = threading.Event()
        cls.server_thread = SocketServerThread(
            socket_handler=socket_handler, ready_event=ready_event, host=cls.host
        )
        cls.server_thread.start()
        ready_event.wait(5)
        if not ready_event.is_set():
            raise Exception("most likely failed to start server")
        cls.port = cls.server_thread.port

    @classmethod
    def start_response_handler(cls, response, num=1, block_send=None):
        """Serve ``response`` to ``num`` consecutive connections.

        If ``block_send`` (an event) is given, each response is held back
        until the event is set; the event is cleared again afterwards.
        Returns an event that is set once the handler starts accepting.
        """
        ready_event = threading.Event()

        def socket_handler(listener):
            for _ in range(num):
                ready_event.set()

                sock = listener.accept()[0]
                consume_socket(sock)
                if block_send:
                    block_send.wait()
                    block_send.clear()
                sock.send(response)
                sock.close()

        cls._start_server(socket_handler)
        return ready_event

    @classmethod
    def start_basic_handler(cls, **kw):
        """Serve an empty ``200 OK`` response; ``kw`` is passed through."""
        return cls.start_response_handler(
            b"HTTP/1.1 200 OK\r\n" b"Content-Length: 0\r\n" b"\r\n", **kw
        )

    @classmethod
    def teardown_class(cls):
        if hasattr(cls, "server_thread"):
            cls.server_thread.join(0.1)

    def assert_header_received(
        self, received_headers, header_name, expected_value=None
    ):
        """Assert ``header_name`` is present (and optionally has a value).

        ``received_headers`` is an iterable of ``b"Name: value"`` lines.
        """
        header_name = header_name.encode("ascii")
        if expected_value is not None:
            expected_value = expected_value.encode("ascii")
        header_titles = []
        for header in received_headers:
            # Split on the first separator only: header values may
            # themselves contain ": ".
            key, value = header.split(b": ", 1)
            header_titles.append(key)
            if key == header_name and expected_value is not None:
                assert value == expected_value
        assert header_name in header_titles


class IPV4SocketDummyServerTestCase(SocketDummyServerTestCase):
    @classmethod
    def _start_server(cls, socket_handler):
        """Same as the parent, but forces the server thread onto IPv4."""
        ready_event = threading.Event()
        cls.server_thread = SocketServerThread(
            socket_handler=socket_handler, ready_event=ready_event, host=cls.host
        )
        cls.server_thread.USE_IPV6 = False
        cls.server_thread.start()
        ready_event.wait(5)
        if not ready_event.is_set():
            raise Exception("most likely failed to start server")
        cls.port = cls.server_thread.port


class HTTPDummyServerTestCase(object):
    """ A simple HTTP server that runs when your test class runs

    Have your test class inherit from this one, and then a simple server
    will start when your tests run, and automatically shut down when they
    complete. For examples of what test requests you can send to the server,
    see the TestingApp in dummyserver/handlers.py.
    """

    scheme = "http"
    host = "localhost"
    host_alt = "127.0.0.1"  # Some tests need two hosts
    certs = DEFAULT_CERTS

    @classmethod
    def _start_server(cls):
        cls.io_loop = ioloop.IOLoop.current()
        app = web.Application([(r".*", TestingApp)])
        cls.server, cls.port = run_tornado_app(
            app, cls.io_loop, cls.certs, cls.scheme, cls.host
        )
        cls.server_thread = run_loop_in_thread(cls.io_loop)

    @classmethod
    def _stop_server(cls):
        # Stop requests are scheduled onto the loop, then the loop thread
        # is joined.
        cls.io_loop.add_callback(cls.server.stop)
        cls.io_loop.add_callback(cls.io_loop.stop)
        cls.server_thread.join()

    @classmethod
    def setup_class(cls):
        cls._start_server()

    @classmethod
    def teardown_class(cls):
        cls._stop_server()


class HTTPSDummyServerTestCase(HTTPDummyServerTestCase):
    scheme = "https"
    host = "localhost"
    certs = DEFAULT_CERTS


class HTTPDummyProxyTestCase(object):

    http_host = "localhost"
    http_host_alt = "127.0.0.1"

    https_host = "localhost"
    https_host_alt = "127.0.0.1"
    https_certs = DEFAULT_CERTS

    proxy_host = "localhost"
    proxy_host_alt = "127.0.0.1"

    @classmethod
    def setup_class(cls):
        """Start HTTP, HTTPS and proxy servers on one shared IO loop."""
        cls.io_loop = ioloop.IOLoop.current()

        app = web.Application([(r".*", TestingApp)])
        cls.http_server, cls.http_port = run_tornado_app(
            app, cls.io_loop, None, "http", cls.http_host
        )

        app = web.Application([(r".*", TestingApp)])
        # Bind the HTTPS server to the HTTPS host; this previously passed
        # cls.http_host, silently ignoring an overridden https_host.
        cls.https_server, cls.https_port = run_tornado_app(
            app, cls.io_loop, cls.https_certs, "https", cls.https_host
        )

        app = web.Application([(r".*", ProxyHandler)])
        cls.proxy_server, cls.proxy_port = run_tornado_app(
            app, cls.io_loop, None, "http", cls.proxy_host
        )

        cls.server_thread = run_loop_in_thread(cls.io_loop)

    @classmethod
    def teardown_class(cls):
        cls.io_loop.add_callback(cls.http_server.stop)
        cls.io_loop.add_callback(cls.https_server.stop)
        cls.io_loop.add_callback(cls.proxy_server.stop)
        cls.io_loop.add_callback(cls.io_loop.stop)
        cls.server_thread.join()


@pytest.mark.skipif(not HAS_IPV6, reason="IPv6 not available")
class IPv6HTTPDummyServerTestCase(HTTPDummyServerTestCase):
    host = "::1"


@pytest.mark.skipif(not HAS_IPV6, reason="IPv6 not available")
class IPv6HTTPDummyProxyTestCase(HTTPDummyProxyTestCase):

    http_host = "localhost"
    http_host_alt = "127.0.0.1"

    https_host = "localhost"
    https_host_alt = "127.0.0.1"
    https_certs = DEFAULT_CERTS

    proxy_host = "::1"
storage.py
Source:storage.py
#
# SPDX-License-Identifier: MIT
#
import os
import re
import time

from oeqa.runtime.case import OERuntimeTestCase
from oeqa.core.decorator.depends import OETestDepends
from oeqa.core.decorator.data import skipIfQemu


class StorageBase(OERuntimeTestCase):
    """Shared mount / file-operation / unmount steps for storage tests.

    The helpers read ``device``, ``mount_point``, ``test_dir``, ``test_file``
    and ``test_msg`` from the test class; subclasses assign them in
    ``setUpClass``. Every command goes through ``self.target.run``, which is
    unpacked here as a ``(status, output)`` pair.
    """

    def storage_mount(self, tmo=1):
        """Mount ``device`` on ``mount_point`` and prepare an empty test dir.

        Args:
            tmo: seconds to sleep after issuing the mount before checking
                ``/proc/mounts``.
        """
        self.target.run('mkdir -p %s' % self.mount_point)
        (status, output) = self.target.run(
            'mount %s %s' % (self.device, self.mount_point))
        msg = ('Mount failed: %s.' % status)
        self.assertFalse(output, msg = msg)

        time.sleep(tmo)
        (status, output) = self.target.run('cat /proc/mounts')
        match = re.search('%s' % self.device, output)
        # Assert unconditionally: guarding this with `if match:` made the
        # check impossible to fail.
        msg = ('Device %s not mounted.' % self.device)
        self.assertTrue(match, msg = msg)

        self.target.run('mkdir -p %s' % self.test_dir)
        (status, output) = self.target.run('rm -f %s/*' % self.test_dir)
        msg = ('Failed to cleanup files @ %s/*' % self.test_dir)
        self.assertFalse(output, msg = msg)

    def storage_basic(self):
        """Create, rename and delete ``test_file`` inside ``test_dir``."""
        # create file on device
        (status, output) = self.target.run(
            'touch %s/%s' % (self.test_dir, self.test_file))
        msg = ('File %s not created on %s' % (self.test_file, self.device))
        self.assertFalse(status, msg = msg)

        # move file
        (status, output) = self.target.run(
            'mv %s/%s %s/%s1' %
            (self.test_dir, self.test_file, self.test_dir, self.test_file))
        msg = ('File %s not moved to %s' % (self.test_file, self.device))
        self.assertFalse(status, msg = msg)

        # remove file
        (status, output) = self.target.run(
            'rm %s/%s1' % (self.test_dir, self.test_file))
        msg = ('File %s not removed on %s' % (self.test_file, self.device))
        self.assertFalse(status, msg = msg)

    def storage_read(self):
        """Check that ``test_file`` can be read and contains ``test_msg``."""
        (status, output) = self.target.run(
            'cat %s/%s' % (self.test_dir, self.test_file))
        match = re.search('%s' % self.test_msg, output)
        msg = ('Test message %s not in file %s.'
               % (self.test_msg, self.test_file))
        self.assertEqual(status, 0, msg = msg)
        # The content check was computed but never asserted before.
        self.assertTrue(match, msg = msg)

    def storage_write(self):
        """Write ``test_msg`` into ``test_file`` with a shell redirect."""
        # create test message in file on device
        (status, output) = self.target.run(
            'echo "%s" > %s/%s' %
            (self.test_msg, self.test_dir, self.test_file))
        msg = ('File %s not create test message on %s'
               % (self.test_file, self.device))
        self.assertEqual(status, 0, msg = msg)

    def storage_umount(self, tmo=1):
        """Unmount ``mount_point`` and verify the device is gone.

        Args:
            tmo: seconds to sleep before issuing the umount.
        """
        time.sleep(tmo)
        (status, output) = self.target.run('umount %s' % self.mount_point)
        if status == 32:
            # already unmounted, should it fail?
            # NOTE(review): 32 is presumably umount's generic failure code,
            # so this may also hide other umount errors - confirm.
            return

        msg = ('Device not unmount %s' % self.mount_point)
        self.assertEqual(status, 0, msg = msg)

        (status, output) = self.target.run('cat /proc/mounts')
        match = re.search('%s' % self.device, output)
        # The device must NOT be listed any more; the previous
        # `if match: assertTrue(match)` passed when it was still mounted.
        msg = ('Device %s still mounted.' % self.device)
        self.assertFalse(match, msg = msg)


class UsbTest(StorageBase):
    '''
    This is to mimic the usb test previously done in manual bsp-hw.json
    '''
    @classmethod
    def setUpClass(cls):
        cls.test_msg = "Hello World - USB"
        cls.mount_point = "/media/usb"
        cls.device = "/dev/sda1"
        cls.test_file = "usb.tst"
        cls.test_dir = os.path.join(cls.mount_point, "oeqa")

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['ssh.SSHTest.test_ssh'])
    def test_usb_mount(self):
        # Unmount first so the mount below starts from a known state.
        self.storage_umount(2)
        self.storage_mount(5)

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['storage.UsbTest.test_usb_mount'])
    def test_usb_basic_operations(self):
        self.storage_basic()

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['storage.UsbTest.test_usb_basic_operations'])
    def test_usb_basic_rw(self):
        self.storage_write()
        self.storage_read()

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['storage.UsbTest.test_usb_mount'])
    def test_usb_umount(self):
        self.storage_umount(2)


class MMCTest(StorageBase):
    '''
    This is to mimic the mmc test previously done in manual bsp-hw.json
    '''
    @classmethod
    def setUpClass(cls):
        cls.test_msg = "Hello World - MMC"
        cls.mount_point = "/media/mmc"
        cls.device = "/dev/mmcblk1p1"
        cls.test_file = "mmc.tst"
        cls.test_dir = os.path.join(cls.mount_point, "oeqa")

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['ssh.SSHTest.test_ssh'])
    def test_mmc_mount(self):
        # Unmount first so the mount below starts from a known state.
        self.storage_umount(2)
        self.storage_mount()

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['storage.MMCTest.test_mmc_mount'])
    def test_mmc_basic_operations(self):
        self.storage_basic()

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['storage.MMCTest.test_mmc_basic_operations'])
    def test_mmc_basic_rw(self):
        self.storage_write()
        self.storage_read()

    @skipIfQemu('qemuall', 'Test only runs on real hardware')
    @OETestDepends(['storage.MMCTest.test_mmc_mount'])
    def test_mmc_umount(self):
        # NOTE(review): the body is elided ('...') in this listing;
        # presumably it mirrors test_usb_umount (self.storage_umount(2)) -
        # confirm against the original file.
        ...
Using AI Code Generation
1import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';2import { AppModule } from './app.module';3import { AppComponent } from './app.component';4describe('AppComponent', () => {5 beforeEach(() => MockBuilder(AppComponent, AppModule));6 it('should render the component', () => {7 const fixture = MockRender(AppComponent);8 expect(ngMocks.formatText(fixture)).toEqual('Hello World!');9 });10});11import { Component } from '@angular/core';12@Component({13})14export class AppComponent {}15import { MockBuilder, MockRender, MockInstance } from 'ng-mocks';16import { AppModule } from './app.module';17import { AppComponent } from './app.component';18import { DataService } from './data.service';19describe('AppComponent', () => {20 beforeEach(() =>21 MockBuilder(AppComponent, AppModule).mock(DataService, MockDataService)22 );23 it('should render the component', () => {24 const fixture = MockRender(AppComponent);25 expect(ngMocks.formatText(fixture)).toEqual('Hello World!');26 });27});28import { Component } from '@angular/core';29import { DataService } from './data.service';30@Component({31})32export class AppComponent {33 constructor(private dataService: DataService) {}34}35import { Injectable } from '@angular/core';36@Injectable()37export class DataService {38 getData() {39 return 'Hello World!';40 }41}42export class MockDataService {43 getData() {44 return 'Hello World!';45 }46}47import
Using AI Code Generation
import { MockBuilder, MockedComponentFixture, MockRender, ngMocks } from 'ng-mocks';

import { AppComponent } from './app.component';

// Smoke test: AppComponent can be rendered through ng-mocks.
describe('AppComponent', () => {
  ngMocks.faster();

  let fixture: MockedComponentFixture<AppComponent>;
  let component: AppComponent;

  // Configure the testing module for AppComponent; the builder's result is
  // returned so the test runner can wait on it.
  beforeEach(() => {
    return MockBuilder(AppComponent);
  });

  // Render a fresh fixture and grab the component instance for each test.
  beforeEach(() => {
    fixture = MockRender(AppComponent);
    component = fixture.point.componentInstance;
  });

  it('should create', () => {
    expect(component).toBeTruthy();
  });
});
Using AI Code Generation
1import { TestBed } from '@angular/core/testing';2import { NgMocks } from 'ng-mocks';3import { AppComponent } from './app.component';4describe('AppComponent', () => {5 beforeEach(async () => {6 await TestBed.configureTestingModule({7 }).compileComponents();8 });9 it('should create the app', () => {
Using AI Code Generation
1import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';2import { AppModule } from './app.module';3import { AppComponent } from './app.component';4beforeEach(() => MockBuilder(AppComponent, AppModule));5it('renders the component', () => {6 const fixture = MockRender(AppComponent);7 const component = ngMocks.findInstance(AppComponent);8 const component = ngMocks.findInstance(fixture, AppComponent);9 const component = ngMocks.findInstance(fixture.debugElement, AppComponent);10 const component = ngMocks.findInstance(fixture.nativeElement, AppComponent);11});12import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';13import { AppModule } from './app.module';14import { AppComponent } from './app.component';15beforeEach(() => MockBuilder(AppComponent, AppModule));16it('renders the component', () => {17 const fixture = MockRender(AppComponent);18 const service = ngMocks.findInstance(ExampleService);19 const service = ngMocks.findInstance(fixture, ExampleService);20 const service = ngMocks.findInstance(fixture.debugElement, ExampleService);21 const service = ngMocks.findInstance(fixture.nativeElement, ExampleService);22});23import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';24import { AppModule } from './app.module';25import { AppComponent } from './app.component';26beforeEach(() => MockBuilder(AppComponent, AppModule));27it('renders the component', () => {28 const fixture = MockRender(AppComponent);29 const component = ngMocks.findInstance(ExampleComponent);30 const component = ngMocks.findInstance(fixture, ExampleComponent);31 const component = ngMocks.findInstance(fixture.debugElement, ExampleComponent);
Using AI Code Generation
import { Component, Input } from '@angular/core';
import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';

// Minimal component under test: it renders its `value` input as text.
@Component({
  template: '{{ value }}',
})
class TargetComponent {
  @Input()
  public value: string;
}

describe('issue-428', () => {
  beforeEach(() => {
    return MockBuilder(TargetComponent);
  });

  it('renders', () => {
    // No inputs are passed to the render call.
    const fixture = MockRender(TargetComponent, {});

    // NOTE(review): `ngMocks.cls` could not be confirmed as a documented
    // ng-mocks helper from this listing - verify against the API reference.
    ngMocks.cls(fixture.debugElement, 'value');
  });
});
Using AI Code Generation
// The listing repeats the same spec three times; each copy is kept below.
// NOTE(review): `MyService` is referenced but never imported in any copy -
// confirm where it is declared.

// --- copy 1 ---
import { MockBuilder, MockRender } from 'ng-mocks';
import { MyComponent } from './my.component';

beforeEach(() => {
  return MockBuilder(MyComponent).keep(MyService);
});

it('renders', () => {
  const fixture = MockRender(MyComponent);
  expect(fixture.point.componentInstance).toBeDefined();
});

// --- copy 2 ---
import { MockBuilder, MockRender } from 'ng-mocks';
import { MyComponent } from './my.component';

beforeEach(() => {
  return MockBuilder(MyComponent).keep(MyService);
});

it('renders', () => {
  const fixture = MockRender(MyComponent);
  expect(fixture.point.componentInstance).toBeDefined();
});

// --- copy 3 ---
import { MockBuilder, MockRender } from 'ng-mocks';
import { MyComponent } from './my.component';

beforeEach(() => {
  return MockBuilder(MyComponent).keep(MyService);
});

it('renders', () => {
  const fixture = MockRender(MyComponent);
  expect(fixture.point.componentInstance).toBeDefined();
});
Using AI Code Generation
1 const service = ngMocks.findInstance(fixture.debugElement, ExampleService);2 const service = ngMocks.findInstance(fixture.nativeElement, ExampleService);3});4import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';5import { AppModule } from './app.module';6import { AppComponent } from './app.component';7beforeEach(() => MockBuilder(AppComponent, AppModule));8it('renders the component', () => {9 const fixture = MockRender(AppComponent);10 const component = ngMocks.findInstance(ExampleComponent);11 const component = ngMocks.findInstance(fixture, ExampleComponent);12 const component = ngMocks.findInstance(fixture.debugElement, ExampleComponent);
Using AI Code Generation
1import { TestBed } from '@angular/core/testing';2import { NgMocks } from 'ng-mocks';3import { AppComponent } from './app.component';4describe('AppComponent', () => {5 beforeEach(async () => {6 await TestBed.configureTestingModule({7 }).compileComponents();8 });9 it('should create the app', () => {
Using AI Code Generation
1import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';2import { AppModule } from './app.module';3import { AppComponent } from './app.component';4beforeEach(() => MockBuilder(AppComponent, AppModule));5it('renders the component', () => {6 const fixture = MockRender(AppComponent);7 const component = ngMocks.findInstance(AppComponent);
Using AI Code Generation
1 const component = ngMocks.findInstance(fixture.debugElement, AppComponent);2 const component = ngMocks.findInstance(fixture.nativeElement, AppComponent);3});4import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';5import { AppModule } from './app.module';6import { AppComponent } from './app.component';7beforeEach(() => MockBuilder(AppComponent, AppModule));8it('renders the component', () => {9 const fixture = MockRender(AppComponent);10 const service = ngMocks.findInstance(ExampleService);11 const service = ngMocks.findInstance(fixture, ExampleService);12 const service = ngMocks.findInstance(fixture.debugElement, ExampleService);13 const service = ngMocks.findInstance(fixture.nativeElement, ExampleService);14});15import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';16import { AppModule } from './app.module';17import { AppComponent } from './app.component';18beforeEach(() => MockBuilder(AppComponent, AppModule));19it('renders the component', () => {20 const fixture = MockRender(AppComponent);21 const component = ngMocks.findInstance(ExampleComponent);22 const component = ngMocks.findInstance(fixture, ExampleComponent);23 const component = ngMocks.findInstance(fixture.debugElement, ExampleComponent);
Using AI Code Generation
import { Component, Input } from '@angular/core';
import { MockBuilder, MockRender, ngMocks } from 'ng-mocks';

// Minimal component under test: it renders its `value` input as text.
@Component({
  template: '{{ value }}',
})
class TargetComponent {
  @Input()
  public value: string;
}

describe('issue-428', () => {
  beforeEach(() => {
    return MockBuilder(TargetComponent);
  });

  it('renders', () => {
    // No inputs are passed to the render call.
    const fixture = MockRender(TargetComponent, {});

    // NOTE(review): `ngMocks.cls` could not be confirmed as a documented
    // ng-mocks helper from this listing - verify against the API reference.
    ngMocks.cls(fixture.debugElement, 'value');
  });
});
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!