Best Python code snippet using localstack_python
test_monitor.py
Source:test_monitor.py
# NOTE(review): the first 15 lines of the original file are elided in this
# excerpt. `pp` is used below and is presumably imported there (it looks like
# a pretty-printer) -- confirm against the full file.

TESTDESCRIPTION = "TESTDESCRIPTION"

# Monitor names that delete_resource() must never delete.
# NOTE(review): these look like the monitors that ship with BIG-IP -- confirm.
_PROTECTED_MONITOR_NAMES = frozenset([
    'http', 'http_head_f5', 'https', 'https_443', 'https_head_f5',
    'diameter', 'dns', 'external', 'firepass', 'ftp', 'gateway_icmp',
    'icmp', 'imap', 'inband', 'ldap', 'module_score', 'mssql', 'mysql',
    'nntp', 'none', 'oracle', 'pop3', 'postgresql', 'radius',
    'radius_accounting', 'real_server', 'rpc', 'sasp', 'scripted', 'sip',
    'smb', 'smtp', 'snmp_dca', 'snmp_dca_base', 'soap', 'tcp', 'tcp_echo',
    'tcp_half_open', 'udp', 'virtual_location', 'wap', 'wmi',
])


def setup_basic_test(request, bigip):
    """Return the ``bigip.ltm.monitor`` organizing collection."""
    monitor1 = bigip.ltm.monitor
    return monitor1


def delete_resource(resources):
    """Delete every monitor in ``resources`` whose name is not protected.

    :param resources: a collection object exposing ``get_collection()``.
    """
    for resource in resources.get_collection():
        if resource.name not in _PROTECTED_MONITOR_NAMES:
            pp(resource.__dict__)
            resource.delete()


def _create_monitor(request, collection, resource_attr, partition, name,
                    **extra):
    """Create one monitor and register cleanup of its collection.

    The collection is already resolved by the caller, so the finalizer never
    refers to an unbound variable even if a later step raises.

    :param request: pytest ``request`` fixture used to register the finalizer.
    :param collection: monitor collection, e.g. ``bigip.ltm.monitor.tcps``.
    :param resource_attr: attribute on ``collection`` yielding the resource.
    :param partition: partition passed to ``create``.
    :param name: monitor name passed to ``create``.
    :param extra: additional creation parameters passed to ``create``.
    :returns: ``(resource, collection)``.
    """
    # Registered before create() so a failed or partial create is cleaned up.
    request.addfinalizer(lambda: delete_resource(collection))
    resource = getattr(collection, resource_attr)
    params = {'name': name, 'partition': partition}
    params.update(extra)
    resource.create(**params)
    return resource, collection


def _check_update_refresh_load(resource, collection, resource_attr, name,
                               partition='Common', dump=False):
    """Run the shared update / refresh / load assertions on a new monitor.

    :param resource: the monitor returned by a ``setup_*_test`` function.
    :param collection: the collection the monitor was created from.
    :param resource_attr: attribute on ``collection`` yielding a resource.
    :param name: the name the monitor was created with.
    :param partition: partition used when loading the second handle.
    :param dump: when true, pretty-print the resource before ``update()``.
    """
    assert resource.name == name
    resource.description = TESTDESCRIPTION
    if dump:
        pp(resource.__dict__)
    resource.update()
    assert resource.description == TESTDESCRIPTION
    # Clobber the local value; refresh() must bring back the stored one.
    resource.description = ''
    resource.refresh()
    assert resource.description == TESTDESCRIPTION
    loaded = getattr(collection, resource_attr)
    loaded.load(partition=partition, name=name)
    assert loaded.selfLink == resource.selfLink


def setup_http_test(request, bigip, partition, name):
    hc1 = bigip.ltm.monitor.https
    pp(hc1._meta_data)
    return _create_monitor(request, hc1, 'http', partition, name)


class TestMonitor(object):
    def test_get_collection(self, request, bigip):
        m1 = setup_basic_test(request, bigip)
        list_of_references = m1.get_collection()
        assert len(list_of_references) == 39


class TestMonitorHTTP(object):
    def test_http_create_refresh_update_delete_load(self, request, bigip):
        name = 'test1'
        mon, coll = setup_http_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'http', name)
# End HTTP Tests


# Begin HTTPS Tests
def setup_https_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.https_s, 'https',
                           partition, name)


