Best Python code snippet using avocado_python
test_power.py
Source:test_power.py
1# Copyright 2014, 2017 IBM Corp.2#3# All Rights Reserved.4#5# Licensed under the Apache License, Version 2.0 (the "License"); you may6# not use this file except in compliance with the License. You may obtain7# a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the14# License for the specific language governing permissions and limitations15# under the License.16import fixtures17import mock18import testtools19import pypowervm.exceptions as pexc20from pypowervm.tasks import power21import pypowervm.tasks.power_opts as popts22import pypowervm.tests.test_fixtures as fx23import pypowervm.wrappers.base_partition as pvm_bp24import pypowervm.wrappers.logical_partition as pvm_lpar25class TestPower(testtools.TestCase):26 def setUp(self):27 super(TestPower, self).setUp()28 self.adpt = self.useFixture(fx.AdapterFx()).adpt29 # Make it easier to validate job params: create_job_parameter returns a30 # simple 'name=value' string.31 mock_crt_jparm = self.useFixture(fixtures.MockPatch(32 'pypowervm.wrappers.job.Job.create_job_parameter')).mock33 mock_crt_jparm.side_effect = (34 lambda name, value, cdata=False: '%s=%s' % (name, value))35 # Patch Job.wrap to return a mocked Job wrapper36 mock_job = mock.Mock()37 self.useFixture(fixtures.MockPatch(38 'pypowervm.wrappers.job.Job.wrap')).mock.return_value = mock_job39 self.run_job = mock_job.run_job40 def validate_run(self, part, ex_suff="PowerOff", ex_parms=None,41 ex_timeout=1800, ex_synch=True, result='', nxt=None):42 """Return side effect method to validate Adapter.read and Job.run_job.43 :param part: (Mock) partition wrapper.44 :param ex_suff: Expected Job suffix - "PowerOn" or "PowerOff"45 :param ex_parms: Set of expected JobParameter 'name=value' strings.46 :param ex_timeout: Expected timeout 
(int, seconds).47 :param ex_synch: Expected value of the 'synchronous' flag.48 :param result: The desired result of the run_job call. May be None49 (the run_job call "succeeded") or an instance of an50 exception to be raised (either JobRequestTimedOut or51 JobRequestFailed).52 :param nxt: When chaining side effects, pass the method to be assigned53 to the run_job side effect after this side effect runs.54 Typically the return from another validate_run() call.55 :return: A method suitable for assigning to self.run_job.side_effect.56 """57 def run_job_seff(uuid, job_parms=None, timeout=None, synchronous=None):58 # We fetched the Job template with the correct bits of the59 # partition wrapper and the correct suffix60 self.adpt.read.assert_called_once_with(61 part.schema_type, part.uuid, suffix_type='do',62 suffix_parm=ex_suff)63 # Reset for subsequent runs64 self.adpt.reset_mock()65 self.assertEqual(part.uuid, uuid)66 # JobParameter order doesn't matter67 self.assertEqual(ex_parms or set(), set(job_parms))68 self.assertEqual(ex_timeout, timeout)69 self.assertEqual(ex_synch, synchronous)70 if nxt:71 self.run_job.side_effect = nxt72 if result:73 raise result74 return run_job_seff75 @staticmethod76 def etimeout():77 """Returns a JobRequestTimedOut exception."""78 return pexc.JobRequestTimedOut(operation_name='foo', seconds=1800)79 @staticmethod80 def efail(error='error'):81 """Returns a JobRequestFailed exception."""82 return pexc.JobRequestFailed(operation_name='foo', error=error)83 def mock_partition(self, env=pvm_bp.LPARType.AIXLINUX,84 rmc_state=pvm_bp.RMCState.ACTIVE, mgmt=False):85 """Returns a mocked partition with the specified properties."""86 return mock.Mock(adapter=self.adpt, env=env, rmc_state=rmc_state,87 is_mgmt_partition=mgmt)88 def test_pwrop_start(self):89 """Test PowerOp.start."""90 part = self.mock_partition()91 # Default params, success92 self.run_job.side_effect = self.validate_run(part, ex_suff="PowerOn")93 power.PowerOp.start(part)94 
self.assertEqual(1, self.run_job.call_count)95 self.run_job.reset_mock()96 # Additional params, timeout97 self.run_job.side_effect = self.validate_run(98 part, ex_suff="PowerOn", ex_parms={'foo=bar', 'one=two'},99 result=self.etimeout())100 self.assertRaises(101 pexc.VMPowerOnTimeout, power.PowerOp.start, part,102 opts=popts.PowerOnOpts(legacy_add_parms={'foo': 'bar',103 'one': 'two'}))104 self.assertEqual(1, self.run_job.call_count)105 self.run_job.reset_mock()106 # Asynchronous, failure107 self.run_job.side_effect = self.validate_run(108 part, ex_suff="PowerOn", ex_synch=False, result=self.efail())109 self.assertRaises(pexc.VMPowerOnFailure, power.PowerOp.start, part,110 synchronous=False)111 self.assertEqual(1, self.run_job.call_count)112 self.run_job.reset_mock()113 # Specified timeout, already on114 self.run_job.side_effect = self.validate_run(115 part, ex_suff="PowerOn", ex_timeout=10,116 result=self.efail('HSCL3681'))117 power.PowerOp.start(part, timeout=10)118 self.assertEqual(1, self.run_job.call_count)119 self.run_job.reset_mock()120 def test_pwrop_stop(self):121 """Test PowerOp.stop."""122 # If RMC is down, VSP normal - make sure the 'immediate' flag goes away123 part = self.mock_partition(rmc_state=pvm_bp.RMCState.INACTIVE)124 self.run_job.side_effect = self.validate_run(125 part, ex_parms={'operation=shutdown'})126 power.PowerOp.stop(127 part, opts=popts.PowerOffOpts().immediate().soft_detect(part))128 self.assertEqual(1, self.run_job.call_count)129 self.run_job.reset_mock()130 # Default parameters - the method figures out whether to do OS shutdown131 part = self.mock_partition()132 self.run_job.side_effect = self.validate_run(133 part, ex_parms={'operation=osshutdown', 'immediate=true'})134 power.PowerOp.stop(135 part, opts=popts.PowerOffOpts().immediate().soft_detect(part))136 self.assertEqual(1, self.run_job.call_count)137 self.run_job.reset_mock()138 # Non-default optional params ignored, timeout139 self.run_job.side_effect = self.validate_run(140 
part, ex_parms={'operation=osshutdown', 'immediate=true',141 'restart=true'},142 ex_timeout=100, ex_synch=False, result=self.etimeout())143 self.assertRaises(144 pexc.VMPowerOffTimeout, power.PowerOp.stop, part,145 opts=popts.PowerOffOpts(legacy_add_parms={146 'one': 1, 'foo': 'bar'}).os_immediate().restart(),147 timeout=100, synchronous=False)148 self.assertEqual(1, self.run_job.call_count)149 self.run_job.reset_mock()150 # VSP normal, fail151 self.run_job.side_effect = self.validate_run(152 part, ex_parms={'operation=shutdown'},153 result=self.efail())154 self.assertRaises(155 pexc.VMPowerOffFailure, power.PowerOp.stop, part,156 opts=popts.PowerOffOpts().vsp_normal())157 self.assertEqual(1, self.run_job.call_count)158 def test_pwrop_stop_no_rmc(self):159 """Test PowerOp.stop with bad RMC state."""160 part = self.mock_partition(rmc_state=pvm_bp.RMCState.INACTIVE)161 self.assertRaises(pexc.OSShutdownNoRMC, power.PowerOp.stop,162 part, opts=popts.PowerOffOpts().os_normal())163 self.run_job.assert_not_called()164 def test_pwron(self):165 """Test the power_on method."""166 lpar = self.mock_partition()167 self.run_job.side_effect = self.validate_run(lpar, "PowerOn")168 power.power_on(lpar, None)169 self.assertEqual(1, self.run_job.call_count)170 self.run_job.reset_mock()171 # Try optional parameters172 self.run_job.side_effect = self.validate_run(173 lpar, "PowerOn", ex_parms={174 'bootmode=sms', 'iIPLsource=a', 'remove_optical_name=testVopt',175 'remove_optical_time=30'}, ex_synch=False)176 power.power_on(177 lpar, None, add_parms={178 power.BootMode.KEY: power.BootMode.SMS,179 pvm_lpar.IPLSrc.KEY: pvm_lpar.IPLSrc.A,180 power.RemoveOptical.KEY_TIME: 30,181 power.RemoveOptical.KEY_NAME: 'testVopt'},182 synchronous=False)183 self.assertEqual(1, self.run_job.call_count)184 self.run_job.reset_mock()185 # Job timeout, IBMi, implicit remove_optical_time186 ibmi = self.mock_partition(env=pvm_bp.LPARType.OS400)187 self.run_job.side_effect = self.validate_run(188 ibmi, 
"PowerOn", ex_parms={'remove_optical_name=test',189 'remove_optical_time=0'},190 result=self.etimeout())191 self.assertRaises(pexc.VMPowerOnTimeout, power.power_on, ibmi, None,192 add_parms=power.RemoveOptical.bld_map(name="test"))193 self.assertEqual(1, self.run_job.call_count)194 self.run_job.reset_mock()195 # Job failure, VIOS partition, explicit remove_optical_time196 vios = self.mock_partition(env=pvm_bp.LPARType.VIOS)197 self.run_job.side_effect = self.validate_run(198 vios, "PowerOn", ex_parms={'remove_optical_name=test2',199 'remove_optical_time=25'},200 result=self.efail())201 self.assertRaises(202 pexc.VMPowerOnFailure, power.power_on, vios, None,203 add_parms=power.RemoveOptical.bld_map(name="test2", time=25))204 self.assertEqual(1, self.run_job.call_count)205 def test_pwron_already_on(self):206 """PowerOn when the system is already powered on."""207 part = self.mock_partition()208 for prefix in power._ALREADY_POWERED_ON_ERRS:209 self.run_job.side_effect = self.validate_run(210 part, ex_suff="PowerOn", result=self.efail(211 error="Something %s Something else" % prefix))212 power.power_on(part, None)213 self.assertEqual(1, self.run_job.call_count)214 self.run_job.reset_mock()215 def test_pwroff_force_immed(self):216 """Test power_off with force_immediate=Force.TRUE."""217 # PowerOff with force-immediate works the same regardless of partition218 # type, RMC state, or management partition status.219 for env in (pvm_bp.LPARType.OS400, pvm_bp.LPARType.AIXLINUX,220 pvm_bp.LPARType.VIOS):221 for rmc in (pvm_bp.RMCState.ACTIVE, pvm_bp.RMCState.BUSY,222 pvm_bp.RMCState.INACTIVE):223 for mgmt in (True, False):224 part = self.mock_partition(env=env, rmc_state=rmc,225 mgmt=mgmt)226 self.run_job.side_effect = self.validate_run(227 part, ex_parms={'operation=shutdown',228 'immediate=true'})229 power.power_off(part, None,230 force_immediate=power.Force.TRUE)231 self.assertEqual(1, self.run_job.call_count)232 self.run_job.reset_mock()233 # Restart, timeout, additional 
params ignored234 part = self.mock_partition()235 self.run_job.side_effect = self.validate_run(236 part, ex_parms={'operation=shutdown', 'immediate=true',237 'restart=true'},238 ex_timeout=10, result=self.etimeout())239 self.assertRaises(pexc.VMPowerOffTimeout, power.power_off, part, None,240 force_immediate=power.Force.TRUE, restart=True,241 timeout=10, add_parms=dict(one=1))242 self.assertEqual(1, self.run_job.call_count)243 self.run_job.reset_mock()244 # Failure245 self.run_job.side_effect = self.validate_run(246 part, ex_parms={'operation=shutdown', 'immediate=true'},247 result=self.efail())248 self.assertRaises(pexc.VMPowerOffFailure, power.power_off, part, None,249 force_immediate=power.Force.TRUE)250 self.assertEqual(1, self.run_job.call_count)251 def test_pwroff_soft_ibmi_norm(self):252 """Soft PowerOff flow, IBMi, normal (no immediate)."""253 part = self.mock_partition(env=pvm_bp.LPARType.OS400)254 # This works the same whether intervening Job exceptions are Timeout or255 # Failure.256 for exc in (self.etimeout(), self.efail()):257 self.run_job.side_effect = (258 # OS normal259 self.validate_run(260 part, ex_parms={'operation=osshutdown'}, ex_timeout=100,261 result=exc,262 # OS immediate (timeout is defaulted from this point)263 nxt=self.validate_run(264 part, ex_parms={'operation=osshutdown',265 'immediate=true'}, result=exc,266 # VSP normal267 nxt=self.validate_run(268 part, ex_parms={'operation=shutdown'}, result=exc,269 # VSP hard (default timeout)270 nxt=self.validate_run(271 part, ex_parms={272 'operation=shutdown', 'immediate=true'}))))273 )274 # Run it275 power.power_off(part, None, timeout=100)276 self.assertEqual(4, self.run_job.call_count)277 self.run_job.reset_mock()278 # If one of the interim calls succeeds, the operation succeeds.279 self.run_job.side_effect = (280 # OS normal281 self.validate_run(282 part, ex_parms={'operation=osshutdown'}, result=self.efail(),283 # OS immediate (timeout is defaulted from this point)284 
nxt=self.validate_run(285 part, ex_parms={'operation=osshutdown', 'immediate=true'},286 result=self.etimeout(),287 # VSP normal - succeeds288 nxt=self.validate_run(289 part, ex_parms={'operation=shutdown'},290 # Not reached291 nxt=self.fail))))292 power.power_off(part, None)293 self.assertEqual(3, self.run_job.call_count)294 def test_pwroff_soft_standard_timeout(self):295 """Soft PowerOff flow, non-IBMi, with timeout."""296 # When OS shutdown times out, go straight to VSP hard.297 part = self.mock_partition()298 self.run_job.side_effect = (299 # OS normal. Non-IBMi always adds immediate.300 self.validate_run(301 part, ex_parms={'operation=osshutdown', 'immediate=true'},302 ex_timeout=100, result=self.etimeout(),303 # VSP hard304 nxt=self.validate_run(305 part, ex_parms={'operation=shutdown', 'immediate=true'}))306 )307 # Run it308 power.power_off(part, None, timeout=100)309 self.assertEqual(2, self.run_job.call_count)310 self.run_job.reset_mock()311 # Same if invoked with immediate. But since we're running again, add312 # restart and another param; make sure restart comes through but the313 # bogus one is ignored.314 self.run_job.side_effect = (315 # OS immediate (non-IBMi always adds immediate).316 self.validate_run(317 part, ex_parms={'operation=osshutdown', 'immediate=true',318 'restart=true'},319 ex_timeout=200, result=self.etimeout(),320 # VSP hard321 nxt=self.validate_run(322 part, ex_parms={'operation=shutdown', 'immediate=true',323 'restart=true'}))324 )325 # Run it326 power.power_off(part, None, timeout=200, restart=True,327 add_parms={'foo': 'bar'})328 self.assertEqual(2, self.run_job.call_count)329 def test_pwroff_soft_no_retry(self):330 """Soft PowerOff, no retry."""331 # When OS shutdown fails with NO_RETRY, fail (no soft flow)332 # IBMi333 part = self.mock_partition(env=pvm_bp.LPARType.OS400)334 self.run_job.side_effect = self.validate_run(335 part, ex_parms={'operation=osshutdown'}, result=self.efail())336 self.assertRaises(pexc.VMPowerOffFailure, 
power.power_off, part, None,337 force_immediate=power.Force.NO_RETRY)338 self.assertEqual(1, self.run_job.call_count)339 self.run_job.reset_mock()340 # non-IBMi341 part = self.mock_partition()342 self.run_job.side_effect = self.validate_run(343 part, ex_parms={'operation=osshutdown', 'immediate=true'},344 result=self.efail())345 self.assertRaises(pexc.VMPowerOffFailure, power.power_off, part, None,346 force_immediate=power.Force.NO_RETRY)347 self.assertEqual(1, self.run_job.call_count)348 def test_pwroff_soft_standard_fail(self):349 """Soft PowerOff flow, non-IBMi, with Job failure."""350 # When OS shutdown fails (non-timeout), we try VSP normal first.351 part = self.mock_partition()352 self.run_job.side_effect = (353 # OS immediate (non-IBMi always adds immediate).354 # Make sure restart percolates through, bogus params ignored.355 self.validate_run(356 part, ex_parms={'operation=osshutdown', 'immediate=true',357 'restart=true'},358 ex_timeout=300, result=self.efail(),359 # VSP normal, timeout reset to default360 nxt=self.validate_run(361 part, ex_parms={362 'operation=shutdown', 'restart=true'},363 result=self.efail(),364 # VSP hard365 nxt=self.validate_run(366 part, ex_parms={'operation=shutdown', 'immediate=true',367 'restart=true'})))368 )369 power.power_off(part, None, timeout=300, restart=True,370 add_parms={'foo': 'bar'})371 self.assertEqual(3, self.run_job.call_count)372 def test_pwroff_soft_standard_no_rmc_no_retry(self):373 """Non-IBMi soft PowerOff does VSP normal if RMC is down; no retry."""374 # Behavior is the same for INACTIVE or BUSY375 for rmc in (pvm_bp.RMCState.INACTIVE, pvm_bp.RMCState.BUSY):376 part = self.mock_partition(rmc_state=rmc)377 self.run_job.side_effect = self.validate_run(378 part, ex_parms={'operation=shutdown'}, result=self.efail())379 self.assertRaises(380 pexc.VMPowerOffFailure, power.power_off, part, None,381 force_immediate=power.Force.NO_RETRY)382 self.assertEqual(1, self.run_job.call_count)383 self.run_job.reset_mock()384 # 
Job timeout & failure do the same (except for final exception).385 self.run_job.side_effect = self.validate_run(386 part, ex_parms={'operation=shutdown'}, result=self.etimeout())387 self.assertRaises(388 pexc.VMPowerOffTimeout, power.power_off, part, None,389 force_immediate=power.Force.NO_RETRY)390 self.assertEqual(1, self.run_job.call_count)391 self.run_job.reset_mock()392 def test_pwroff_already_off(self):393 """PowerOff when the system is already powered off."""394 part = self.mock_partition()395 for prefix in power._ALREADY_POWERED_OFF_ERRS:396 self.run_job.side_effect = self.validate_run(397 part, ex_parms={'operation=osshutdown', 'immediate=true'},398 result=self.efail(error="Foo %s bar" % prefix))399 power.power_off(part, None)400 self.assertEqual(1, self.run_job.call_count)401 self.run_job.reset_mock()402 # If restart was specified, this is a failure. (Force, to KISS)403 self.run_job.side_effect = self.validate_run(404 part, ex_parms={'operation=shutdown', 'immediate=true',405 'restart=true'},406 result=self.efail(error="Foo %s bar" % prefix))407 self.assertRaises(pexc.VMPowerOffFailure, power.power_off, part,408 None, restart=True,409 force_immediate=power.Force.TRUE)410 self.assertEqual(1, self.run_job.call_count)411 self.run_job.reset_mock()412 def test_pwroff_new_opts(self):413 """Test power_off where add_parms is PowerOffOpts (not legacy)."""414 part = self.mock_partition()415 # VSP hard416 self.run_job.side_effect = self.validate_run(417 part, ex_parms={'operation=shutdown', 'immediate=true'})418 power.power_off(part, None, add_parms=popts.PowerOffOpts().vsp_hard())419 self.assertEqual(1, self.run_job.call_count)420 self.run_job.reset_mock()421 # VSP normal422 self.run_job.side_effect = self.validate_run(423 part, ex_parms={'operation=shutdown'})424 power.power_off(part, None,425 add_parms=popts.PowerOffOpts().vsp_normal())426 self.run_job.reset_mock()427 # OS immediate428 self.run_job.side_effect = self.validate_run(429 part, 
ex_parms={'operation=osshutdown', 'immediate=true'})430 power.power_off(part, None,431 add_parms=popts.PowerOffOpts().os_immediate())432 self.run_job.reset_mock()433 # OS normal434 self.run_job.side_effect = self.validate_run(435 part, ex_parms={'operation=osshutdown'})436 power.power_off(part, None, add_parms=popts.PowerOffOpts().os_normal())437 @mock.patch('pypowervm.tasks.power._power_off_progressive')438 def test_pwroff_progressive(self, mock_prog_internal):439 # The internal _power_off_progressive is exercised via the existing440 # tests for power_off. This test just ensures the public441 # power_off_progressive calls it appropriately.442 # Default kwargs443 power.power_off_progressive('part')444 mock_prog_internal.assert_called_once_with(445 'part', 1800, False, ibmi_immed=False)446 mock_prog_internal.reset_mock()447 # Non-default kwargs448 power.power_off_progressive('part', restart=True, ibmi_immed=True,449 timeout=10)450 mock_prog_internal.assert_called_once_with(...
test_run_job.py
Source:test_run_job.py
...39 {"cmd": env_to_stderr + ["BAR"]},40 {"cmd": env_to_stderr + ["HI"]},41 ]}42 with log_capture('build') as logger:43 ret_env = run_job.run_job(logger, build_store, job_spec, {"BAZ": "BAZ"}, '<no-artifact>',44 {"virtual:bash": "bash/ljnq7g35h6h4qtb456h5r35ku3dq25nl"},45 tempdir, cfg)46 assert 'HDIST_CONFIG' in ret_env47 del ret_env['HDIST_CONFIG']48 del ret_env['PWD']49 expected = {50 'ARTIFACT': '<no-artifact>',51 'BAR': 'bar',52 'BAZ': 'BAZ',53 'FOO': 'foo',54 'HDIST_IMPORT': '',55 'HDIST_IMPORT_PATHS': '',56 'HDIST_VIRTUALS': 'virtual:bash=bash/ljnq7g35h6h4qtb456h5r35ku3dq25nl',57 'LD_LIBRARY_PATH': LD_LIBRARY_PATH,58 'PATH': ''59 }60 eq_(expected, ret_env)61 lines = filter_out(logger.lines)62 eq_(["FOO='foo'", "BAR='foox'", "HI='hi'", "FOO='foo'", "BAR='bar'", 'HI=None'],63 lines)64@build_store_fixture()65def test_env_control(tempdir, sc, build_store, cfg):66 LD_LIBRARY_PATH = os.environ.get("LD_LIBRARY_PATH", "")67 job_spec = {68 "commands": [69 {"set": "LD_LIBRARY_PATH", "value": LD_LIBRARY_PATH},70 {"set": "FOO", "value": "foo"},71 {"set": "FOO", "value": "bar"},72 {"append_flag": "CFLAGS", "value": "-O3"},73 {"prepend_flag": "CFLAGS", "value": "-O2"},74 {"prepend_flag": "CFLAGS", "value": "-O1"},75 {"append_path": "PATH", "value": "/bar/bin"},76 {"prepend_path": "PATH", "value": "/foo/bin"},77 {"cmd": env_to_stderr + ["FOO"]},78 {"cmd": env_to_stderr + ["CFLAGS"]},79 {"cmd": env_to_stderr + ["PATH"]},80 ]}81 with log_capture('build') as logger:82 ret_env = run_job.run_job(logger, build_store, job_spec, {},83 '<no-artifact>', {}, tempdir, cfg)84 eq_(["FOO='bar'", "CFLAGS='-O1 -O2 -O3'", "PATH='/foo/bin:/bar/bin'"],85 filter_out(logger.lines))86@build_store_fixture()87def test_imports(tempdir, sc, build_store, cfg):88 # Make dependencies89 doc = {90 "name": "foosoft", "version": "na", "build": {"commands": []},91 }92 foo_id, foo_path = build_store.ensure_present(doc, cfg)93 doc = {94 "name": "barsoft", "version": "na", "build": {"commands": []},95 }96 
bar_id, bar_path = build_store.ensure_present(doc, cfg)97 virtuals = {'virtual:bar' : bar_id}98 # Dependee99 LD_LIBRARY_PATH = os.environ.get("LD_LIBRARY_PATH", "")100 doc = {101 "import": [{"ref": "FOOSOFT", "id": foo_id}, {"ref": "BARSOFT", "id": "virtual:bar"}],102 "commands": [103 {"cmd": env_to_stderr + ["FOOSOFT_DIR"]},104 {"cmd": env_to_stderr + ["FOOSOFT_ID"]},105 {"cmd": env_to_stderr + ["BARSOFT_DIR"]},106 {"cmd": env_to_stderr + ["BARSOFT_ID"]},107 ]108 }109 with log_capture('build') as logger:110 ret_env = run_job.run_job(logger, build_store, doc, {},111 '<no-artifact>', virtuals, tempdir, cfg)112 eq_(["FOOSOFT_DIR=%r" % foo_path,113 "FOOSOFT_ID=%r" % foo_id,114 "BARSOFT_DIR=%r" % bar_path,115 "BARSOFT_ID=%r" % bar_id],116 filter_out(logger.lines))117@build_store_fixture()118def test_inputs(tempdir, sc, build_store, cfg):119 job_spec = {120 "commands": [121 {122 "env": {"LD_LIBRARY_PATH": os.environ.get("LD_LIBRARY_PATH", "")},123 "cmd": [sys.executable, "$in0", "$in1"],124 "inputs": [125 {"text": ["import sys",126 "import json",127 "with open(sys.argv[1]) as f:"128 " print json.load(f)['foo']"]},129 {"json": {"foo": "Hello1"}}130 ]131 },132 {133 "env": {"LD_LIBRARY_PATH": os.environ.get("LD_LIBRARY_PATH", "")},134 "cmd": [sys.executable, "$in0"],135 "inputs": [{"string": "import sys\nprint 'Hello2'"}]136 },137 ]138 }139 with log_capture('build') as logger:140 ret_env = run_job.run_job(logger, build_store, job_spec, {"BAZ": "BAZ"}, '<no-artifact>',141 {"virtual:bash": "bash/ljnq7g35h6h4qtb456h5r35ku3dq25nl"},142 tempdir, cfg)143 logger.assertLogged('INFO:Hello1')144 logger.assertLogged('INFO:Hello2')145@build_store_fixture()146def test_capture_stdout(tempdir, sc, build_store, cfg):147 job_spec = {148 "commands": [149 {"cmd": ["$echo", " a b \n\n\n "], "to_var": "HI"},150 {151 "env": {"LD_LIBRARY_PATH": os.environ.get("LD_LIBRARY_PATH", "")},152 "cmd": env_to_stderr + ["HI"]}153 ]}154 # Test both with _TEST_LOG_PROCESS_SIMPLE and without155 def 
doit():156 with log_capture() as logger:157 run_job.run_job(logger, build_store, job_spec, {"echo": "/bin/echo"},158 '<no-artifact>', {}, tempdir, cfg)159 eq_(["HI='a b'"], filter_out(logger.lines))160 doit()161 o = run_job._TEST_LOG_PROCESS_SIMPLE162 try:163 run_job._TEST_LOG_PROCESS_SIMPLE = True164 doit()165 finally:166 run_job._TEST_LOG_PROCESS_SIMPLE = o167@build_store_fixture()168def test_script_redirect(tempdir, sc, build_store, cfg):169 job_spec = {170 "commands": [171 {"set": "foo", "value": "foo"},172 {"cmd": ["$echo", "hi"], "append_to_file": "$foo"}173 ]}174 run_job.run_job(test_logger, build_store, job_spec,175 {"echo": "/bin/echo"}, '<no-artifact>', {}, tempdir, cfg)176 with file(pjoin(tempdir, 'foo')) as f:177 assert f.read() == 'hi\n'178@build_store_fixture()179def test_attach_log(tempdir, sc, build_store, cfg):180 if 'linux' not in sys.platform:181 raise SkipTest('Linux only')182 with file(pjoin(tempdir, 'hello'), 'w') as f:183 f.write('hello from pipe')184 job_spec = {185 "commands": [186 {"hit": ["logpipe", "mylog", "WARNING"], "to_var": "LOG"},187 {"cmd": ["/bin/dd", "if=hello", "of=$LOG"]},188 ]}189 with log_capture('build') as logger:190 run_job.run_job(logger, build_store, job_spec, {},191 '<no-artifact>', {}, tempdir, cfg)192 logger.assertLogged('^WARNING:mylog:hello from pipe$')193@build_store_fixture()194def test_error_exit(tempdir, sc, build_store, cfg):195 job_spec = {196 "commands": [197 {"cmd": [which("false")]},198 ]}199 with assert_raises(CalledProcessError):200 run_job.run_job(test_logger, build_store, job_spec, {}, '<no-artifact>', {}, tempdir, cfg)201@build_store_fixture()202def test_log_pipe_stress(tempdir, sc, build_store, cfg):203 if 'linux' not in sys.platform:204 raise SkipTest('Linux only')205 # Stress-test the log piping a bit, since the combination of Unix FIFO206 # pipes and poll() is a bit tricky to get right.207 # We want to launch many clients who each concurrently send many messages,208 # then check that they all get 
through to log_capture. We do this by209 # writing out two Python scripts and executing them...210 NJOBS = 5211 NMSGS = 300 # must divide 2212 with open(pjoin(tempdir, 'client.py'), 'w') as f:213 f.write(dedent('''\214 import os, sys215 msg = sys.argv[1] * (256 // 4) # less than PIPE_BUF, more than what we set BUFSIZE to216 for i in range(int(sys.argv[2]) // 2):217 with open(os.environ["LOG"], "a") as f:218 f.write("%s\\n" % msg)219 f.write("%s\\n" % msg)220 # hit stdout too221 sys.stdout.write("stdout:%s\\nstdout:%s\\n" % (sys.argv[1], sys.argv[1]))222 sys.stdout.flush()223 sys.stderr.write("stderr:%s\\nstderr:%s\\n" % (sys.argv[1], sys.argv[1]))224 sys.stderr.flush()225 '''))226 with open(pjoin(tempdir, 'launcher.py'), 'w') as f:227 f.write(dedent('''\228 import sys229 import subprocess230 procs = [subprocess.Popen([sys.executable, sys.argv[1], '%4d' % i, sys.argv[3]]) for i in range(int(sys.argv[2]))]231 for p in procs:232 if not p.wait() == 0:233 raise AssertionError("process failed: %d" % p.pid)234 '''))235 job_spec = {236 "commands": [237 {"hit": ["logpipe", "mylog", "WARNING"], "to_var": "LOG"},238 {"set": "LD_LIBRARY_PATH", "value": os.environ.get("LD_LIBRARY_PATH", "")},239 {"cmd": [sys.executable, pjoin(tempdir, 'launcher.py'),240 pjoin(tempdir, 'client.py'), str(NJOBS), str(NMSGS)]},241 ]}242 old = run_job.LOG_PIPE_BUFSIZE243 try:244 run_job.LOG_PIPE_BUFSIZE = 50245 with log_capture('build') as logger:246 run_job.run_job(logger, build_store, job_spec, {}, '<no-artifact>', {}, tempdir, cfg)247 finally:248 run_job.LOG_PIPE_BUFSIZE = old249 log_bins = [0] * NJOBS250 stdout_bins = [0] * NJOBS251 stderr_bins = [0] * NJOBS252 for line in logger.lines:253 parts = line.split(':')254 if len(parts) != 3:255 continue256 level, log, msg = parts257 if log == 'mylog':258 assert level == 'WARNING'259 assert msg == msg[:4] * (256 // 4)260 idx = int(msg[:4])261 log_bins[idx] += 1262 elif log == 'stdout':263 assert level == 'INFO'264 stdout_bins[int(msg)] += 1265 elif log 
== 'stderr':266 assert level == 'INFO'267 stderr_bins[int(msg)] += 1268 assert all(x == NMSGS for x in log_bins)269 assert all(x == NMSGS for x in stdout_bins)270 assert all(x == NMSGS for x in stderr_bins)271@build_store_fixture()272def test_notimplemented_redirection(tempdir, sc, build_store, cfg):273 job_spec = {274 "commands": [275 {"hit": ["logpipe", "mylog", "WARNING"], "to_var": "log"},276 {"cmd": ["/bin/echo", "my warning"], "append_to_file": "$log"}277 ]}278 with assert_raises(NotImplementedError):279 run_job.run_job(test_logger, build_store, job_spec, {}, '<no-artifact>', {}, tempdir, cfg)280@build_store_fixture()281def test_script_cwd(tempdir, sc, build_store, cfg):282 os.makedirs(pjoin(tempdir, 'a', 'b', 'c'))283 job_spec = {284 "commands": [285 {"chdir": "a"},286 {"commands": [287 {"chdir": "b"},288 {"commands": [289 {"chdir": "c"},290 {"commands": [291 {"chdir": ".."},292 {"cmd": ["/bin/pwd"], "append_to_file": "out"}293 ]}]}]}]}294 run_job.run_job(test_logger, build_store, job_spec, {}, '<no-artifact>', {}, tempdir, cfg)295 assert os.path.exists(pjoin(tempdir, 'a', 'b', 'out'))296 with open(pjoin(tempdir, 'a', 'b', 'out')) as f:297 assert f.read().strip() == pjoin(tempdir, 'a', 'b')298def test_substitute():299 env = {"A": "a", "B": "b"}300 def check(want, x):301 eq_(want, run_job.substitute(x, env))302 def check_raises(x):303 with assert_raises(KeyError):304 run_job.substitute(x, env)305 yield check, "ab", "$A$B"306 yield check, "ax", "${A}x"307 yield check, r"${A}x", "\\${A}x"308 yield check, r"\${A}x", "\\\\${A}x"...
run_job.py
Source:run_job.py
#!/usr/bin/python
# Purpose: This Job is the starting point of all jobs.
# Looks up jobnames.list and calls the script
#
# NOTE: this file targets Python 2 (it relies on the `commands` module and on
# unbuffered text-mode files).  Every print below is written as a
# single-argument call, which behaves identically to the print statement
# on Python 2.
import sys, os, commands, envvars, datetime, time ,getpass, errno
from optparse import OptionParser
from datetime import datetime  # rebinds `datetime` from the module to the class
import subprocess
from subprocess import Popen, PIPE

# kinit command line used to (re-)authenticate before each job.  main() sets
# it when a keytab is found; None means "use the caller's default credentials".
kerb = None


def call_script(path, job, options):
    """Run one job script as a child process and mirror its output.

    :param path: directory that contains the script.
    :param job: script file name, run as ``python <path>/<job> <options>``.
    :param options: command-line option string appended to the command.
    :returns: the child's return code (``Popen.returncode``).

    Uses the module globals ``kerb`` (optional kinit command) and
    ``stdout_file`` (the log file that ``sys.stdout`` was redirected to by
    main()).  Output lines starting with ``**ABC_log**`` are handed to
    writeabc(); consecutive identical lines are collapsed into one
    "Above status doesnot change" marker that is overwritten in place.
    """
    if kerb:
        # Refresh the Kerberos ticket before every job.
        rc, out = commands.getstatusoutput(kerb)
        print("run_job.py -> Authenticated : " + kerb + " RC:" + str(rc))
    cmd = ' '.join(['python',
                    path + '/' + job,
                    options])
    print("run_job.py -> Invoked : " + cmd)
    call = subprocess.Popen(cmd.split(' '), stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    prev_line = ""
    new_line = 0
    # Initialise the rewind position up front: the original left `ref`
    # unbound when the child's very first output line was blank.
    ref = stdout_file.tell()
    while True:
        line = call.stdout.readline()
        if not line:
            break
        if line.startswith('**ABC_log**'):
            writeabc(line.split("->")[1])
            ref = stdout_file.tell()
        else:
            text = line.strip()
            if prev_line != text:
                if new_line != 0:
                    # Terminate the in-place status marker line.
                    print("")
                    new_line = 0
                print(text)
                ref = stdout_file.tell()
            else:
                if new_line == 0:
                    print("")
                # Rewind so the marker overwrites its previous occurrence
                # instead of growing the log.
                stdout_file.seek(ref)
                sys.stdout.write(
                    '\r' + 'Above status doesnot change and last checked @'
                    + str(datetime.fromtimestamp(time.time())))
                new_line = 1
            # NOTE(review): the source excerpt lost its indentation; here
            # prev_line is updated for non-ABC lines only -- confirm.
            prev_line = text
    call.communicate()
    return call.returncode


def writeabc(line):
    """Write one ABC log record and run the configured follow-up commands.

    Only acts when the global (app, sub_app) pair is (bdh, data_ingest) or
    (apd, rreap): the record is written to ``abc_log_file``, the file is made
    world-writable, the shell command in ``abc_hdfs_put`` is run, and on the
    first call only (tracked by ``msck_count``) ``msck_command`` is run.

    NOTE(review): no visible code assigns ``abc_hdfs_put`` or
    ``msck_command`` (their assignments are commented out in main()), so this
    branch raises NameError for the two app pairs above -- confirm whether
    the commands should be restored or this branch retired.

    :param line: the pipe-delimited record text to write.
    """
    global msck_count
    is_bdh_ingest = app.lower() == "bdh" and sub_app.lower() == "data_ingest"
    is_apd_rreap = app.lower() == "apd" and sub_app.lower() == "rreap"
    if not (is_bdh_ingest or is_apd_rreap):
        return
    with open(abc_log_file, 'w') as myfile:
        myfile.write(line)
    chmod_abc = "chmod 777 " + abc_log_file
    rc, status = commands.getstatusoutput(chmod_abc)
    print("--- running abc_hdfs_put command --> " + abc_hdfs_put)
    rc, status = commands.getstatusoutput(abc_hdfs_put)
    if rc > 0:
        print(status)
    else:
        print("run_job.py -> ABC_log written : " + line)
    if msck_count == 0:
        print("run_job.py -> Invoked : " + msck_command)
        rc1, status1 = commands.getstatusoutput(msck_command)
        print(status1)
        msck_count = 1


def silentremove(filename):
    """Delete *filename*, ignoring the case where it does not exist.

    :raises OSError: for any failure other than "no such file or directory".
    """
    try:
        os.remove(filename)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise


def _build_options(fields, env, env_ver, grp_name, common_date):
    """Build the option string passed to a job script.

    :param fields: one job-list row split on ``|``; fields 3..12 become
        ``--op0`` .. ``--op9``.
    :returns: the option string, beginning with a space as the original did.

    ``--op0`` is emitted whenever field 3 exists; ``--op1`` .. ``--op9`` are
    emitted only for non-empty fields.  (The original compared the bound
    method ``str.strip`` to ``""``, which is always unequal, so empty values
    were emitted unconditionally.)
    """
    pieces = [' --env ' + env + ' --app ' + app + ' --env_ver ' + env_ver
              + ' --group ' + grp_name,
              ' --subapp ' + sub_app + ' --cmmn_dt ' + common_date]
    for index, raw in enumerate(fields[3:13]):
        value = raw.strip()
        if index == 0 or value != "":
            pieces.append(' --op' + str(index) + ' ' + value)
    return ''.join(pieces)


def _abc_record(grp_name, cli_args, status, comment):
    """Format one pipe-delimited ABC record for writeabc()."""
    return "|".join([grp_name, "run_job.py", "python", "CA-7 Job", "",
                     str(cli_args), status, user_name, comment,
                     str(datetime.today()) + "\n"])


def main():
    """Run every script listed for a job group and exit with the worst code.

    Command line: ``run_job.py grp_name app sub_app [common_date]`` where
    ``common_date`` is formatted ``%Y-%m-%d_%H:%M:%S.%f`` (defaults to now).

    Steps: redirect stdout to a per-run log file, authenticate with a keytab
    when one exists, read the job list (or the group's ``_rerun.list`` when
    present), run each matching row through call_script(), write the failed
    rows to the rerun list, and ``sys.exit`` with the highest return code.
    """
    global return_code, msck_count, msck_command
    global app, sub_app
    global abc_log_file, stdout_file
    global kerb, user_name
    global abc_hdfs_put

    return_code = 0
    msck_count = 0
    # Directory of this script; kept separate from the per-job scripts path
    # so the "Re-run Cmd" hint below points at run_job.py itself.
    script_dir = os.path.dirname(os.path.realpath(__file__))

    # env / env_ver are hard-coded; the original kept a commented-out variant
    # that derived them from the install path.
    env = 'p'
    env_ver = '01'
    usage = "usage: run_job.py grp_name app sub_app jobnames.list"
    parser = OptionParser(usage)
    (cli_options, args) = parser.parse_args()
    if len(args) < 3:
        parser.error("Arguments - group_job_name and app name are required.")
    grp_name = args[0]
    app = args[1]
    sub_app = args[2]
    jobnames = "jobnames.list"

    if len(args) == 4:
        common_date = args[3].strip()
    else:
        common_date = str(datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d_%H:%M:%S.%f'))
    common_date_tfmt = datetime.strptime(common_date, '%Y-%m-%d_%H:%M:%S.%f')
    log_time = common_date_tfmt.strftime('%Y-%m-%d_%H-%M-%S')
    envvars.populate(env, env_ver, app, sub_app)

    log_date = common_date_tfmt.strftime('%Y-%m-%d')
    log_folder = envvars.list['lfs_app_logs'] + "/" + log_date
    if not os.path.exists(log_folder):
        os.makedirs(log_folder)
        # NOTE(review): chmod placed inside the "just created" branch; the
        # excerpt's indentation was lost -- confirm.
        chmod_log = "chmod 777 " + log_folder
        rc, status = commands.getstatusoutput(chmod_log)
    log_file = log_folder + "/run_job-" + grp_name + '_' + log_time + '.log'
    abc_log_file = envvars.list['lfs_app_logs'] + "/" + grp_name + ".tmp"

    print("LogFile: " + log_file)
    print("To Kill: kill " + str(os.getpid()))
    # Create the log file if needed, then reopen it unbuffered in "r+" so
    # call_script() can seek back and overwrite its status marker.
    f = open(log_file, "a", 0)
    f.close()
    stdout_file = open(log_file, "r+", 0)
    sys.stdout = stdout_file

    rc, user_name = commands.getstatusoutput("echo $USER")
    service_user_name = envvars.list['srvc_acct_login_' + app + '_' + sub_app]
    if service_user_name is not None and service_user_name != "":
        user_name = service_user_name
    # The original tested a malformed path (no "/" separator, trailing space)
    # with an inverted condition, so it always ran kinit and could never
    # report a missing keytab.
    keytab = envvars.list['lfs_keystore'] + "/" + user_name.lower() + ".keytab"
    if os.path.isfile(keytab):
        kerb = ("kinit -k -t " + keytab + " " + user_name.lower()
                + envvars.list['domainName'])
        rc, out = commands.getstatusoutput(kerb)
        print("run_job.py -> Authenticated : " + kerb + " RC:" + str(rc))
    else:
        kerb = None
        print("run_job.py -> Keytab file missing, not able to authenticate. Using user default authentication")

    start_line = '*' * 100
    print(start_line)
    print("run_job.py -> Started : " + datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))

    # Disabled in the original; kept because writeabc() still reads
    # abc_hdfs_put and msck_command.
    #hdfs_abc_log_file = envvars.list['hdfs_meta_raw']+"/"+envvars.list['hv_db_meta_stage']+"/abc_hadoop/load_date="+str(asofdate)+"/00000.log";
    #abc_hdfs_put = " ".join(["hdfs","dfs","-appendToFile",abc_log_file,
    #                         hdfs_abc_log_file])
    #hdfs_chmod = "hdfs dfs -chmod -R 777 " + hdfs_abc_log_file
    #rc, out = commands.getstatusoutput(hdfs_chmod)
    #print("---Output of chmod command of abc_log_file-->"+hdfs_chmod)
    #print("run_job.py -> Invoked : " +hdfs_chmod)
    #print out
    #msck_command = "beeline -u '" + envvars.list['hive2JDBC'] + ";principal=" + envvars.list['hive2Principal']+"' -e "
    #msck_command = "hive -e "
    #msck_command = msck_command + "'use "+ envvars.list['hv_db_meta_stage']+"; msck repair table abc_hadoop;'"

    comments = ""
    # determine joblist file path
    job_list_file = envvars.list['lfs_app_config'] + '/' + jobnames
    rerun_job_list_file = envvars.list['lfs_app_config'] + '/' + grp_name + "_rerun.list"
    print("run_job.py -> JobList : " + job_list_file)

    if os.path.isfile(rerun_job_list_file):
        job_list_file = rerun_job_list_file
        print("run_job.py -> JobList : Rerun file found, updating joblist lookup file. Please re-run if original entries has to run.")
        print("run_job.py -> JobList : " + job_list_file)
        comments = comments + "Rerun file found " + job_list_file
    else:
        comments = comments + "joblist file " + job_list_file

    writeabc(_abc_record(grp_name, args, "STARTED",
                         comments.replace(os.linesep, "---")))

    failed_scripts = ""
    try:
        with open(job_list_file) as fin:
            for line in fin:
                # `fields`, not `args`: the original rebound `args` here, so
                # the FAILED/ENDED records logged the last job-list row
                # instead of the command-line arguments.
                fields = line.split('|')
                if not (fields[0].strip().lower() == grp_name.lower()
                        or grp_name.lower() == '*all'):
                    continue
                if len(fields) < 3:
                    print("Error: Table name and script name not defined in config file")
                    # Return shape kept as in the original.
                    return None, None, None, None, None, None, None

                # Resolve the script for every valid row; the original did so
                # only for rows with >= 4 fields, leaving `job` unset (or
                # stale from the previous row) for 3-field rows.
                job = fields[2].strip()
                if fields[1].strip().lower() == 'g':
                    scripts_path = envvars.list['lfs_global_scripts']
                else:
                    scripts_path = envvars.list['lfs_app_scripts']
                job_options = _build_options(fields, env, env_ver, grp_name,
                                             common_date)
                rc = call_script(scripts_path, job, job_options)
                if rc != 0:
                    # Keep the row verbatim so a rerun matches on grp_name.
                    failed_scripts += line if line.endswith("\n") else line + "\n"
                    # Popen reports death-by-signal as a negative code; count
                    # it as a failure instead of letting the run exit 0.
                    return_code = max(return_code, rc if rc > 0 else 1)
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        print("No joblist file found")

    if return_code > 0:
        with open(rerun_job_list_file, 'w') as myfile:
            myfile.write(failed_scripts)
        chmod_log = "chmod 777 " + rerun_job_list_file
        rc, status = commands.getstatusoutput(chmod_log)
        print("run_job.py -> Failed Script: Some scripts failed.. Please use below command to rerun..")
        print("run_job.py -> Re-run Cmd : " + " ".join(["python", script_dir + "/run_job.py", grp_name, app, sub_app]))
        writeabc(_abc_record(grp_name, args, "FAILED",
                             "run_job failed, Some scripts failed.." + str(return_code)))
    elif os.path.isfile(rerun_job_list_file):
        print("run_job.py -> Deleting..." + str(rerun_job_list_file))
        os.remove(rerun_job_list_file)

    writeabc(_abc_record(grp_name, args, "ENDED",
                         "run_job ended,Return-Code:" + str(return_code)))
    print("run_job.py -> Ended : Return-Code:" + str(return_code) + " " + datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
    print(start_line)
    silentremove(abc_log_file)
    sys.exit(return_code)


if __name__ == "__main__":
    # NOTE(review): the source excerpt is cut off after this guard ("...");
    # calling main() is the presumed body -- confirm against the full file.
    main()
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!