Best Python code snippet using autotest_python
mysql.py
Source:mysql.py
1import configparser2import datetime3import os4import pyAesCrypt5import re6import resource7import shutil8import subprocess9import sys10import tarfile11import tempfile12import time13class DeletionMeta():14 def __init__(self):15 self._candidates = {}16 def insert(self, candidate):17 candidate.inject(self._candidates)18 return self19 def candidateForBackup(self, backup):20 return DeletionCandidate.byBackup(self._candidates, backup)21class DeletionCandidate():22 def __init__(self, meta):23 self._expirationTime = None24 self._meta = meta25 def remove(self, resort):26 self._backup.remove(resort)27 return self28 @classmethod29 def byBackup(cls, dict, backup):30 return dict[backup.getName()]31 def inject(self, dict):32 dict[self._backup.getName()] = self33 return self34 def getName(self):35 return self._backup.getName()36 def parse(self, backup):37 self._backup = backup38 self._meta.insert(self)39 return self40 def rule(self, rule):41 tagName, validTime = rule.split(':')42 if not self._backup.hasTag(tagName):43 return self44 unit, amount = self.__parseRuleIntoUnitAmonut(validTime)45 interval = self.__parseExpirationInterval(unit, amount)46 newExpirationTime = self._backup.getCreationDate() + interval47 if self.isLater(newExpirationTime):48 self._expirationTime = newExpirationTime49 return self50 def __parseRuleIntoUnitAmonut(self, validTime):51 result = re.match('([0-9]+)(.*)', validTime)52 if not result:53 raise ValueError(validTime+' could not be parsed as valid time')54 amount = int(result.group(1))55 unit = result.group(2)56 return unit, amount57 def __parseExpirationInterval(self, unit, amount):58 units = {59 'day': self.daysDelta,60 'days': self.daysDelta,61 'week': self.weeksDelta,62 'weeks': self.weeksDelta,63 }64 try:65 interval = units[unit](amount)66 except KeyError:67 validUnits = ','.join( units.keys() )68 raise KeyError(unit+' is not a valid unit. Accepted values: '+validUnits)69 return interval70 def isLater(self, newExpirationTime):71 if not self._expirationTime:72 return True73 isLater = newExpirationTime > self._expirationTime74 if isLater:75 return True76 return False77 def daysDelta(self, amount):78 return datetime.timedelta(days=amount)79 def weeksDelta(self, amount):80 return datetime.timedelta(weeks=amount)81 def orphaned(self):82 if self._backup.isFull():83 return False84 parents = self._backup.getParents()85 for parent in parents:86 parentCandidate = self._meta.candidateForBackup(parent)87 if not parentCandidate.shouldDelete():88 return False89 return True90 def expired(self):91 if not self._expirationTime:92 return False93 return self._expirationTime < datetime.datetime.now()94 def shouldDelete(self):95 if self.expired():96 return True97 if self.orphaned():98 return True99 return False100 def printExpiration(self):101 self._backup.print(suffix=' expires at '+str(self._expirationTime) )102 def print(self):103 if not self.shouldDelete():104 return105 if self.expired():106 self._backup.print(suffix=' has expired at '+str(self._expirationTime))107 if self.orphaned():108 self._backup.print(suffix=' will be orphaned after the pruning')109 def getCreationDate(self):110 return self._backup.getCreationDate()111class Finder():112 def backups(self, backups):113 self.backups = backups114 return self115 def parameters(self, parameters):116 self._parameters = parameters117 return self118class Sorter():119 def sort(self, backups):120 return sorted(backups, key=lambda backup: backup.getCreationDate())121class LatestTagFinder(Finder):122 def __init__(self, sorter=Sorter()):123 self._sorter = sorter124 self._parameters = []125 def find(self, backups):126 if len(self._parameters) == 0:127 raise KeyError("Missing tag to search")128 requiredTag = self._parameters[0]129 backupsWithTags = []130 for backup in backups:131 if backup.hasTag(requiredTag):132 backupsWithTags.append(backup)133 sortedBackups = self._sorter.sort(backupsWithTags)134 return sortedBackups[-1]135class LatestFinder(Finder):136 def __init__(self, sorter=Sorter()):137 self._sorter = sorter138 def find(self, backups):139 sortedBackups = self._sorter.sort(backups)140 return sortedBackups[-1]141class LatestFullFinder(Finder):142 def __init__(self, sorter=Sorter()):143 self._sorter = sorter144 def find(self, backups):145 fullBackups = []146 for backup in backups:147 if not backup.isFull():148 continue149 fullBackups.append(backup)150 sortedFullBackups = self._sorter.sort(fullBackups)151 return sortedFullBackups[-1]152class BackupNotFoundException(Exception):153 pass154class MySQLBackupMeta:155 def __init__(self):156 self.byEndingPoint = {}157 self.byStartingPoint = {}158 def addByEndingPoint(self, endingPoint,backup):159 try:160 self.byEndingPoint[endingPoint].append(backup)161 except KeyError:162 self.byEndingPoint[endingPoint] = [ backup ]163 return self164 def addByStartingPoint(self, startingPoint,backup):165 try:166 self.byStartingPoint[startingPoint].append(backup)167 except KeyError:168 self.byStartingPoint[startingPoint] = [ backup ]169 return self170 def parents(self, childStartingPoint):171 try:172 return self.byEndingPoint[childStartingPoint]173 except KeyError:174 return None175 def children(self, parentEndingPoint):176 try:177 return self.byStartingPoint[parentEndingPoint]178 except KeyError:179 return []180class MySQLBackup:181 def __init__(self, sorter=Sorter()):182 self._name = ''183 self._sorter = sorter184 self._tags = []185 def remove(self, resort):186 resort.remove(self._name)187 return self188 def getName(self):189 return self._name190 def addTag(self, tag):191 if tag in self._tags:192 return self193 self._tags.append(tag)194 return self195 def mysql(self, mysql):196 self._mysql = mysql197 return self198 def name(self, name):199 self._name = name200 return self201 def meta(self, meta):202 self._meta = meta203 return self204 def hasTag(self, tag):205 return tag in self._tags206 def writeIni(self, filePath):207 config = configparser.ConfigParser()208 config['main'] = {}209 config['main']['tags'] = ','.join(self._tags)210 with open(filePath, 'w') as file:211 config.write(file)212 def parseIni(self, extraIni):213 config = configparser.ConfigParser()214 config.read_string(extraIni)215 self._tags = config['main']['tags'].split(',')216 return self217 def parseInfo(self, infoContent):218 dummyHeader = '[main]\n'219 iniFile = dummyHeader+infoContent220 xtrabackupInfoContent = configparser.ConfigParser()221 xtrabackupInfoContent.read_string(iniFile)222 isFullBackup = False223 startingPoint = int(xtrabackupInfoContent['main']['innodb_from_lsn'])224 if startingPoint == 0:225 isFullBackup = True226 self._full = isFullBackup227 self._startingPoint = startingPoint228 endingPoint = int(xtrabackupInfoContent['main']['innodb_to_lsn'])229 self._endingPoint = endingPoint230 self._meta.addByEndingPoint(endingPoint, self)231 self._meta.addByStartingPoint(startingPoint, self)232 return self233 def isNamed(self, name):234 return self._name == name235 def isFull(self):236 return self._full237 def print(self, indent = 0, prefix='', suffix=''):238 tags = self._tags239 tagList = ','.join(self._tags)240 tagInfo = ' tags:'+tagList241 print( (' ' * indent) + prefix + self._name + tagInfo + suffix)242 def printRecursive(self, indent = 0):243 self.print(indent)244 sortedChildren = self._sorter.sort( self.getChildren() )245 for child in sortedChildren:246 if child is self:247 continue248 if not child.isClosestRelative(self):249 continue250 child.printRecursive(indent + 2)251 def isBefore(self, other):252 return self.getCreationDate() < other.getCreationDate()253 def getClosestRelative(self):254 children = self.getChildren()255 sortedChildren = self._sorter.sort(children)256 return sortedChildren[0]257 def isClosestRelative(self, other):258 try:259 closestRelative = other.getClosestRelative()260 return closestRelative == self261 except IndexError:262 return False263 def getCreationDate(self):264 return datetime.datetime.strptime(self._name, '%Y-%m-%d_%H-%M')265 def getHistory(self):266 history = [self]267 parent = self.getParent()268 while parent:269 history.append(parent)270 parent = parent.getParent()271 history.reverse()272 fullBackup = history.pop(0)273 return [ fullBackup, history ]274 def isParent(self, backup):275 if backup in self.getParents():276 return True277 for parent in self.getParents():278 if parent.isParent(backup):279 return True280 return False281 def getParent(self):282 if self._full:283 return None284 parents = self._meta.parents(self._startingPoint)285 return self.__skipEmptyParents(parents)[0]286 def getParents(self):287 if self._full:288 return []289 parents = self._meta.parents(self._startingPoint)290 parents = self.__cantBeOwnParent(parents)291 return self.__parentsMustBeBefore(parents)292 def __cantBeOwnParent(self, parents):293 return list( filter(lambda backup: backup._name != self._name, parents) )294 def __skipEmptyParents(self, parents):295 return list( filter(lambda backup: not backup.isEmpty(), parents) )296 def __parentsMustBeBefore(self, parents):297 return list( filter(lambda backup: backup.isBefore(self), parents) )298 def isEmpty(self):299 return self._startingPoint == self._endingPoint300 def getChildren(self):301 children = self._meta.children(self._endingPoint)302 withoutSelf = self.__cantBeOwnChild(children)303 return self.__childCantBeBefore(withoutSelf)304 def __cantBeOwnChild(self, children):305 return list( filter(lambda backup: backup._name != self._name, children) )306 def __childCantBeBefore(self, children):307 return list( filter(lambda backup: self.isBefore(backup), children ) )308 def inject(self, backupList):309 backupList.append(self)310 return self311 def setTimestamp(self, gauge):312 timestamp = self.getCreationDate().timestamp()313 gauge.set(timestamp)314 return self315class MySQL:316 def __init__(self, sorter=Sorter()):317 self._mysqlHost = os.environ.get('MYSQL_HOST', 'mysql')318 self._mysqlPort = os.environ.get('MYSQL_PORT', '3306')319 self._mysqlUsername = os.environ.get('MYSQL_USERNAME', 'backup')320 self._mysqlPassword = os.environ.get('MYSQL_PASSWORD')321 self._mysqlDatadir = os.environ.get('MYSQL_DATADIR', False)322 self._backupCommand = os.environ.get('MYSQL_COMMAND', 'mariabackup')323 self._bufferSize = int(os.environ.get('MYSQL_ENC_BUFSIZE', 64 * 1024))324 self._password = os.environ.get('MYSQL_ENC_PASSWORD')325 self._assetBase = os.path.dirname(os.path.realpath(__file__))+'/assets'326 self._tags = []327 self._sorter = sorter328 self._dryRun = False329 self._specialNames = {330 'latest-full-backup': LatestFullFinder(),331 'latest-backup': LatestFinder(),332 'latest-tag': LatestTagFinder(),333 }334 def setOutput(self, output):335 self.output = output336 def dataDir(self, dataDir):337 self._mysqlDatadir = dataDir338 return self339 def dryRun(self, dryRun):340 self._dryRun = dryRun341 return self342 def resort(self, resort):343 self._resort = resort344 return self345 def pruneBackups(self, rules):346 candidates = self.__buildDeletionCanidates(rules)347 sortedCandidates = self._sorter.sort(candidates)348 for candidate in sortedCandidates:349 candidate.print()350 if self._dryRun:351 print("Dry run - not executing")352 return353 for candidate in candidates:354 if not candidate.shouldDelete():355 continue356 candidate.remove( self._resort.adapter('mysql') )357 def __buildDeletionCanidates(self, rules):358 candidates = []359 meta = DeletionMeta()360 backups = self.list()361 for backup in backups:362 candidate = DeletionCandidate(meta).parse(backup)363 for rule in rules:364 candidate.rule(rule)365 candidates.append(candidate)366 return candidates367 def incrementalBackup(self, name, parentName):368 parent = self.find(parentName)369 print('Basing backup on:')370 parent.print()371 return self.__backup(name, parent._endingPoint)372 def fullBackup(self, name):373 return self.__backup(name)374 def tags(self, tags):375 self._tags = tags376 return self377 def __backup(self, name, baseLsn = None):378 fileLimit = self.__getFileLimit()379 with tempfile.TemporaryDirectory() as tempDirectory:380 backupDirectory = tempDirectory+'/backup'381 os.mkdir(backupDirectory)382 command = [383 'mariabackup',384 '--backup',385 '--target-dir='+backupDirectory,386 '--host='+self._mysqlHost,387 '--user='+self._mysqlUsername,388 '--password='+self._mysqlPassword,389 '--port='+self._mysqlPort,390 ]391 if self._mysqlDatadir:392 command.append('--datadir='+self._mysqlDatadir)393 if baseLsn:394 command.append('--incremental-lsn='+str(baseLsn))395 completedProcess = subprocess.run(command, check=True)396 self.__extractBackupInfo(backupDirectory, tempDirectory)397 tarFile = self.__compressBackup(backupDirectory, tempDirectory)398 self.__encryptBackup(tarFile)399 newBackup = MySQLBackup()400 for tag in self._tags:401 newBackup.addTag(tag)402 newBackup.writeIni(tempDirectory+'/cloudbackup.ini')403 self.__uploadBackup(tempDirectory, name)404 return self405 def __extractBackupInfo(self, backupDirectory, tempDirectory):406 shutil.copy(backupDirectory+'/xtrabackup_info', tempDirectory+'/xtrabackup_info')407 def __compressBackup(self, backupDirectory, tempDirectory):408 tarFilePath = tempDirectory+'/backup.tar.bz2'409 tar = tarfile.open(tarFilePath, 'w:bz2')410 tar.add(backupDirectory, 'backup')411 tar.close()412 shutil.rmtree(backupDirectory)413 return tarFilePath414 def __encryptBackup(self, tarFilePath):415 pyAesCrypt.encryptFile(tarFilePath, tarFilePath+'.aes', self._password, self._bufferSize)416 os.remove(tarFilePath)417 def __uploadBackup(self, backupDirectory, name):418 self._resort.adapter('mysql').upload(backupDirectory, name)419 def getSpecialBackups(self):420 availableBackups = self.list()421 specialBackups = {}422 for specialName in self._specialNames:423 specialNameFinder = self._specialNames[specialName]424 try:425 specialBackups[specialName] = specialNameFinder.find(availableBackups)426 except IndexError:427 pass428 except KeyError:429 pass430 return specialBackups431 def find(self, name):432 availableBackups = self.list()433 specialParameters = name.split(':')434 specialName = specialParameters.pop(0)435 if specialName in self._specialNames.keys():436 specialNameFinder = self._specialNames[specialName]437 return specialNameFinder.parameters(specialParameters).find(availableBackups)438 for backup in availableBackups:439 if backup.isNamed(name):440 return backup441 raise BackupNotFoundException()442 def restore(self, name, dataDir, port=8080):443 dataDirAbsolute = os.path.abspath(dataDir)444 backup = self.find(name)445 fullBackup, history = backup.getHistory()446 self.output.info("Full Backup: "+fullBackup.getName())447 self.output.info("Backup history:")448 for backup in history:449 self.output.info("- "+backup.getName() )450 self.output.info("Restoring Full Backup")451 self.__restoreFullBackup(fullBackup, dataDirAbsolute)452 self.output.info("Restoring Incremental Backups")453 self.__restoreIncrementalBackups(history, dataDirAbsolute)454 self.__copyDockerCompose(dataDirAbsolute+'/backup/docker-compose.yml', port)455 def __restoreFullBackup(self, backup, dataDir):456 self.__downloadAndExtract(backup, dataDir)457 completedProcess = subprocess.run([458 'mariabackup',459 '--prepare',460 '--target-dir='+dataDir+'/backup',461 ], check=True)462 def __restoreIncrementalBackups(self, backups, dataDir):463 for backup in backups:464 self.__restoreIncrementalBackup(backup, dataDir)465 def __restoreIncrementalBackup(self, backup, dataDir):466 with tempfile.TemporaryDirectory(dir=dataDir) as tempDir:467 self.output.info("Downloading "+backup.getName())468 self.__downloadAndExtract(backup, tempDir)469 completedProcess = subprocess.run([470 'mariabackup',471 '--prepare',472 '--target-dir='+dataDir+'/backup',473 '--incremental-dir='+tempDir+'/backup'474 ], check=True)475 def __downloadAndExtract(self, backup, targetDirectory):476 self._resort.adapter('mysql').download(backup._name, targetDirectory)477 tarFilePath = targetDirectory+'/backup.tar.bz2'478 self.__decryptBackup(tarFilePath)479 self.__unpackBackup(tarFilePath, targetDirectory)480 def __decryptBackup(self, tarFilePath):481 encryptedPath = tarFilePath+'.aes'482 pyAesCrypt.decryptFile(encryptedPath, tarFilePath, self._password, self._bufferSize)483 os.remove(encryptedPath)484 def __unpackBackup(self, tarFilePath, dataDir):485 tar = tarfile.open(tarFilePath, 'r:bz2')486 tar.extractall(dataDir)487 os.remove(tarFilePath)488 def __copyDockerCompose(self, target, port):489 with open(self._assetBase+'/docker-compose.yml', 'r') as file:490 dockerCompose = file.read()491 dockerCompose = dockerCompose.replace('%%PORT%%', str(port))492 with open(target, 'w') as file:493 file.write(dockerCompose)494 def __getFileLimit(self):495 limits = resource.getrlimit(resource.RLIMIT_NOFILE)496 return limits[1]497 def list(self):498 meta = MySQLBackupMeta()499 backups = []500 for directory in self._resort.adapter('mysql').listFolders():501 backup = self.__parseBackup(directory, meta)502 backup.inject(backups)503 sortedBackups = self._sorter.sort(backups)504 return sortedBackups505 def __parseBackup(self, directory, meta):506 backupName = os.path.basename(directory)507 infoContent = self._resort.adapter('mysql').fileContent(directory+'/xtrabackup_info').decode('utf-8')508 cloudbackupIni = self._resort.adapter('mysql').fileContent(directory+'/cloudbackup.ini').decode('utf-8')509 return MySQLBackup() \510 .name(backupName).meta(meta) \511 .parseInfo(infoContent) \512 .parseIni(cloudbackupIni)513 def scrape(self, gauge):514 availableBackups = self.list()515 self.__scrapeLatestBackup(gauge, availableBackups)516 self.__scrapeLatestFullBackup(gauge, availableBackups)517 return self518 def __scrapeLatestBackup(self, gauge, backups):519 try:520 backup = self._specialNames['latest-backup'].find(backups)521 backup.setTimestamp( gauge.labels(self._resort._name, 'Either') )522 except IndexError:523 gauge.labels(self._resort._name, 'Either').set(0)524 def __scrapeLatestFullBackup(self, gauge, backups):525 try:526 fullBackup = self._specialNames['latest-full-backup'].find(backups)527 fullBackup.setTimestamp( gauge.labels(self._resort._name, 'True') )528 except IndexError:...
test_board.py
Source:test_board.py
1#!/usr/bin/python32import unittest3import board4# Deck fixtures5unshuffled = [*range(0,52)]6reversed = [*range(0,52)]7reversed.reverse()8no_aces = board.parseDeck("""99H KD 8H JD AC 9S 7C 6D10JH 3H 7S TC AH 2C 6C 4H11KH AD TH 2D 3S QC JS 4S125D TD 8C TS 6H JC 8S 7D13QS QD 4D 3C AS 5H 7H QH145C KC 8D 9C 2H 3D KS 6S159D 5S 2S 4C16""")17two_aces = board.parseDeck("""18KH 2H 4D QC 3S 7D 7S KD19JD 4H 3C 7C 9H KS 7H 5D206S AC QS TD JC 9C JH KC219S 4S QH 2D 4C AD TS 8H22TH 2S JS 6C 9D 3D 8D TC232C 5H 8C 5S 3H 6H 8S 6D24AH 5C QD AS25""")26two_aces_two = board.parseDeck("""276H AH 3H 2D JC 4D TH QD28TS 7D 9D 2C 7S QS JD 5D298H 4S 6S 5H 6C KD KC JH30QC TD 8S 7C 4C KS 9H 3S312S 2H 9S TC 8D 5C AD 4H32AS 5S 8C QH JS 6D KH 7H33AC 3D 9C 3C34""")35class CardUnitTest(unittest.TestCase):36 def test_makeCard(self):37 for card in range(0,52):38 self.assertEqual(card, board.makeCard(card // 13, card % 13))39 def test_suit(self):40 for card in range(0,52):41 self.assertEqual(card // 13, board.suit(card), )42 def test_pips(self):43 for card in range(0,52):44 self.assertEqual(card % 13, board.pips(card))45 def assert_formatCard(self, formatted, suit, pips):46 card = board.makeCard(suit, pips)47 actual = board.formatCard(card)48 self.assertEqual(formatted, actual, f"Card format failed %{pips}; {suit}")49 actual = board.parseCard(formatted)50 self.assertEqual(card, actual, f"Card parse %{pips}; {suit}")51 def test_formatCard(self):52 self.assert_formatCard('AC', 0, 0)53 self.assert_formatCard('2C', 0, 1)54 self.assert_formatCard('9C', 0, 8)55 self.assert_formatCard('TC', 0, 9)56 self.assert_formatCard('JC', 0, 10)57 self.assert_formatCard('QC', 0, 11)58 self.assert_formatCard('KC', 0, 12)59class BoardUnitTest(unittest.TestCase):60 def assert_init(self, deck):61 cards = len(deck)62 self.assertEqual(0, cards % 13, f"Uneven suits: {cards} cards in the deck")63 b = board.Board(deck)64 suits = cards // 1365 self.assertEqual(suits, b._nsuits)66 self.assertEqual(suits, len(b._foundations))67 for a, ace in enumerate(b._foundations):68 self.assertEqual(ace, board.noCard, f"Ace {a} not empty")69 self.assertEqual(suits * 2, len(b._tableau))70 for c, cascade in enumerate(b._tableau):71 expected = cards // len(b._tableau) + (c < suits)72 self.assertEqual(expected, len(cascade), f"Column {c} incorrectly sized")73 for r, card in enumerate(cascade):74 expected = deck[r * suits * 2 + c]75 self.assertEqual(expected, card)76 self.assertIsNone(b._memento, "Pre-initialised memento")77 self.assertTrue(b._resort, "Not initialised to resort")78 self.assertTrue(b._rehash, "Not initialised to rehash")79 self.assertEqual(len(b._tableau), len(b._sorted), "Hashing cascades not initialised")80 for s, actual in enumerate(b._sorted):81 self.assertEqual(b._tableau[s], actual)82 def test_init(self):83 self.assert_init(unshuffled)84 def assert_str(self, expected, deck):85 self.assertEqual(expected, str(board.Board(deck)))86 def test_str_unshuffled(self):87 expected = """Aces:88-S -H -D -C89Table:90AC 2C 3C 4C 5C 6C 7C 8C919C TC JC QC KC AD 2D 3D924D 5D 6D 7D 8D 9D TD JD93QD KD AH 2H 3H 4H 5H 6H947H 8H 9H TH JH QH KH AS952S 3S 4S 5S 6S 7S 8S 9S96TS JS QS KS -- -- -- --97Cells:98-- -- -- --99"""100 self.assert_str(expected, unshuffled)101 def test_str_reversed(self):102 expected = """Aces:103-S -H -D -C104Table:105KS QS JS TS 9S 8S 7S 6S1065S 4S 3S 2S AS KH QH JH107TH 9H 8H 7H 6H 5H 4H 3H1082H AH KD QD JD TD 9D 8D1097D 6D 5D 4D 3D 2D AD KC110QC JC TC 9C 8C 7C 6C 5C1114C 3C 2C AC -- -- -- --112Cells:113-- -- -- --114"""115 self.assert_str(expected, reversed)116 def test_str_two_aces(self):117 expected = """Aces:118-S -H -D -C119Table:120KH 2H 4D QC 3S 7D 7S KD121JD 4H 3C 7C 9H KS 7H 5D1226S AC QS TD JC 9C JH KC1239S 4S QH 2D 4C AD TS 8H124TH 2S JS 6C 9D 3D 8D TC1252C 5H 8C 5S 3H 6H 8S 6D126AH 5C QD AS -- -- -- --127Cells:128-- -- -- --129"""130 self.assert_str(expected, two_aces)131 def test_move_to_foundations_no_aces(self):132 b = board.Board(no_aces)133 b._rehash = b._resort = False134 actual = b.moveToFoundations()135 expected = []136 self.assertEqual(expected, actual)137 self.assertFalse(b._rehash)138 self.assertFalse(b._resort)139 def test_move_to_foundations_two_aces(self):140 b = board.Board(two_aces)141 b._rehash = b._resort = False142 actual = b.moveToFoundations()143 expected = [(0, -3,), (3, -4),]144 self.assertEqual(expected, actual)145 self.assertTrue(b._rehash)146 self.assertFalse(b._resort)147 def test_move_to_foundations_two_aces_two(self):148 b = board.Board(two_aces_two)149 b._rehash = b._resort = False150 actual = b.moveToFoundations()151 expected = [(0, -1,), (0, -4), (0, -4), ]152 self.assertEqual(expected, actual)153 self.assertTrue(b._rehash)154 self.assertFalse(b._resort)155 def test_move_to_foundations_reversed(self):156 setup = board.Board(reversed)157 setup._rehash = setup._resort = False158 actual = setup.moveToFoundations()159 self.assertTrue(setup._rehash)160 self.assertTrue(setup._resort)161 def test_move_to_foundations_cell(self):162 setup = board.Board(unshuffled)163 # One king in a cell164 for cascade in setup._tableau: cascade.clear()165 king = 12166 queen = king - 1167 clubs = 0168 setup._cells = [ board.noCard, board.makeCard(clubs, king), board.noCard, board.noCard, ]169 setup._foundations = [queen, king, king, king, ]170 expected = [(9, -1,),]171 actual = setup.moveToFoundations()172 self.assertEqual(expected, actual)173 def test_move_between_cascades_and_cells(self):174 b = board.Board(unshuffled)175 # Move to cell #1176 b.moveCard((0,8,))177 self.assertTrue(b._rehash)178 self.assertEqual(1, b._firstFree)179 self.assertEqual(6, len(b._tableau[0]))180 # Move to cell #2181 b.moveCard((0,9,))182 self.assertTrue(b._rehash)183 self.assertEqual(2, b._firstFree)184 self.assertEqual(5, len(b._tableau[0]))185 # Move to cell #3186 b.moveCard((1,10,))187 self.assertTrue(b._rehash)188 self.assertEqual(3, b._firstFree)189 self.assertEqual(6, len(b._tableau[1]))190 # Move to cell #4191 b.moveCard((2,11,))192 self.assertTrue(b._rehash)193 self.assertEqual(4, b._firstFree)194 self.assertEqual(6, len(b._tableau[2]))195 # Move to cascade #1196 b.moveCard((11,3,))197 self.assertTrue(b._rehash)198 self.assertEqual(3, b._firstFree)199 self.assertEqual(8, len(b._tableau[3]))200 # Move to cascade #2201 b.moveCard((10,3,))202 self.assertTrue(b._rehash)203 self.assertEqual(2, b._firstFree)204 self.assertEqual(9, len(b._tableau[3]))205 # Move to cascade #3206 b.moveCard((8,3,))207 self.assertTrue(b._rehash)208 self.assertEqual(0, b._firstFree)209 self.assertEqual(10, len(b._tableau[3]))210 # Move to cascade #4211 b.moveCard((9,1,))212 self.assertTrue(b._rehash)213 self.assertEqual(0, b._firstFree)214 self.assertEqual(7, len(b._tableau[1]))215 def test_move_between_cascades_and_foundations(self):216 b = board.Board(two_aces_two)217 # Move to foundations #1218 b._resort = b._rehash = False219 b.moveCard((0,-1,))220 self.assertTrue(b._rehash)221 self.assertFalse(b._resort)222 self.assertEqual(0, b._firstFree)223 self.assertEqual(6, len(b._tableau[0]))224 self.assertEqual([0, board.noCard, board.noCard, board.noCard,], b._foundations)225 # Move to foundations #2226 b._resort = b._rehash = False227 b.moveCard((0,-4,))228 self.assertTrue(b._rehash)229 self.assertFalse(b._resort)230 self.assertEqual(0, b._firstFree)231 self.assertEqual(5, len(b._tableau[0]))232 self.assertEqual([0, board.noCard, board.noCard, 0,], b._foundations)233 # Move to foundations #3234 b._resort = b._rehash = False235 b.moveCard((0,-4,))236 self.assertTrue(b._rehash)237 self.assertFalse(b._resort)238 self.assertEqual(0, b._firstFree)239 self.assertEqual(4, len(b._tableau[0]))240 self.assertEqual([0, board.noCard, board.noCard, 1,], b._foundations)241 # Move from foundations #1242 b._resort = b._rehash = False243 b.moveCard((-4,0,), False)244 self.assertTrue(b._rehash)245 self.assertFalse(b._resort)246 self.assertEqual(0, b._firstFree)247 self.assertEqual(5, len(b._tableau[0]))248 self.assertEqual([0, board.noCard, board.noCard, 0,], b._foundations)249 # Move from foundations #2250 b._resort = b._rehash = False251 b.moveCard((-4,0,), False)252 self.assertTrue(b._rehash)253 self.assertFalse(b._resort)254 self.assertEqual(0, b._firstFree)255 self.assertEqual(6, len(b._tableau[0]))256 self.assertEqual([0, board.noCard, board.noCard, board.noCard,], b._foundations)257 # Move from foundations #2258 b._resort = b._rehash = False259 b.moveCard((-1,0,), False)260 self.assertTrue(b._rehash)261 self.assertFalse(b._resort)262 self.assertEqual(0, b._firstFree)263 self.assertEqual(7, len(b._tableau[0]))264 self.assertEqual([board.noCard, board.noCard, board.noCard, board.noCard,], b._foundations)265 def test_move_to_empty_cascade(self):266 b = board.Board(unshuffled)267 # Put kings on the foundations268 king = 12269 for f, foundation in enumerate(b._foundations): b._foundations[f] = king270 # Clear the cascades271 for cascade in b._tableau: cascade.clear()272 # Move a king to an empty cell273 b.moveCard((-1, 0,))274 self.assertEqual(king, b._tableau[0][-1])275 self.assertTrue(b._resort)276 self.assertTrue(b._rehash)277 # Move a queen to an empty cell278 queen = king - 1279 b._resort = b._rehash = False280 b.moveCard((-1, 1,))281 self.assertEqual(queen, b._tableau[1][-1])282 self.assertTrue(b._resort)283 self.assertTrue(b._rehash)284 def test_enumerate_finish_to_card(self):285 b = board.Board(no_aces)286 start = 1287 actual = b.enumerateFinishCascades(start, b._tableau[start][-1])288 expected = [(start, 7,),]289 self.assertEqual(expected, actual)290 def test_enumerate_finish_to_empty(self):291 setup = board.Board(unshuffled)292 # Set up an empty cascade293 setup._foundations = [12 - len(setup._tableau) + 1, 12, 12, 12,]294 # Clubs in the top row, except the first cascade295 for start, cascade in enumerate(setup._tableau):296 cascade.clear()297 if start: cascade.append(start)298 # Every card can move to the leftmost or the next cascade299 for start, cascade in enumerate(setup._tableau):300 if cascade:301 actual = setup.enumerateFinishCascades(start, cascade[-1])302 expected = []303 if start + 1 < len(setup._tableau):304 expected.append( (start, start + 1,) )305 self.assertEqual(expected, actual, f"From cascade {start}" )306 def test_enumerate_moves_unshuffled(self):307 setup = board.Board(unshuffled)308 width = len(setup._tableau)309 # 3. Move from cascades to the next open width310 expected = [(start, width,) for start in range(width)]311 # 2. Move from cascades to cascades312 for start in range(width):313 if start != 3:314 expected.append((start, (start + 1) % width,))315 # 1. Move from cells to cascades316 # ...317 actual = setup.enumerateMoves()318 self.assertEqual(expected, actual)319 def test_enumerate_moves_no_aces(self):320 setup = board.Board(no_aces)321 width = len(setup._tableau)322 # 3. Move from cascades to the next open cell323 expected = [(start, width,) for start in range(width)]324 # 2. Move from cascades to cascades325 expected.append( (1, 7,) )326 # 1. Move from cells to cascades327 actual = setup.enumerateMoves()328 self.assertEqual(expected, actual)329 def test_enumerate_moves_two_aces(self):330 setup = board.Board(two_aces)331 width = len(setup._tableau)332 # 3. Move from cascades to the next open cell333 expected = [(start, width,) for start in range(width)]334 # 2. Move from cascades to cascades335 # 1. Move from cells to cascades336 actual = setup.enumerateMoves()337 self.assertEqual(expected, actual)338 def test_enumerate_moves_two_aces_two(self):339 setup = board.Board(two_aces_two)340 width = len(setup._tableau)341 # Clear the aces342 setup.moveToFoundations()343 # First move options - cells only344 expected = [(start, width,) for start in range(width)]345 actual = setup.enumerateMoves()346 self.assertEqual( expected, actual )347 # Uncover QH348 setup.moveCard( (3, width,) )349 expected = [(start, width + 1,) for start in range(width)]350 expected.append( (3, 6,) )351 actual = setup.enumerateMoves()352 self.assertEqual(expected, actual)353 def test_enumerate_moves_keep_sequences(self):354 setup = board.Board(unshuffled)355 setup._foundations = [8, 6, 8, 7, ]356 cascades = ( "TC 9C", "7H 9S 9D KH QH JH TH", "QC", "QD JD TD", "8C KD", "9H 8H", "KS QS JS", "",)357 for t, s in enumerate( cascades ):358 setup._tableau[ t ] = board.parseDeck( s )359 setup._cells = board.parseDeck( "KC TS JC --" )360 setup._firstFree = 3361 # Validate stacking362 stacked = (0, 1, 3, 5, 6, )363 for c in stacked: self.assertTrue( board.isStacked( setup._tableau[ c ] ) )364 isolate = (2, 4, )365 for c in isolate: self.assertFalse( board.isStacked( setup._tableau[ c ] ) )366 opens = (7, )367 for c in opens: self.assertFalse( board.isStacked( setup._tableau[ c ] ) )368 expected = []369 # Move stacked cascades to open cell370 expected.extend( [ (start, 11, ) for start in stacked ] )371 expected.pop()372 # Move isolate cascades to open cell373 expected.extend( [ (start, 11, ) for start in isolate ] )374 # Move cell 1 to open cascade375 expected.append( ( 8, 7, ) )376 # Move cell 2 to stack377 expected.append( ( 9, 6, ) )378 # Move cell 2 to open cascade379 expected.append( ( 9, 7, ) )380 # Move cell 3 to stack381 expected.append( ( 10, 2, ) )382 # Move cell 3 to open cascade383 expected.append( ( 10, 7, ) )384 # Move stacked cascades to open cascade385 expected.extend( [ (start, 7, ) for start in stacked ] )386 expected.pop()387 # Move isolate cascades to open cascade388 expected.extend( [ (start, 7, ) for start in isolate ] )389 expected.pop( -2 )390 actual = setup.enumerateMoves()391 self.assertEqual(expected, actual)392 def test_enumerate_moves_cover_aces(self):393 setup = board.Board(unshuffled)394 setup._foundations = [4, -1, 12, 1, ]395 cascades = (396 "QS",397 "KD QD JD TD 9D 8D 7D 6D 5D 4D",398 "JC",399 "JS TS 9S 8S",400 "KC QC JC TC 9C 8C 7C",401 "2D",402 "3S AD KS",403 "7S 6S",)404 for t, s in enumerate( cascades ):405 setup._tableau[ t ] = board.parseDeck( s )406 setup._cells = board.parseDeck( "4S -- 5S 3D" )407 setup._firstFree = 1408 expected = [(3, 9), (7, 9), (0, 9), (2, 9), (5, 9), (6, 9), (10, 7), (11, 1), (0, 6), ]409 actual = setup.enumerateMoves()410 self.assertEqual(expected, actual)411 def test_memento_values(self):412 visited = set()413 for deck in (unshuffled, reversed, no_aces, two_aces, two_aces_two,):414 setup = board.Board(deck)415 self.assertTrue(setup._resort)416 self.assertTrue(setup._rehash)417 memento = setup.memento()418 self.assertFalse(setup._resort)419 self.assertFalse(setup._rehash)420 self.assertEqual(memento, setup._memento)421 self.assertFalse( memento in visited, f"Duplicate memento #{len(visited)}: {memento}" )422 visited.add(memento)423 # Check that a new copy gets the same hash424 setup = board.Board(deck)425 self.assertEqual(memento, setup.memento())426 def test_not_solved(self):427 setup = board.Board(unshuffled)428 self.assertFalse( setup.solved() )429 def test_solved(self):430 setup = board.Board(reversed)431 setup.moveToFoundations()432 self.assertTrue( setup.solved() )433 def test_backtrack_foundations(self):434 setup = board.Board(reversed)435 expected = str(setup)436 # Backtracking should undo everything437 moves = setup.moveToFoundations()438 setup.backtrack( moves )439 actual = str(setup)440 self.assertEqual(expected, actual)441 def assert_backtrack(self, deck):442 setup = board.Board(deck)443 setup.moveToFoundations()444 moves = setup.enumerateMoves()445 for move in moves:446 expected = str(setup)447 setup.moveCard(move)448 setup.backtrack( [move,] )449 actual = str(setup)450 self.assertEqual(expected, actual, move)451 def test_backtrack_no_aces(self):452 self.assert_backtrack(no_aces)453 def test_backtrack_two_aces(self):454 self.assert_backtrack(two_aces)455 def test_backtrack_two_aces_two(self):456 self.assert_backtrack(two_aces_two)457 def assert_solve(self, setup, expected, display = False):458 b = board.Board(setup)459 solution = b.solve()460 actual = len(solution)461 self.assertEqual(expected, actual)462 if display:463 forward = []464 while solution:465 moves = solution.pop()466 forward.append(moves.copy())467 b.backtrack(moves)468 forward.reverse()469 for moves in forward:470 print(moves)471 for move in moves:472 b.moveCard(move, False)473 print(b)474 def test_solve_unshuffled(self):475 self.assert_solve(unshuffled, 958)476 def test_solve_reversed(self):477 self.assert_solve(reversed, 1)478 def test_solve_no_aces(self):479 self.assert_solve(no_aces, 555, False)480 def test_solve_two_aces(self):481 self.assert_solve(two_aces, 86, False)482 def test_solve_two_aces_two(self):483 self.assert_solve(two_aces_two, 72)484if __name__ == '__main__':...
borg.py
Source:borg.py
1import datetime2import dateutil.parser3import json4import os5import subprocess6from prometheus_client import Gauge7class FileBackup():8 def parse(self, dict):9 self._name = dict['name']10 self._time = dateutil.parser.isoparse(dict['time'])11 return self12 def getName(self):13 return self._name14 def inject(self, list):15 list.append(self)16 return self17 def setTimestamp(self, gauge):18 gauge.set(self._time.timestamp())19 return self20 def print(self):21 print('- '+self._name+' made at '+str(self._time) )22class Borg:23 def __init__(self):24 self._port = 2325 self._borgPassword = os.environ.get('BORG_PASSWORD')26 self._encryptionMode = 'repokey-blake2'27 def resort(self, resort):28 self._resort = resort29 return self30 def user(self, user):31 self._user = user32 return self33 def host(self, host):34 self._host = host35 return self36 def port(self, port):37 self._port = port38 return self39 def path(self, path):40 self._path = path41 return self42 def keyFilePath(self, keyFilePath):43 self._keyFilePath = keyFilePath44 return self45 def borgPassword(self, borgPassword):46 self._borgPassword = borgPassword47 return self48 def init(self, copies):49 repositoryNumber = 150 while repositoryNumber <= copies:51 self.__createRepository(repositoryNumber)52 repositoryNumber += 153 print("Created")54 repositoryNumber = 155 while repositoryNumber <= copies:56 self.__initRepository(repositoryNumber)57 repositoryNumber += 158 def __createRepository(self, repositoryNumber):59 try:60 fileRepository = 'repo'+str(repositoryNumber)61 self._resort.adapter('files').createFolder(fileRepository)62 print("Folder for Repository "+str(repositoryNumber)+" created")63 except OSError:64 print("Folder for Repository "+str(repositoryNumber)+" already exists")65 def __initRepository(self, repositoryNumber):66 repo = self.__makeRepo(repositoryNumber)67 print("Initializing Repository "+str(repositoryNumber))68 completedProcess = self.command([69 'init',70 '-e',71 self._encryptionMode,72 repo73 ], repositoryNumber)74 if completedProcess.returncode != 0:75 print("Process did not return success:")76 print("Code: "+ str(completedProcess.returncode))77 print( str(completedProcess.stdout) )78 print( str(completedProcess.stderr) )79 return self80 print("Initialized Repository "+str(repositoryNumber)+" successfully")81 return self82 def remove(self, name, repositoryNumber):83 self.__findBackup(name, repositoryNumber)84 print("Removing backup "+name+" from Repository "+str(repositoryNumber))85 completedProcess = self.command([86 'delete',87 '::'+name,88 ], repositoryNumber)89 if completedProcess.returncode != 0:90 print("Process did not return success:")91 print("Code: "+ str(completedProcess.returncode))92 print( completedProcess.stdout.decode('utf-8') )93 print( completedProcess.stderr.decode('utf-8') )94 return self95 print("Removal of "+name+" from Repository "+str(repositoryNumber)+" finished successfully")96 return self97 def backup(self, name, target, repositoryNumber):98 print("Backing up "+target+" to Repository "+str(repositoryNumber))99 completedProcess = self.command([100 'create',101 '::'+name,102 '.'103 ], repositoryNumber, directory=target, capture_output=False)104 print("Backup of "+target+" to Repository "+str(repositoryNumber)+" finished successfully")105 return self106 def umount(self, target):107 completedProcess = subprocess.run([108 'fusermount',109 '-u',110 target111 ],112 capture_output=True113 )114 if completedProcess.returncode != 0:115 print("Failed to unmount "+target)116 print("Code: "+ str(completedProcess.returncode))117 print( completedProcess.stdout.decode('utf-8') )118 print( completedProcess.stderr.decode('utf-8') )119 return self120 def mount(self, name, target, repositoryNumber):121 print("Mounting backup "+name+" from Repository "+str(repositoryNumber)+' to '+target)122 print("The borg mount is run in foreground to facilitate usage inside Docker")123 print("Please cancel the program with an interrupt (control+c) after you are done.")124 completedProcess = self.command([125 'mount',126 '-f',127 '::'+name,128 target129 ], repositoryNumber, check=False)130 if completedProcess.returncode != 0:131 print("Process did not return success:")132 print("Code: "+ str(completedProcess.returncode))133 print( completedProcess.stdout.decode('utf-8') )134 print( completedProcess.stderr.decode('utf-8') )135 return self136 print("Mounted backup "+name+" from Repository "+str(repositoryNumber)+' to '+target)137 return self138 def list(self, repositoryNumber):139 completedProcess = self.command([140 'list',141 '--json',142 '::'143 ], repositoryNumber, check=False)144 if completedProcess.returncode != 0:145 print("Process did not return success:")146 print("Code: "+ str(completedProcess.returncode))147 print( completedProcess.stdout.decode('utf-8') )148 print( completedProcess.stderr.decode('utf-8') )149 raise ValueError("List process failed")150 output = completedProcess.stdout.decode('utf-8')151 decodedOutput = json.loads(output)152 return self.__archivesToBackups(decodedOutput['archives'])153 def __archivesToBackups(self, list):154 backups = []155 for archive in list:156 backup = FileBackup().parse(archive)157 backup.inject(backups)158 sortedBackups = sorted(backups, key=lambda backup : backup._time)159 return sortedBackups160 161 162 def restore(self, name, target, repositoryNumber):163 backup = self.__findBackup(name, repositoryNumber)164 print("Restoring backup "+backup.getName()+" from Repository "+str(repositoryNumber)+' to '+target)165 completedProcess = self.command([166 'extract',167 '::'+backup.getName()168 ], repositoryNumber, directory=target)169 def prune(self, repositoryNumber):170 repo = self.__makeRepo(repositoryNumber)171 print("Pruning file backups in repository "+repositoryNumber)172 completedProcess = self.command([173 'prune',174 '--keep-daily=14',175 '--keep-weekly=6',176 '--keep-monthly=6',177 '::'178 ], repositoryNumber)179 if completedProcess.returncode != 0:180 print("Process did not return success:")181 print("Code: "+ str(completedProcess.returncode))182 print( completedProcess.stdout.decode('utf-8') )183 print( completedProcess.stderr.decode('utf-8') )184 return self185 return completedProcess.stdout.decode('utf-8')186 def command(self, args, repoNumber, directory=None, check=True,187 capture_output=True):188 if directory is None:189 directory = os.getcwd()190 return subprocess.run(191 ['borgbackup'] + args,192 capture_output=capture_output,193 env={194 'BORG_NEW_PASSPHRASE': self._borgPassword,195 'BORG_PASSPHRASE': self._borgPassword,196 'BORG_REPO': self.__makeRepo(repoNumber),197 'SSH_AUTH_SOCK': os.environ.get('SSH_AUTH_SOCK', ''),198 'BORG_RSH': "ssh -o StrictHostKeyChecking=accept-new -i "+self._keyFilePath199 }, cwd=directory, check=check)200 def __makeRepo(self, number):201 return 'ssh://'+self._user+'@'+self._host+':'+str(self._port)+'/.'+self._path+'/repo'+str(number)202 def __findBackup(self, target, repositoryNumber):203 if target == 'latest':204 return self.list(repositoryNumber)[-1]205 206 return target207 def getRepositories(self):208 repos = []209 for directoryName in self._resort.adapter('files').listFolders():210 start = len('repo')211 end = len(directoryName)212 repos.append( directoryName[start:end] )213 return repos214 215 def scrape(self, gauge):216 for repositoryNumber in self.getRepositories():217 try:218 backup = self.__findBackup('latest', repositoryNumber)219 except IndexError:220 gauge.labels(self._resort._name, 'repository_'+str(repositoryNumber)).set(0)221 continue222 backup.setTimestamp( gauge.labels(self._resort._name, 'repository_'+str(repositoryNumber)) )...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!