class TestMonitorHTTPS(object):
    def test_https_create_refresh_update_delete_load(self, request, bigip):
        name = 'httpstest'
        mon, coll = setup_https_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'https', name)
# End HTTPS Tests


# Begin Diameter Tests
def setup_diameter_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.diameters, 'diameter',
                           partition, name)


class TestMonitorDiameter(object):
    def test_diameter_create_refresh_update_delete_load(self, request, bigip):
        name = 'diametertest'
        mon, coll = setup_diameter_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'diameter', name)
# End Diameter Tests


# Begin DNS Tests
def setup_dns_test(request, bigip, partition, name, qname):
    return _create_monitor(request, bigip.ltm.monitor.dns_s, 'dns',
                           partition, name, qname=qname)


class TestMonitorDNS(object):
    def test_dns_create_refresh_update_delete_load(self, request, bigip):
        name = 'dnstest'
        mon, coll = setup_dns_test(request, bigip, 'Common', name, 'aqna')
        _check_update_refresh_load(mon, coll, 'dns', name)
# End DNS Tests


# Begin External
def setup_external_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.externals, 'external',
                           partition, name)


class TestMonitorExternal(object):
    def test_external_create_refresh_update_delete_load(self, request, bigip):
        name = 'externaltest'
        mon, coll = setup_external_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'external', name)
# End External Tests


# Begin FirePass
def setup_firepass_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.firepass_s, 'firepass',
                           partition, name)


class TestMonitorFirePass(object):
    def test_firepass_create_refresh_update_delete_load(self, request, bigip):
        name = 'firepasstest'
        mon, coll = setup_firepass_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'firepass', name)
# End FirePass Tests


# Begin FTP Tests
def setup_ftp_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.ftps, 'ftp',
                           partition, name)


class TestMonitorFTP(object):
    def test_ftp_create_refresh_update_delete_load(self, request, bigip):
        name = 'ftptest'
        mon, coll = setup_ftp_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'ftp', name)
# End FTP Tests


# Begin GateWay-ICMP
def setup_gateway_icmp_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.gateway_icmps,
                           'gateway_icmp', partition, name)


class TestMonitorGateWay_ICMP(object):
    def test_gateway_icmp_create_refresh_update_delete_load(self,
                                                            request,
                                                            bigip):
        name = 'gateway_icmptest'
        mon, coll = setup_gateway_icmp_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'gateway_icmp', name)
# End GateWay-ICMP


# Begin ICMP
def setup_icmp_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.icmps, 'icmp',
                           partition, name)


class TestMonitorICMP(object):
    def test_icmp_create_refresh_update_delete_load(self, request, bigip):
        name = 'icmptest'
        mon, coll = setup_icmp_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'icmp', name)
# End ICMP


# Begin IMAP
def setup_imap_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.imaps, 'imap',
                           partition, name)


class TestMonitorIMAP(object):
    def test_imap_create_refresh_update_delete_load(self, request, bigip):
        name = 'imaptest'
        mon, coll = setup_imap_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'imap', name)
# End IMAP


# Begin InBand
def setup_inband_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.inbands, 'inband',
                           partition, name)


class TestMonitorInBand(object):
    def test_inband_create_refresh_update_delete_load(self, request, bigip):
        name = 'inbandtest'
        mon, coll = setup_inband_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'inband', name)
# End InBand


# Begin LDAP
def setup_ldap_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.ldaps, 'ldap',
                           partition, name)


class TestMonitorLDAP(object):
    def test_ldap_create_refresh_update_delete_load(self, request, bigip):
        name = 'ldaptest'
        mon, coll = setup_ldap_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'ldap', name)
# End LDAP Tests


# Begin Module-Score
def setup_module_score_test(request, bigip, partition, name, snmpaddr):
    # 'snmp-ip-address' is not a valid identifier, so it has to be passed by
    # dict unpacking rather than as a literal keyword argument.
    return _create_monitor(request, bigip.ltm.monitor.module_scores,
                           'module_score', partition, name,
                           **{'snmp-ip-address': snmpaddr})


class TestMonitorModule_Score(object):
    def test_module_score_create_refresh_update_delete_load(self, request,
                                                            bigip):
        name = 'module_scoretest'
        mon, coll = setup_module_score_test(request, bigip, 'Common', name,
                                            '9.9.9.9')
        _check_update_refresh_load(mon, coll, 'module_score', name)
