Best Python code snippet using fMBT_python
createNFSView.py
Source:createNFSView.py
1#!/usr/bin/env python2"""Create a Cohesity NFS View Using python"""3# import pyhesity wrapper module4from pyhesity import *5# command line arguments6import argparse7parser = argparse.ArgumentParser()8parser.add_argument('-v', '--vip', type=str, required=True) # Cohesity cluster name or IP9parser.add_argument('-u', '--username', type=str, required=True) # Cohesity Username10parser.add_argument('-d', '--domain', type=str, default='local') # Cohesity User Domain11parser.add_argument('-n', '--viewname', type=str, required=True) # name view to create12parser.add_argument('-s', '--storagedomain', type=str, default='DefaultStorageDomain') # name of storage domain to use13parser.add_argument('-q', '--qospolicy', type=str, choices=['Backup Target Low', 'Backup Target High', 'TestAndDev High', 'TestAndDev Low'], default='TestAndDev High') # qos policy14parser.add_argument('-w', '--whitelist', action='append', default=[]) # ip to whitelist15parser.add_argument('-l', '--quotalimit', type=int, default=None) # quota limit16parser.add_argument('-a', '--quotaalert', type=int, default=None) # quota alert threshold17parser.add_argument('-c', '--clearwhitelist', action='store_true') # erase existing whitelist18parser.add_argument('-r', '--removewhitelistentries', action='store_true') # remove whitelist entries specified with -w19parser.add_argument('-x', '--updateexistingview', action='store_true') # allow update of existing view (otherwise exit if view exists)20parser.add_argument('-lm', '--lockmode', type=str, choices=['Compliance', 'Enterprise', 'None', 'compliance', 'enterprise', 'none'], default='None') # datalock mode21parser.add_argument('-dl', '--defaultlockperiod', type=int, default=1) # default lock period22parser.add_argument('-al', '--autolockminutes', type=int, default=0) # autolock after idle minutes23parser.add_argument('-ml', '--minimumlockperiod', type=int, default=0) # minimum manual lock period24parser.add_argument('-xl', '--maximumlockperiod', type=int, default=1) # maximum manual lock period25parser.add_argument('-lt', '--manuallockmode', type=str, choices=['ReadOnly', 'FutureATime', 'readonly', 'futureatim'], default='ReadOnly') # manual locking type26parser.add_argument('-lu', '--lockunit', type=str, choices=['minute', 'hour', 'day', 'minutes', 'hours', 'days'], default='minute') # lock period units27args = parser.parse_args()28vip = args.vip29username = args.username30domain = args.domain31viewName = args.viewname32storageDomain = args.storagedomain33qosPolicy = args.qospolicy34whitelist = args.whitelist35quotalimit = args.quotalimit36quotaalert = args.quotaalert37removewhitelistentries = args.removewhitelistentries38clearwhitelist = args.clearwhitelist39updateexistingview = args.updateexistingview40lockmode = args.lockmode41defaultlockperiod = args.defaultlockperiod42autolockminutes = args.autolockminutes43minimumlockperiod = args.minimumlockperiod44maximumlockperiod = args.maximumlockperiod45manuallockmode = args.manuallockmode46lockunit = args.lockunit47lockunitmap = {'minute': 60000, 'minutes': 60000, 'hour': 3600000, 'hours': 3600000, 'day': 86400000, 'days': 86400000}48lockunitmultiplier = lockunitmap[lockunit]49# netmask2cidr50def netmask2cidr(netmask):51 bin = ''.join(["{0:b}".format(int(o)) for o in netmask.split('.')])52 if '0' in bin:53 cidr = bin.index('0')54 else:55 cidr = 3256 return cidr57# authenticate58apiauth(vip, username, domain)59existingview = None60views = api('get', 'views')61if views['count'] > 0:62 existingviews = [v for v in views['views'] if v['name'].lower() == viewName.lower()]63 if(len(existingviews) > 0):64 existingview = existingviews[0]65if existingview is not None and updateexistingview is not True:66 print('view %s already exists' % viewName)67 exit(0)68if existingview is None:69 # find storage domain70 sd = [sd for sd in api('get', 'viewBoxes') if sd['name'].lower() == storageDomain.lower()]71 if len(sd) != 1:72 print("Storage domain %s not found!" % storageDomain)73 exit()74 sdid = sd[0]['id']75 # new view parameters76 newView = {77 "caseInsensitiveNamesEnabled": True,78 "enableNfsViewDiscovery": True,79 "enableSmbAccessBasedEnumeration": False,80 "enableSmbViewDiscovery": True,81 "fileExtensionFilter": {82 "isEnabled": False,83 "mode": "kBlacklist",84 "fileExtensionsList": []85 },86 "protocolAccess": "kNFSOnly",87 "securityMode": "kNativeMode",88 "subnetWhitelist": [],89 "qos": {90 "principalName": qosPolicy91 },92 "name": viewName,93 "viewBoxId": sdid94 }95else:96 newView = existingview97if clearwhitelist is True:98 newView['subnetWhitelist'] = []99if len(whitelist) > 0:100 for ip in whitelist:101 if ',' in ip:102 (thisip, netmask) = ip.split(',')103 netmask = netmask.lstrip()104 cidr = netmask2cidr(netmask)105 else:106 thisip = ip107 netmask = '255.255.255.255'108 cidr = 32109 existingEntry = []110 if 'subnetWhitelist' in newView:111 existingEntry = [e for e in newView['subnetWhitelist'] if e['ip'] == thisip and e['netmaskBits'] == cidr]112 if removewhitelistentries is not True and len(existingEntry) == 0:113 newView['subnetWhitelist'].append({114 "description": '',115 "nfsAccess": "kReadWrite",116 "smbAccess": "kReadWrite",117 "nfsRootSquash": False,118 "ip": thisip,119 "netmaskIp4": netmask120 })121 else:122 if removewhitelistentries is True:123 newView['subnetWhitelist'] = [e for e in newView['subnetWhitelist'] if not (e['ip'] == thisip and e['netmaskBits'] == cidr)]124# apply quota125if quotalimit is not None:126 if quotaalert is None:127 quotaalert = quotalimit - (quotalimit / 10)128 quotalimit = quotalimit * (1024 * 1024 * 1024)129 quotaalert = quotaalert * (1024 * 1024 * 1024)130 newView['logicalQuota'] = {131 "hardLimitBytes": quotalimit,132 "alertLimitBytes": quotaalert133 }134# apply datalock135if lockmode.lower() != 'none':136 newView['fileLockConfig'] = {}137 if lockmode.lower() == 'enterprise':138 newView['fileLockConfig']['mode'] = "kEnterprise"139 if lockmode.lower() == 'compliance':140 newView['fileLockConfig']['mode'] = "kCompliance"141 if autolockminutes > 0:142 newView['fileLockConfig']['autoLockAfterDurationIdle'] = autolockminutes * 60000143 newView['fileLockConfig']['defaultFileRetentionDurationMsecs'] = defaultlockperiod * lockunitmultiplier144 if maximumlockperiod < defaultlockperiod:145 maximumlockperiod = defaultlockperiod146 if maximumlockperiod <= minimumlockperiod or defaultlockperiod <= minimumlockperiod:147 print("default and maximum lock periods must be greater than the minimum lock period")148 exit()149 minimumlockmsecs = minimumlockperiod * lockunitmultiplier150 if minimumlockmsecs == 0:151 minimumlockmsecs = 60000152 newView['fileLockConfig']['minRetentionDurationMsecs'] = minimumlockmsecs153 maximumlockmsecs = maximumlockperiod * lockunitmultiplier154 if maximumlockmsecs <= (minimumlockmsecs + 240000):155 maximumlockmsecs = minimumlockmsecs + 240000156 newView['fileLockConfig']['maxRetentionDurationMsecs'] = maximumlockmsecs157 if manuallockmode.lower() == 'readonly':158 newView['fileLockConfig']['lockingProtocol'] = 'kSetReadOnly'159 else:160 newView['fileLockConfig']['lockingProtocol'] = 'kSetAtime'161 newView['fileLockConfig']['expiryTimestampMsecs'] = 0162# update qos policy163newView['qos']['principalName'] = qosPolicy164# create the view165if existingview is None:166 print("Creating view %s..." % viewName)167 result = api('post', 'views', newView)168else:169 print("Updating view %s..." % viewName)...
cloneBackupToView.py
Source:cloneBackupToView.py
1#!/usr/bin/env python2"""Create a Cohesity NFS View Using python"""3# import pyhesity wrapper module4from pyhesity import *5from time import sleep6from datetime import datetime7# command line arguments8import argparse9parser = argparse.ArgumentParser()10parser.add_argument('-v', '--vip', type=str, required=True) # Cohesity cluster name or IP11parser.add_argument('-u', '--username', type=str, required=True) # Cohesity Username12parser.add_argument('-d', '--domain', type=str, default='local') # Cohesity User Domain13parser.add_argument('-n', '--viewname', type=str, required=True) # name view to create14parser.add_argument('-q', '--qospolicy', type=str, choices=['Backup Target Low', 'Backup Target High', 'TestAndDev High', 'TestAndDev Low'], default='TestAndDev High') # qos policy15parser.add_argument('-w', '--whitelist', action='append', default=[]) # ip to whitelist16parser.add_argument('-x', '--deleteview', action='store_true') # delete existing view17parser.add_argument('-j', '--jobname', type=str, default=None) # name job to clone18parser.add_argument('-o', '--objectname', type=str, default=None) # name job to clone19parser.add_argument('-a', '--allruns', action='store_true') # delete existing view20args = parser.parse_args()21vip = args.vip22username = args.username23domain = args.domain24viewName = args.viewname25qosPolicy = args.qospolicy26whitelist = args.whitelist27deleteview = args.deleteview28jobname = args.jobname29objectname = args.objectname30allruns = args.allruns31# netmask2cidr32def netmask2cidr(netmask):33 bin = ''.join(["{0:b}".format(int(o)) for o in netmask.split('.')])34 if '0' in bin:35 cidr = bin.index('0')36 else:37 cidr = 3238 return cidr39# authenticate40apiauth(vip, username, domain)41if deleteview is not True:42 if jobname is None:43 print('-j, --jobname required!')44 exit(1)45 # get protection job46 job = [job for job in api('get', 'protectionJobs') if job['name'].lower() == jobname.lower()]47 if not job:48 print("Job '%s' not found" % jobname)49 exit(1)50 else:51 job = job[0]52 sdid = job['viewBoxId']53existingview = None54views = api('get', 'views')55if views['count'] > 0:56 existingviews = [v for v in views['views'] if v['name'].lower() == viewName.lower()]57 if(len(existingviews) > 0):58 existingview = existingviews[0]59if existingview is None and deleteview is not True:60 # new view parameters61 newView = {62 "caseInsensitiveNamesEnabled": True,63 "enableNfsViewDiscovery": True,64 "enableSmbAccessBasedEnumeration": False,65 "enableSmbViewDiscovery": True,66 "fileExtensionFilter": {67 "isEnabled": False,68 "mode": "kBlacklist",69 "fileExtensionsList": []70 },71 "protocolAccess": "kNFSOnly",72 "securityMode": "kNativeMode",73 "subnetWhitelist": [],74 "qos": {75 "principalName": qosPolicy76 },77 "name": viewName,78 "viewBoxId": sdid79 }80 if len(whitelist) > 0:81 for ip in whitelist:82 if ',' in ip:83 (thisip, netmask) = ip.split(',')84 netmask = netmask.lstrip()85 cidr = netmask2cidr(netmask)86 else:87 thisip = ip88 netmask = '255.255.255.255'89 cidr = 3290 newView['subnetWhitelist'].append({91 "description": '',92 "nfsAccess": "kReadWrite",93 "smbAccess": "kReadWrite",94 "nfsRootSquash": False,95 "ip": thisip,96 "netmaskIp4": netmask97 })98 print("Creating new view %s..." % viewName)99 result = api('post', 'views', newView)100 sleep(5)101 views = api('get', 'views')102 if views['count'] > 0:103 existingviews = [v for v in views['views'] if v['name'].lower() == viewName.lower()]104 if(len(existingviews) > 0):105 view = existingviews[0]106else:107 if deleteview is True:108 if existingview:109 print("Deleting view %s..." % viewName)110 result = api('delete', 'views/%s' % viewName)111 else:112 print("View %s does not exist" % viewName)113 exit(0)114 else:115 if existingview['viewBoxId'] != job['viewBoxId']:116 print('View and job must be in the same storage domain!')117 exit(1)118 print("Using existing view: %s" % viewName)119 view = existingview120successStates = ['kSuccess', 'kWarning']121# get runs122thisObjectFound = False123runs = [r for r in api('get', 'protectionRuns?jobId=%s' % job['id']) if r['backupRun']['snapshotsDeleted'] is False and r['backupRun']['status'] in successStates]124if len(runs) > 0:125 for run in runs:126 runType = run['backupRun']['runType'][1:]127 for sourceInfo in run['backupRun']['sourceBackupStatus']:128 thisObjectName = sourceInfo['source']['name']129 if objectname is None or thisObjectName.lower() == objectname.lower():130 if sourceInfo['status'] in successStates:131 thisObjectFound = True132 sourceView = sourceInfo['currentSnapshotInfo']['viewName']133 if 'relativeSnapshotDirectory' in sourceInfo['currentSnapshotInfo']:134 sourcePath = sourceInfo['currentSnapshotInfo']['relativeSnapshotDirectory']135 else:136 sourcePath = ''137 starttimeString = datetime.strptime(usecsToDate(run['backupRun']['stats']['startTimeUsecs']), '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d_%H-%M-%S')138 destinationPath = "%s-%s-%s" % (thisObjectName, starttimeString, runType)139 CloneDirectoryParams = {140 'destinationDirectoryName': destinationPath,141 'destinationParentDirectoryPath': '/%s' % view['name'],142 'sourceDirectoryPath': '/%s/%s' % (sourceView, sourcePath),143 }144 folderPath = "%s:/%s/%s" % (vip, viewName, destinationPath)145 print("Cloning %s backup files to %s" % (thisObjectName, folderPath))146 result = api('post', 'views/cloneDirectory', CloneDirectoryParams)147 if thisObjectFound is False:148 print('No runs found containing %s' % objectname)149else:150 print('No runs found for job %s' % jobname)...
xmlrpc.py
Source:xmlrpc.py
1# Copyright (c) 2003-2007 Open Source Applications Foundation2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from application import schema15from osaf import pim16import datetime17from twisted.web import xmlrpc18from util import commandline19# For current( )20from osaf.views import detail21def setattr_withtype(item, attribute, stringValue):22 """23 A wrapper around setattr which correctly unserializes a string24 value into an appropriate attribute value25 """26 v = item.getAttributeAspect(attribute, 'type').makeValue(stringValue)27 return setattr(item, attribute, v)28def getServletView(repository, name=None):29 if name is None:30 name = "XMLRPC"31 for existingView in repository.views:32 if existingView.name == name:33 return existingView34 return repository.createView(name)35class XmlRpcResource(xmlrpc.XMLRPC):36 def render(self, request):37 'Only allow XML-RPC connections from localhost'38 if request.getClientIP() == '127.0.0.1':39 return xmlrpc.XMLRPC.render(self, request)40 else:41 return xmlrpc.Fault(self.FAILURE, "Not localhost")42 def xmlrpc_echo(self, text):43 return "I received: '%s'" % text44 def xmlrpc_current(self):45 view = self.repositoryView46 item = None47 for block in detail.DetailRootBlock.iterItems(view=view):48 if hasattr(block, 'widget'):49 item = getattr(block, 'contents', None)50 break51 if item is not None:52 result = item.displayName53 else:54 result = "nothing selected"55 return result56 def xmlrpc_commandline(self, text, viewName=None):57 view = getServletView(self.repositoryView.repository, viewName)58 view.refresh()59 commandline.execute_command(view, text)60 view.commit()61 return "OK" # ???62 def xmlrpc_note(self, title, body, viewName=None):63 view = getServletView(self.repositoryView.repository, viewName)64 view.refresh()65 note = pim.Note(itsView=view, displayName=title, body=body)66 event = pim.EventStamp(note)67 event.add()68 event.startTime = datetime.datetime.now(tz=view.tzinfo.floating)69 event.duration = datetime.timedelta(minutes=60)70 event.anyTime = False71 allCollection = schema.ns('osaf.pim', view).allCollection72 allCollection.add(note)73 view.commit()74 return "OK" # ???75 #76 # Generic repository API77 #78 def generic_item_call(self, viewName, method, objectPath, *args, **kwds):79 view = getServletView(self.repositoryView.repository, viewName)80 view.refresh()81 try:82 item = view.findPath(objectPath)83 result = method(item, *args, **kwds)84 except Exception, e:85 print "error in generic_item_call: %s" % e86 raise87 # ugh, XML-RPC doesn't like None as a result88 if result is None:89 return True90 return result91 def xmlrpc_setAttribute(self, viewName, objectPath, attrName, value):92 """93 generic setAttribute - assumes the value is a atomic value -94 i.e. a string or an integer or something95 """96 return self.generic_item_call(viewName, setattr_withtype, objectPath, attrName, value)97 def xmlrpc_getAttribute(self, viewName, objectPath, attrName, value):98 """99 generic getAttribute - assumes the resulting value will be an100 atomic value like a string or an integer101 """102 return self.generic_item_call(viewName, getattr, objectPath, attrName, value)103 def xmlrpc_delAttribute(self, viewName, objectPath, attrName):104 """105 removes an attribute from an object106 """107 return self.generic_item_call(viewName, delattr, objectPath, attrName)108 def xmlrpc_commit(self, viewName=None):109 """110 commits the xml-rpc view111 """112 view = getServletView(self.repositoryView.repository, viewName)113 view.commit()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!