# End Module-Score


# Begin MSSQL
def setup_mssql_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.mssqls, 'mssql',
                           partition, name)


class TestMonitorMSSQL(object):
    def test_mssql_create_refresh_update_delete_load(self, request, bigip):
        name = 'mssqltest'
        mon, coll = setup_mssql_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'mssql', name)
# End MSSQL


# Begin MYSQL
def setup_mysql_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.mysqls, 'mysql',
                           partition, name)


class TestMonitorMYSQL(object):
    def test_mysql_create_refresh_update_delete_load(self, request, bigip):
        name = 'mysqltest'
        mon, coll = setup_mysql_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'mysql', name)
# End MYSQL


# Begin NNTP
def setup_nntp_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.nntps, 'nntp',
                           partition, name)


class TestMonitorNNTP(object):
    def test_nntp_create_refresh_update_delete_load(self, request, bigip):
        name = 'nntptest'
        mon, coll = setup_nntp_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'nntp', name)
# End NNTP


# Begin NONE
# The NONE monitor test was disabled in the original by wrapping it in a
# string literal; the reason is not visible in this excerpt. It is kept
# disabled, and unchanged, below.
'''
def setup_none_test(request, bigip, partition, name):
    def teardown():
        delete_resource(hc1)
    request.addfinalizer(teardown)
    hc1 = bigip.ltm.monitor.nones
    none1 = hc1.none
    creation_params = {'name': name,
                       'partition': partition}
    none1.create(**creation_params)
    return none1, hc1


class TestMonitorNONE(object):
    def test_none_create_refresh_update_delete_load(self, request,
                                                    bigip):
        none1, hc1 = setup_none_test(request, bigip, 'Common', 'nonetest')
        assert none1.name == 'nonetest'
        none1.description = TESTDESCRIPTION
        none1.update()
        assert none1.description == TESTDESCRIPTION
        none1.description = ''
        none1.refresh()
        assert none1.description == TESTDESCRIPTION
        none2 = hc1.none
        none2.load(partition='Common', name='nonetest')
        assert none2.selfLink == none1.selfLink
'''
# End NONE


# Begin Oracle
def setup_oracle_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.oracles, 'oracle',
                           partition, name)


class TestMonitorOracle(object):
    def test_oracle_create_refresh_update_delete_load(self, request, bigip):
        name = 'oracletest'
        mon, coll = setup_oracle_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'oracle', name)
# End Oracle


# Begin POP3
def setup_pop3_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.pop3s, 'pop3',
                           partition, name)


class TestMonitorPOP3(object):
    def test_pop3_create_refresh_update_delete_load(self, request, bigip):
        name = 'pop3test'
        mon, coll = setup_pop3_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'pop3', name)
# End POP3


# Begin PostGRESQL
def setup_postgresql_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.postgresqls,
                           'postgresql', partition, name)


class TestMonitorPostGRESQL(object):
    def test_postgresql_create_refresh_update_delete_load(self, request,
                                                          bigip):
        name = 'postgresqltest'
        mon, coll = setup_postgresql_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'postgresql', name)
# End PostGRESQL


# Begin Radius
def setup_radius_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.radius_s, 'radius',
                           partition, name)


class TestMonitorRadius(object):
    def test_radius_create_refresh_update_delete_load(self, request, bigip):
        name = 'radiustest'
        mon, coll = setup_radius_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'radius', name)
# End Radius


# Begin Radius_Accounting
def setup_radius_accounting_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.radius_accountings,
                           'radius_accounting', partition, name)


class TestMonitorRadius_Accounting(object):
    def test_radius_accounting_create_refresh_update_delete_load(self, request,
                                                                 bigip):
        name = 'radius_accountingtest'
        mon, coll = setup_radius_accounting_test(request, bigip, 'Common',
                                                 name)
        _check_update_refresh_load(mon, coll, 'radius_accounting', name)
# End Radius_Accounting


# Begin Real_Server
def setup_real_server_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.real_servers,
                           'real_server', partition, name)


class TestMonitorReal_Server(object):
    def test_real_server_create_refresh_update_delete_load(self, request,
                                                           bigip):
        name = 'real_servertest'
        mon, coll = setup_real_server_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'real_server', name)
# End Real_Server


# Begin RPC
def setup_rpc_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.rpcs, 'rpc',
                           partition, name)


class TestMonitorRPC(object):
    def test_rpc_create_refresh_update_delete_load(self, request, bigip):
        name = 'rpctest'
        mon, coll = setup_rpc_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'rpc', name)
# End RPC


# Begin SASP
def setup_sasp_test(request, bigip, partition, name, primaryAddress='1.1.1.1'):
    return _create_monitor(request, bigip.ltm.monitor.sasps, 'sasp',
                           partition, name, primaryAddress=primaryAddress)


class TestMonitorSASP(object):
    def test_sasp_create_refresh_update_delete_load(self, request, bigip):
        name = 'sasptest'
        mon, coll = setup_sasp_test(request, bigip, 'Common', name)
        # dump=True keeps the original's pretty-print before update().
        _check_update_refresh_load(mon, coll, 'sasp', name, dump=True)
# End SASP


# Begin Scripted
def setup_scripted_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.scripteds, 'scripted',
                           partition, name)


class TestMonitorScripted(object):
    def test_scripted_create_refresh_update_delete_load(self, request, bigip):
        name = 'scriptedtest'
        mon, coll = setup_scripted_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'scripted', name)
# End Scripted


# Begin SIP
def setup_sip_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.sips, 'sip',
                           partition, name)


class TestMonitorSIP(object):
    def test_sip_create_refresh_update_delete_load(self, request, bigip):
        name = 'siptest'
        mon, coll = setup_sip_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'sip', name)
# End SIP


# Begin SMB
def setup_smb_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.smbs, 'smb',
                           partition, name)


class TestMonitorSMB(object):
    def test_smb_create_refresh_update_delete_load(self, request, bigip):
        name = 'smbtest'
        mon, coll = setup_smb_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'smb', name)
# End SMB


# Begin SMTP
def setup_smtp_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.smtps, 'smtp',
                           partition, name)


class TestMonitorSMTP(object):
    def test_smtp_create_refresh_update_delete_load(self, request, bigip):
        name = 'smtptest'
        mon, coll = setup_smtp_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'smtp', name)
# End SMTP


# Begin SNMP_DCA
def setup_snmp_dca_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.snmp_dcas, 'snmp_dca',
                           partition, name)


class TestMonitorSNMP_DCA(object):
    def test_snmp_dca_create_refresh_update_delete_load(self, request, bigip):
        name = 'snmp_dcatest'
        mon, coll = setup_snmp_dca_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'snmp_dca', name)
# End SNMP_DCA


# Begin SNMP_DCA_Base
def setup_snmp_dca_base_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.snmp_dca_bases,
                           'snmp_dca_base', partition, name)


class TestMonitorSNMP_DCA_Base(object):
    def test_snmp_dca_base_create_refresh_update_delete_load(self, request,
                                                             bigip):
        name = 'snmp_dca_basetest'
        mon, coll = setup_snmp_dca_base_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'snmp_dca_base', name)
# End SNMP_DCA_Base


# Begin SOAP
def setup_soap_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.soaps, 'soap',
                           partition, name)


class TestMonitorSOAP(object):
    def test_soap_create_refresh_update_delete_load(self, request, bigip):
        name = 'soaptest'
        mon, coll = setup_soap_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'soap', name)
# End SOAP


# Begin TCP
def setup_tcp_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.tcps, 'tcp',
                           partition, name)


class TestMonitorTCP(object):
    def test_tcp_create_refresh_update_delete_load(self, request, bigip):
        name = 'tcptest'
        mon, coll = setup_tcp_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'tcp', name)
# End TCP


# Begin TCP_Echo
def setup_tcp_echo_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.tcp_echos, 'tcp_echo',
                           partition, name)


class TestMonitorTCP_Echo(object):
    def test_tcp_echo_create_refresh_update_delete_load(self, request, bigip):
        name = 'tcp_echotest'
        mon, coll = setup_tcp_echo_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'tcp_echo', name)
# End TCP_Echo


# Begin TCP_Half_Open
def setup_tcp_half_open_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.tcp_half_opens,
                           'tcp_half_open', partition, name)


class TestMonitorTCP_Half_Open(object):
    def test_tcp_half_open_create_refresh_update_delete_load(self, request,
                                                             bigip):
        name = 'tcp_half_opentest'
        mon, coll = setup_tcp_half_open_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'tcp_half_open', name)
# End TCP_Half_Open


# Begin UDP
def setup_udp_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.udps, 'udp',
                           partition, name)


class TestMonitorUDP(object):
    def test_udp_create_refresh_update_delete_load(self, request, bigip):
        name = 'udptest'
        mon, coll = setup_udp_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'udp', name)
# End UDP


# Begin Virtual_Location
def setup_virtual_location_test(request, bigip, partition, name, pool='tp1'):
    return _create_monitor(request, bigip.ltm.monitor.virtual_locations,
                           'virtual_location', partition, name, pool=pool)


class TestMonitorVirtual_Location(object):
    def test_virtual_location_create_refresh_update_delete_load(self, request,
                                                                bigip):
        name = 'virtual_locationtest'
        mon, coll = setup_virtual_location_test(request, bigip, 'Common',
                                                name)
        _check_update_refresh_load(mon, coll, 'virtual_location', name)
# End Virtual_Location


# Begin WAP
def setup_wap_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.waps, 'wap',
                           partition, name)


class TestMonitorWAP(object):
    def test_wap_create_refresh_update_delete_load(self, request, bigip):
        name = 'waptest'
        mon, coll = setup_wap_test(request, bigip, 'Common', name)
        _check_update_refresh_load(mon, coll, 'wap', name)
# End WAP


# Begin WMI
def setup_wmi_test(request, bigip, partition, name):
    return _create_monitor(request, bigip.ltm.monitor.wmis, 'wmi',
                           partition, name)


class TestMonitorWMI(object):
    def test_wmi_create_refresh_update_delete_load(self, request, bigip):
        wmi1, hc1 = setup_wmi_test(request, bigip, 'Common', 'wmitest')
        assert wmi1.name == 'wmitest'
        wmi1.description = TESTDESCRIPTION
        wmi1.update()
        assert wmi1.description == TESTDESCRIPTION
        # NOTE(review): the source excerpt is truncated at this point. The
        # remainder of this test is not visible and has been left out rather
        # than guessed -- restore it from the full file.
test_scenario.py
Source:test_scenario.py
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import testtools

from tempest import config
from tempest.lib import decorators

from sahara_tempest_plugin.tests.cli import clusters
from sahara_tempest_plugin.tests.cli import cluster_templates
from sahara_tempest_plugin.tests.cli import images
from sahara_tempest_plugin.tests.cli import node_group_templates
from sahara_tempest_plugin.tests.cli import plugins
from sahara_tempest_plugin.tests.cli import job_binaries
from sahara_tempest_plugin.tests.cli import jobs
from sahara_tempest_plugin.tests.cli import job_templates
from sahara_tempest_plugin.tests.cli import data_sources
from sahara_tempest_plugin.tests.cli import job_types

TEMPEST_CONF = config.CONF
NODE_GROUP_TEMPLATE = 'node group template'


class Scenario(images.SaharaImageCLITest,
               node_group_templates.SaharaNodeGroupCLITest,
               cluster_templates.SaharaClusterTemplateCLITest,
               clusters.SaharaClusterCLITest,
               plugins.SaharaPluginCLITest,
               job_binaries.SaharaJobBinaryCLITest,
               jobs.SaharaJobCLITest,
               job_templates.SaharaJobTemplateCLITest,
               data_sources.SaharaDataSourceCLITest,
               job_types.SaharaJobTypeCLITest):
    """End-to-end CLI scenarios chaining the helpers of the mixed-in classes.

    Each test registers ``delete_resource`` cleanups right after creating a
    resource, then runs the list/show/update/delete helpers in sequence.
    """

    def test_plugin_cli(self):
        """Run the plugin list/show/configs helpers, plus update if enabled."""
        self.openstack_plugin_list()
        self.openstack_plugin_show()
        self.openstack_plugin_configs_get()
        # The update step only runs when the tempest config enables it.
        if TEMPEST_CONF.data_processing.plugin_update_support:
            self.openstack_plugin_update()

    def test_node_group_cli(self):
        """Create, list, rename, show and delete node group templates."""
        master = self.openstack_node_group_template_create('master', '4')
        worker = self.openstack_node_group_template_create('worker', '3')
        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, master)
        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, worker)

        self.filter_node_group_list_with_plugin()
        self.openstack_node_group_template_list()

        # The update changes the name, so the new name gets its own cleanup.
        renamed_master = self.openstack_node_group_template_update(
            master, update_field='name')
        self.addCleanup(
            self.delete_resource, NODE_GROUP_TEMPLATE, renamed_master)

        self.openstack_node_group_template_show(renamed_master)
        self.openstack_node_group_template_delete(renamed_master)
        self.negative_try_to_delete_protected_node_group(worker)
        self.openstack_node_group_template_delete(worker)
        self.wait_for_resource_deletion(renamed_master, NODE_GROUP_TEMPLATE)
        self.wait_for_resource_deletion(worker, NODE_GROUP_TEMPLATE)
        self.negative_delete_removed_node_group(worker)

    def test_cluster_template_cli(self):
        """Create, list, show, update and delete a cluster template."""
        cluster_template_cmd = 'cluster template'

        master = self.openstack_node_group_template_create('tmp-master', '4')
        worker = self.openstack_node_group_template_create('tmp-worker', '3')
        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, master)
        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, worker)

        template = self.openstack_cluster_template_create(master, worker)
        self.addCleanup(self.delete_resource, cluster_template_cmd, template)

        self.openstack_cluster_template_list()
        self.openstack_cluster_template_show(template)

        updated_template = self.openstack_cluster_template_update(template)
        self.addCleanup(
            self.delete_resource, cluster_template_cmd, updated_template)

        self.openstack_cluster_template_delete(updated_template)
        self.wait_for_resource_deletion(updated_template,
                                        cluster_template_cmd)
        self.openstack_node_group_template_delete(master)
        self.openstack_node_group_template_delete(worker)
        self.wait_for_resource_deletion(master, NODE_GROUP_TEMPLATE)
        self.wait_for_resource_deletion(worker, NODE_GROUP_TEMPLATE)

    @decorators.skip_because(bug="1629295")
    def test_cluster_cli(self):
        """Register an image, build a cluster, run a job, then tear down."""
        data_processing = TEMPEST_CONF.data_processing
        image = self.openstack_image_register(
            data_processing.test_image_name,
            data_processing.test_ssh_user)
        self.openstack_image_tags_set(image)
        self.openstack_image_tags_remove(image)
        self.openstack_image_tags_add(image)
        self.openstack_image_show(image)
        self.openstack_image_list()

        flavors_client = self.client_manager_admin.flavors_client
        flavor = flavors_client.create_flavor(
            name='sahara-flavor', ram=512, vcpus=1, disk=4, id=20)['flavor']
        self.addCleanup(flavors_client.delete_flavor, flavor['id'])
        self.addCleanup(self.openstack_image_unregister, image)

        master = self.openstack_node_group_template_create(
            'cli-cluster-master', 'sahara-flavor')
        worker = self.openstack_node_group_template_create(
            'cli-cluster-worker', 'sahara-flavor')
        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, master)
        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, worker)

        template = self.openstack_cluster_template_create(master, worker)
        self.addCleanup(self.delete_resource, 'cluster template', template)

        cluster = self.openstack_cluster_create(template, image)
        self.addCleanup(self.delete_resource, 'cluster', cluster)

        self._run_job_on_cluster(cluster)
        self.openstack_cluster_list()
        self.openstack_cluster_show(cluster)
        self.openstack_cluster_update(cluster)
        self.openstack_cluster_verification_show(cluster)
        self.openstack_cluster_verification_start(cluster)
        self.openstack_cluster_scale(cluster, worker)

        self.openstack_cluster_delete(cluster)
        self.wait_for_resource_deletion(cluster, 'cluster')
        self.openstack_cluster_template_delete(template)
        self.wait_for_resource_deletion(template, 'cluster template')
        self.openstack_node_group_template_delete(master)
        self.openstack_node_group_template_delete(worker)
        self.wait_for_resource_deletion(master, NODE_GROUP_TEMPLATE)
        self.wait_for_resource_deletion(worker, NODE_GROUP_TEMPLATE)
        self.openstack_image_unregister(image)
        self.negative_unregister_not_existing_image(image)

    @testtools.skipIf(TEMPEST_CONF.data_processing.api_version_saharaclient !=
                      '1.1', "Full job binaries testing requires API v1.1")
    def test_job_binary_cli(self):
        """Create, inspect, update, download and delete a job binary."""
        binary, original_file = self.openstack_job_binary_create()
        self.addCleanup(self.delete_resource, 'job binary', binary)

        self.openstack_job_binary_list()
        self.openstack_job_binary_show(binary)
        self.openstack_job_binary_update(binary, flag='description')
        self.openstack_job_binary_download(binary, original_file)
        self.filter_job_binaries_in_list()
        self.negative_try_to_update_protected_jb(binary)
        self.openstack_job_binary_update(binary, flag='unprotected')
        self.openstack_job_binary_delete(binary)
        self.negative_delete_removed_job_binary(binary)

    def test_job_template_cli(self):
        """Create a job template from a job binary, then exercise and delete it."""
        binary, _ = self.openstack_job_binary_create(job_internal=False)
        self.addCleanup(self.delete_resource, 'job binary', binary)

        template = self.openstack_job_template_create(binary)
        self.addCleanup(self.delete_resource, 'job template', template)

        self.openstack_job_template_list()
        self.openstack_job_template_show(template)
        self.openstack_job_template_update(template)
        self.openstack_job_template_delete(template)
        self.openstack_job_binary_delete(binary)

    def test_data_source_cli(self):
        """Create, list, show, update and delete a data source."""
        source = self.openstack_data_source_create()
        self.addCleanup(self.delete_resource, 'data source', source)

        self.openstack_data_source_list()
        self.openstack_data_source_show(source)
        self.openstack_data_source_update(source)
        self.openstack_data_source_delete(source)

    def test_job_type_cli(self):
        """Run the job type list, configs-get and list-filter helpers."""
        self.openstack_job_type_list()
        self.openstack_job_type_configs_get(flag='file')
        self.filter_job_type_in_list()

    def _run_job_on_cluster(self, cluster_name):
        """Execute a job on ``cluster_name`` and remove what it created."""
        template = self.openstack_job_template_name()
        self.addCleanup(self.delete_resource, 'job template', template)

        job_input = self.openstack_data_source_create()
        job_output = self.openstack_data_source_create()
        self.addCleanup(self.delete_resource, 'data source', job_input)
        self.addCleanup(self.delete_resource, 'data source', job_output)

        job_id = self.openstack_job_execute(
            cluster_name, template, job_input, job_output)
        self.addCleanup(self.delete_resource, 'job', job_id)

        self.openstack_job_list()
        self.openstack_job_show(job_id)
        self.openstack_job_update(job_id)
        self.openstack_job_delete(job_id)
        self.openstack_data_source_delete(job_input)
        self.openstack_data_source_delete(job_output)
# ... (the listing ends here)
clients.py
Source:clients.py
# ... (earlier lines of clients.py are elided in this listing)
from tempest_lib import exceptions as exc


class Client(object):
    """Base class providing a poll-until-deleted helper for client wrappers."""

    def is_resource_deleted(self, method, *args, **kwargs):
        """Report whether the resource is gone; subclasses must override."""
        raise NotImplementedError

    def delete_resource(self, method, *args, **kwargs):
        """Poll ``is_resource_deleted`` every 5 seconds until it is true.

        The loop runs under a 300-second ``fixtures.Timeout``.
        """
        # TODO(sreshetniak): make timeout configurable
        with fixtures.Timeout(300, gentle=True):
            while not self.is_resource_deleted(method, *args, **kwargs):
                time.sleep(5)


class SaharaClient(Client):
    """Wrapper around the Sahara client; create helpers return the new id."""

    def __init__(self, *args, **kwargs):
        self.sahara_client = sahara_client.Client('1.1', *args, **kwargs)

    def create_node_group_template(self, *args, **kwargs):
        templates = self.sahara_client.node_group_templates
        return templates.create(*args, **kwargs).id

    def delete_node_group_template(self, node_group_template_id):
        return self.delete_resource(
            self.sahara_client.node_group_templates.delete,
            node_group_template_id)

    def create_cluster_template(self, *args, **kwargs):
        templates = self.sahara_client.cluster_templates
        return templates.create(*args, **kwargs).id

    def delete_cluster_template(self, cluster_template_id):
        return self.delete_resource(
            self.sahara_client.cluster_templates.delete,
            cluster_template_id)

    def create_cluster(self, *args, **kwargs):
        return self.sahara_client.clusters.create(*args, **kwargs).id

    def delete_cluster(self, cluster_id):
        return self.delete_resource(
            self.sahara_client.clusters.delete,
            cluster_id)

    def scale_cluster(self, cluster_id, body):
        return self.sahara_client.clusters.scale(cluster_id, body)

    def create_datasource(self, *args, **kwargs):
        return self.sahara_client.data_sources.create(*args, **kwargs).id

    def delete_datasource(self, datasource_id):
        return self.delete_resource(
            self.sahara_client.data_sources.delete,
            datasource_id)

    def create_job_binary_internal(self, *args, **kwargs):
        internals = self.sahara_client.job_binary_internals
        return internals.create(*args, **kwargs).id

    def delete_job_binary_internal(self, job_binary_internal_id):
        return self.delete_resource(
            self.sahara_client.job_binary_internals.delete,
            job_binary_internal_id)

    def create_job_binary(self, *args, **kwargs):
        return self.sahara_client.job_binaries.create(*args, **kwargs).id

    def delete_job_binary(self, job_binary_id):
        return self.delete_resource(
            self.sahara_client.job_binaries.delete,
            job_binary_id)

    def create_job(self, *args, **kwargs):
        return self.sahara_client.jobs.create(*args, **kwargs).id

    def delete_job(self, job_id):
        return self.delete_resource(
            self.sahara_client.jobs.delete,
            job_id)

    def run_job(self, *args, **kwargs):
        return self.sahara_client.job_executions.create(*args, **kwargs).id

    def delete_job_execution(self, job_execution_id):
        return self.delete_resource(
            self.sahara_client.job_executions.delete,
            job_execution_id)

    def get_cluster_status(self, cluster_id):
        cluster = self.sahara_client.clusters.get(cluster_id)
        return str(cluster.status)

    def get_job_status(self, exec_id):
        execution = self.sahara_client.job_executions.get(exec_id)
        return str(execution.info['status'])

    def is_resource_deleted(self, method, *args, **kwargs):
        """Call ``method``; true only if it raises an APIException with 404."""
        try:
            method(*args, **kwargs)
        except saharaclient_base.APIException as ex:
            return ex.error_code == 404
        else:
            return False


class NovaClient(Client):
    """Wrapper around the Nova client used to resolve image names."""

    def __init__(self, *args, **kwargs):
        self.nova_client = nova_client.Client('1.1', *args, **kwargs)

    def get_image_id(self, image_name):
        """Return ``image_name`` if UUID-like, else the id of the named image.

        Raises ``exc.NotFound`` when no listed image has that name.
        """
        if uuidutils.is_uuid_like(image_name):
            return image_name
        for candidate in self.nova_client.images.list():
            if candidate.name == image_name:
                return candidate.id
        raise exc.NotFound(image_name)


class NeutronClient(Client):
    """Wrapper around the Neutron client used to resolve network names."""

    def __init__(self, *args, **kwargs):
        self.neutron_client = neutron_client.Client('2.0', *args, **kwargs)

    def get_network_id(self, network_name):
        """Return ``network_name`` if UUID-like, else the first match's id.

        Raises ``exc.NotFound`` when the lookup by name returns no networks.
        """
        if uuidutils.is_uuid_like(network_name):
            return network_name
        listing = self.neutron_client.list_networks(name=network_name)
        matches = listing['networks']
        if len(matches) < 1:
            raise exc.NotFound(network_name)
        return matches[0]['id']


class SwiftClient(Client):
    """Wrapper around a Swift connection for containers and objects."""

    def __init__(self, *args, **kwargs):
        self.swift_client = swift_client.Connection(*args,
                                                    auth_version='2.0',
                                                    **kwargs)

    def create_container(self, container_name):
        return self.swift_client.put_container(container_name)

    def delete_container(self, container_name):
        return self.delete_resource(
            self.swift_client.delete_container,
            container_name)

    def upload_data(self, container_name, object_name, data):
        return self.swift_client.put_object(container_name, object_name, data)

    def delete_object(self, container_name, object_name):
        return self.delete_resource(
            self.swift_client.delete_object,
            container_name,
            object_name)

    def is_resource_deleted(self, method, *args, **kwargs):
        try:
            method(*args, **kwargs)
        except swift_exc.ClientException as ex:
            return ex.http_status == 404
        # ... (the listing is truncated here)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!