Best Python code snippet using slash
test_16_HierarchicalLogging.py
Source:test_16_HierarchicalLogging.py
#
# This file is protected by Copyright. Please refer to the COPYRIGHT file
# distributed with this source distribution.
#
# This file is part of REDHAWK core.
#
# REDHAWK core is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# REDHAWK core is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.
#
"""Tests for hierarchical (named) logger access on domains, applications,
components and devices."""

import contextlib
import os
import signal
import tempfile
import time
import traceback
import unittest

from _unitTestHelpers import scatest
from ossie.utils import sb, redhawk
from ossie.cf import CF


# Logger-name suffixes, appended to "<component name>_<n>".
_BASE_SUFFIXES = ('', '.lower', '.namespace.lower')
_DEEP_SUFFIXES = ('', '.lower', '.lower.second.first', '.lower.third',
                  '.namespace.lower')
_USER_SUFFIXES = ('.user.more_stuff', '.user.some_stuff')
_SYSTEM_SUFFIXES = ('.system.PortSupplier', '.system.PropertySet',
                    '.system.Resource')

# Log file read back by the sandbox tests.
_COMPONENT_LOG = 'foo/bar/test.log'


def _read_text(path):
    """Return the full contents of the text file at *path*."""
    with open(path, 'r') as fp:
        return fp.read()


def _write_sad_from_template(template_path, output_path):
    """Copy *template_path* to *output_path*, replacing every '@@@CWD@@@'
    marker with the current working directory."""
    contents = _read_text(template_path).replace('@@@CWD@@@', os.getcwd())
    with open(output_path, 'w') as fp:
        fp.write(contents)


def _remove_quietly(remove, path):
    """Call remove(path) (os.remove or os.rmdir) and ignore OSError.

    Cleanup is best effort: the path may never have been created, or a
    directory may still contain files.
    """
    try:
        remove(path)
    except OSError:
        pass


def _log_config_uri(filename):
    """Return a file:// URI for *filename* in the current working directory."""
    return 'file://' + os.getcwd() + '/' + filename


def _assert_loggers_present(case, loggers, root, suffixes):
    """Assert that root+suffix is in *loggers* for every suffix."""
    for suffix in suffixes:
        case.assertTrue(root + suffix in loggers)


def _assert_levels(case, target, root, suffixes, level):
    """Assert that target.getLogLevel(root+suffix) == level for every suffix."""
    for suffix in suffixes:
        case.assertEqual(target.getLogLevel(root + suffix), level)


def _snapshot_levels(target, root, suffixes):
    """Return {suffix: target.getLogLevel(root+suffix)} for every suffix."""
    return dict((suffix, target.getLogLevel(root + suffix))
                for suffix in suffixes)


def _assert_levels_restored(case, target, root, suffixes, snapshot):
    """Assert that every logger is back at the level stored in *snapshot*."""
    for suffix in suffixes:
        case.assertEqual(snapshot[suffix], target.getLogLevel(root + suffix))


def killDomain(name):
    """Send SIGINT to every process whose command line contains both
    'DOMAIN_NAME' and *name*.

    Processes are found by scanning the numeric entries of /proc.  Entries
    that cannot be read or signalled (for example because the process has
    already exited) are skipped.
    """
    # cmdline is read as bytes, so compare bytes against bytes.
    needle = name if isinstance(name, bytes) else name.encode()
    for pid in os.listdir('/proc'):
        if not pid.isdigit():
            continue
        try:
            with open(os.path.join('/proc', pid, 'cmdline'), 'rb') as fp:
                cmdline = fp.read()
            if b'DOMAIN_NAME' in cmdline and needle in cmdline:
                os.kill(int(pid), signal.SIGINT)
        except (IOError, OSError):
            # Best effort: the process vanished or is not ours to touch.
            continue


@scatest.requireLog4cxx
class CppHierarchicalDomainLogging(scatest.CorbaTestCase):
    """Named-logger tests against applications created in a launched domain."""

    def setUp(self):
        domBooter, self._domMgr = self.launchDomainManager()
        devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
        self._rhDom = redhawk.attach(scatest.getTestDomainName())
        self.assertEqual(len(self._rhDom._get_applications()), 0)

    def tearDown(self):
        # Do all application shutdown before calling the base class tearDown,
        # or failures will probably occur.
        redhawk.core._cleanUpLaunchedApps()
        scatest.CorbaTestCase.tearDown(self)
        # Generated SAD files only exist for the tests that wrote them.
        _remove_quietly(os.remove, 'sdr/dom/waveforms/logger_overload_w/tmp.sad.xml')
        _remove_quietly(os.remove, 'sdr/dom/waveforms/logger_config/tmp.sad.xml')
        # need to let event service clean up event channels
        # cycle period is 10 milliseconds
        time.sleep(0.1)
        redhawk.setTrackApps(False)

    def test_logconfiguri_application(self):
        # A LOGGING_CONFIG_URI passed at creation applies to that application
        # only; a second application from the same SAD keeps its own levels.
        self.cname = "logger"
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        app_1 = self._rhDom.createApplication(
            "/waveforms/logger_w/logger_w.sad.xml",
            initConfiguration={'LOGGING_CONFIG_URI': _log_config_uri('high_thresh.cfg')})
        self.assertEqual(app_1.getLogLevel('logger_1'), 30000)
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        app_1.getNamedLoggers()
        app_2 = self._rhDom.createApplication("/waveforms/logger_w/logger_w.sad.xml")
        app_2.getNamedLoggers()
        self.assertEqual(app_1.getLogLevel('logger_1'), 30000)
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        self.assertEqual(app_2.getLogLevel('logger_1'), 40000)
        self.assertEqual(app_2.getLogLevel('logger_2'), 40000)

    def test_logconfiguri_overload(self):
        # Same check as above, with the configuration coming from a SAD file
        # generated from a template.
        self.cname = "logger"
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        _write_sad_from_template(
            'sdr/dom/waveforms/logger_overload_w/logger_overload_w.sad.xml',
            'sdr/dom/waveforms/logger_overload_w/tmp.sad.xml')
        app_1 = self._rhDom.createApplication("/waveforms/logger_overload_w/tmp.sad.xml")
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        app_1.getNamedLoggers()
        app_2 = self._rhDom.createApplication("/waveforms/logger_w/logger_w.sad.xml")
        app_2.getNamedLoggers()
        self.assertEqual(app_1.getLogLevel('logger_2'), 30000)
        self.assertEqual(app_2.getLogLevel('logger_1'), 40000)

    def test_loggingconfig(self):
        # Per-component log configurations from the generated SAD are compared
        # with the config files on disk, then with an application-wide
        # LOGGING_CONFIG_URI override.
        self.cname = "logger"
        runtest_props = _read_text('./runtest.props')
        high_thresh_cfg = _read_text('./high_thresh.cfg')
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        _write_sad_from_template(
            'sdr/dom/waveforms/logger_config/logger_config.sad.xml',
            'sdr/dom/waveforms/logger_config/tmp.sad.xml')
        app_1 = self._rhDom.createApplication("/waveforms/logger_config/tmp.sad.xml")
        self.assertEqual(app_1.getLogLevel('logger_1'), 0)
        self.assertEqual(app_1.getLogLevel('logger_2'), 50000)
        # Locate the two components by position; the other index is only
        # resolved when logger_1 sits at position 0 or 1.
        logger_1 = -1
        logger_2 = -1
        for comp_idx, comp in enumerate(app_1.comps):
            if comp.instanceName == 'logger_1':
                logger_1 = comp_idx
                break
        if logger_1 == 0:
            logger_2 = 1
        elif logger_1 == 1:
            logger_2 = 0
        self.assertNotEqual(logger_1, -1)
        self.assertNotEqual(logger_2, -1)
        self.assertEqual(app_1.comps[logger_1].getLogConfig(), high_thresh_cfg)
        self.assertEqual(app_1.comps[logger_2].getLogConfig(), runtest_props)
        app_1.getNamedLoggers()
        app_2 = self._rhDom.createApplication(
            "/waveforms/logger_config/tmp.sad.xml",
            initConfiguration={'LOGGING_CONFIG_URI': _log_config_uri('high_thresh.cfg')})
        app_2.getNamedLoggers()
        self.assertEqual(app_2.getLogLevel('logger_1'), 30000)
        self.assertEqual(app_2.getLogLevel('logger_2'), 30000)
        self.assertEqual(app_2.comps[logger_1].getLogConfig(), high_thresh_cfg)
        self.assertEqual(app_2.comps[logger_2].getLogConfig(), high_thresh_cfg)
        app_1.start()
        time.sleep(1.5)

    def test_application_cpp_access(self):
        self.cname = "logger"
        self.applicationAccess("/waveforms/logger_w/logger_w.sad.xml")

    def test_application_py_access(self):
        self.cname = "logger_py"
        self.applicationAccess("/waveforms/logger_py_w/logger_py_w.sad.xml")

    @scatest.requireJava
    def test_application_java_access(self):
        self.cname = "logger_java"
        self.applicationAccess("/waveforms/logger_java_w/logger_java_w.sad.xml")

    def applicationAccess(self, sadfile):
        """Create an application from *sadfile* and exercise its named-logger
        API: listing, level inheritance, breaking inheritance, resetLog().

        self.cname must already hold the component base name; loggers are
        expected under '<cname>_1' and '<cname>_2'.
        """
        # Automatically clean up
        redhawk.setTrackApps(True)
        # Create Application from $SDRROOT path
        app = self._rhDom.createApplication(sadfile)
        loggers = app.getNamedLoggers()
        first = self.cname + '_1'
        tracked = _BASE_SUFFIXES + _USER_SUFFIXES
        orig_loggers = _snapshot_levels(app, first, tracked)

        published = _BASE_SUFFIXES + _SYSTEM_SUFFIXES + _USER_SUFFIXES
        _assert_loggers_present(self, loggers, first, published)
        _assert_loggers_present(self, loggers, self.cname + '_2', published)

        self.assertRaises(CF.UnknownIdentifier, app.setLogLevel, first + '.foo', 'all')
        self.assertRaises(CF.UnknownIdentifier, app.getLogLevel, first + '.foo')

        # children follow the level set on the component root logger
        app.setLogLevel(first, 'all')
        _assert_levels(self, app, first, tracked, CF.LogLevels.ALL)
        app.setLogLevel(first, 'off')
        _assert_levels(self, app, first, tracked, CF.LogLevels.OFF)

        # break the level inheritance
        app.setLogLevel(first + '.user', 'trace')
        app.setLogLevel(first, 'all')
        _assert_levels(self, app, first, _BASE_SUFFIXES, CF.LogLevels.ALL)
        _assert_levels(self, app, first, _USER_SUFFIXES, CF.LogLevels.TRACE)

        # set the log with a value rather than the string
        app.setLogLevel(first, CF.LogLevels.DEBUG)
        _assert_levels(self, app, first, _BASE_SUFFIXES, CF.LogLevels.DEBUG)
        _assert_levels(self, app, first, _USER_SUFFIXES, CF.LogLevels.TRACE)

        app.resetLog()
        _assert_levels_restored(self, app, first, tracked, orig_loggers)

        # verify that inheritance is re-established
        app.setLogLevel(first, 'all')
        _assert_levels(self, app, first, tracked, CF.LogLevels.ALL)


@scatest.requireLog4cxx
class ApplicationDomainLogging(scatest.CorbaTestCase):
    """Checks logger names in the output of a domain started with
    redhawk.kickDomain(), whose stdout is redirected to a temporary file."""

    def setUp(self):
        # NOTE(review): tempfile.mktemp() is race-prone; kept because
        # kickDomain's handling of an already-existing stdout file is not
        # visible here.
        self.tmpfile = tempfile.mktemp()
        devmgrs = ['test_ExecutableDevice_node']
        self._rhDom = redhawk.kickDomain(domain_name=scatest.getTestDomainName(),
                                         kick_device_managers=True,
                                         device_managers=devmgrs,
                                         stdout=self.tmpfile,
                                         detached=False)
        try:
            self.waitForDeviceManager(devmgrs[0])
        except Exception:
            # Report and carry on; the assertions below will expose the problem.
            traceback.print_exc()
        self.assertEqual(len(self._rhDom._get_applications()), 0)

    def tearDown(self):
        # Do all application shutdown before calling the base class tearDown,
        # or failures will probably occur.
        redhawk.core._cleanUpLaunchedApps()
        scatest.CorbaTestCase.tearDown(self)
        _remove_quietly(os.remove, self.tmpfile)
        try:
            self._rhDom.terminate()
        except Exception:
            # Best effort; killDomain() below handles anything left running.
            pass
        killDomain(scatest.getTestDomainName())
        # need to let event service clean up event channels
        # cycle period is 10 milliseconds
        time.sleep(0.1)

    def _query_and_read_output(self, level):
        """Set the DomainManager and DeviceManager loggers to *level*, query
        both, and return the captured domain output."""
        self._rhDom.setLogLevel('DomainManager', level)
        self._rhDom.devMgrs[0].setLogLevel('DeviceManager', level)
        self._rhDom.query([])
        self._rhDom.devMgrs[0].query([])
        return _read_text(self.tmpfile)

    def test_domain_hierarchy_all(self):
        output = self._query_and_read_output('all')
        self.assertTrue('DomainManager.PropertySet' in output)
        self.assertTrue('DeviceManager.PropertySet' in output)

    def test_domain_hierarchy(self):
        output = self._query_and_read_output('info')
        self.assertFalse('DomainManager.PropertySet' in output)
        self.assertFalse('DeviceManager.PropertySet' in output)

    def application_default_log(self, appname, compname):
        """Start application *appname* and wait up to 1.5 s for the logger
        name '<compname>_1.user.more_stuff' to appear in the domain output."""
        app = self._rhDom.createApplication(appname)
        logger_name = compname + '_1.user.more_stuff'
        app.setLogLevel(logger_name, 'all')
        app.start()
        output = ''
        begin_time = time.time()
        while time.time() - begin_time < 1.5:
            output = _read_text(self.tmpfile)
            if output.find(logger_name) != -1:
                break
            time.sleep(0.1)
        app.stop()
        self.assertNotEqual(output.find(logger_name), -1)

    def test_application_default_log_cpp(self):
        self.application_default_log("/waveforms/logger_w/logger_w.sad.xml", 'logger')

    def test_application_default_log_py(self):
        self.application_default_log("/waveforms/logger_py_w/logger_py_w.sad.xml", 'logger_py')

    @scatest.requireJava
    def test_application_default_log_java(self):
        self.application_default_log("/waveforms/logger_java_w/logger_java_w.sad.xml", 'logger_java')


def all_log_levels(_obj):
    """Launch _obj.cname in the sandbox and check logger listing, level
    inheritance, the order of messages in the log file, and breaking
    inheritance.

    _obj is the running test case; it must provide cname, readLogFile() and
    the unittest assertion methods.  The component is stored on _obj.comp.
    """
    root = _obj.cname + '_1'
    inheriting = _DEEP_SUFFIXES + _USER_SUFFIXES
    _obj.comp = sb.launch(_obj.cname, properties={'LOGGING_CONFIG_URI': _log_config_uri('high_thresh.cfg')})
    _obj.comp.start()

    # make sure that all the named loggers appear
    loggers = _obj.comp.getNamedLoggers()
    _assert_loggers_present(_obj, loggers, root, inheriting + _SYSTEM_SUFFIXES)
    for logger in loggers:
        _obj.assertTrue(root in logger)

    # verify that the logger level is inherited
    _obj.comp.setLogLevel(root, 'all')
    time.sleep(0.5)
    _assert_levels(_obj, _obj.comp, root, inheriting, CF.LogLevels.ALL)
    _obj.comp.setLogLevel(root, 'off')
    _assert_levels(_obj, _obj.comp, root, _BASE_SUFFIXES + _USER_SUFFIXES, CF.LogLevels.OFF)

    # make sure that the log content is correct
    # NOTE(review): str.find() returns -1 for a missing message, so a missing
    # earlier message still satisfies the ordering checks below.
    content = _obj.readLogFile(_COMPONENT_LOG)
    messages = ('message from _log',
                'message from baseline_1_logger',
                'message from baseline_2_logger',
                'message from namespaced_logger',
                'message from basetree_logger')
    positions = [content.find(message) for message in messages]
    for earlier, later in zip(positions, positions[1:]):
        _obj.assertTrue(earlier < later)

    # verify that the loggers are off
    time.sleep(0.5)
    content_again = _obj.readLogFile(_COMPONENT_LOG)
    _obj.assertEqual(len(content), len(content_again))

    # break the level inheritance
    _obj.comp.setLogLevel(root + '.user', 'trace')
    _obj.comp.setLogLevel(root, 'all')
    _assert_levels(_obj, _obj.comp, root, _DEEP_SUFFIXES, CF.LogLevels.ALL)
    _assert_levels(_obj, _obj.comp, root, _USER_SUFFIXES, CF.LogLevels.TRACE)

    # set the log with a value rather than the string
    _obj.comp.setLogLevel(root, CF.LogLevels.DEBUG)
    _assert_levels(_obj, _obj.comp, root, _DEEP_SUFFIXES, CF.LogLevels.DEBUG)
    _assert_levels(_obj, _obj.comp, root, _USER_SUFFIXES, CF.LogLevels.TRACE)
    _obj.comp.stop()


def reset_logger(_obj):
    """Launch _obj.cname in the sandbox and check that resetLog() restores
    the original levels and re-establishes level inheritance.

    _obj is the running test case; it must provide cname and the unittest
    assertion methods.  The component is stored on _obj.comp.
    """
    root = _obj.cname + '_1'
    tracked = _DEEP_SUFFIXES + _USER_SUFFIXES
    _obj.comp = sb.launch(_obj.cname, properties={'LOGGING_CONFIG_URI': _log_config_uri('high_thresh.cfg')})
    _obj.comp.start()

    # make sure that all the named loggers appear
    loggers = _obj.comp.getNamedLoggers()
    _assert_loggers_present(_obj, loggers, root, tracked)
    orig_loggers = _snapshot_levels(_obj.comp, root, tracked)

    # verify that the logger level is inherited
    _obj.comp.setLogLevel(root, 'all')
    time.sleep(0.5)
    _assert_levels(_obj, _obj.comp, root, tracked, CF.LogLevels.ALL)
    _obj.comp.setLogLevel(root, 'off')
    _assert_levels(_obj, _obj.comp, root, tracked, CF.LogLevels.OFF)

    # break the level inheritance
    _obj.comp.setLogLevel(root + '.user', 'trace')
    _obj.comp.setLogLevel(root, 'all')
    _assert_levels(_obj, _obj.comp, root, _DEEP_SUFFIXES, CF.LogLevels.ALL)
    _assert_levels(_obj, _obj.comp, root, _USER_SUFFIXES, CF.LogLevels.TRACE)

    # verify that the levels are reset back to their original level
    _obj.comp.resetLog()
    _assert_levels_restored(_obj, _obj.comp, root, tracked, orig_loggers)

    # verify that inheritance is re-established
    _obj.comp.setLogLevel(root, 'all')
    _assert_levels(_obj, _obj.comp, root, tracked, CF.LogLevels.ALL)
    _obj.comp.stop()


def single_log_level(_obj):
    """Launch _obj.cname in the sandbox, switch the two '.user' loggers on one
    at a time, and check the resulting message order in the log file.

    _obj is the running test case; it must provide cname, readLogFile() and
    the unittest assertion methods.  The component is stored on _obj.comp.
    """
    more_stuff = _obj.cname + '_1.user.more_stuff'
    some_stuff = _obj.cname + '_1.user.some_stuff'
    _obj.comp = sb.launch(_obj.cname, properties={'LOGGING_CONFIG_URI': _log_config_uri('high_thresh.cfg')})
    _obj.comp.start()
    _obj.comp.getNamedLoggers()

    # an unknown level name must be rejected
    _obj.assertRaises(Exception, _obj.comp.setLogLevel, more_stuff, 'hello')

    # enable more_stuff, then some_stuff, then more_stuff again, each for 0.5 s
    for active in (more_stuff, some_stuff, more_stuff):
        _obj.comp.setLogLevel(active, 'all')
        _obj.assertEqual(_obj.comp.getLogLevel(active), CF.LogLevels.ALL)
        time.sleep(0.5)
        _obj.comp.setLogLevel(active, 'off')

    content = _obj.readLogFile(_COMPONENT_LOG)
    find_1 = content.find('message from baseline_1_logger')
    find_2 = content.find('message from baseline_2_logger')
    _obj.assertTrue(find_2 < find_1)
    # baseline_2 must show up again after the first baseline_1 message
    find_3 = content.find('message from baseline_2_logger', find_1)
    _obj.assertTrue(find_1 < find_3)
    _obj.comp.stop()


def selective_log_setting(_obj):
    """Launch _obj.cname as instance 'logger_1' with hierarchical_log.cfg and
    check per-logger levels plus the message counts in both log files.

    _obj is the running test case; it must provide cname, readLogFile() and
    the unittest assertion methods.  The component is stored on _obj.comp.
    """
    _obj.comp = sb.launch(_obj.cname, instanceName='logger_1',
                          properties={'LOGGING_CONFIG_URI': _log_config_uri('hierarchical_log.cfg')})
    _obj.comp.start()
    _obj.comp.getNamedLoggers()
    _obj.comp.setLogLevel('logger_1.user.more_stuff', 'all')
    _obj.assertEqual(_obj.comp.getLogLevel('logger_1.user.more_stuff'), CF.LogLevels.ALL)
    _obj.assertEqual(_obj.comp.getLogLevel('logger_1.user.some_stuff'), CF.LogLevels.WARN)
    _obj.assertEqual(_obj.comp.getLogLevel('logger_1.namespace.lower'), CF.LogLevels.TRACE)
    time.sleep(0.5)
    _obj.comp.stop()

    test_content = _obj.readLogFile(_COMPONENT_LOG)
    logger_test_content = _obj.readLogFile('logger_test.log')
    count_test = test_content.count('message from baseline_2_logger')
    count_newline = test_content.count('\n')
    count_log4cxx_test = test_content.count('this is the log4cxx logger')
    count_java_eventchannel_messages = test_content.count('Unable to resolve EventChannelManager')
    count_logger_test = logger_test_content.count('message from namespaced_logger')
    count_logger_newline = logger_test_content.count('\n')
    # Every line of test.log must be a baseline_2 message, apart from the
    # log4cxx and EventChannelManager lines counted above.
    _obj.assertEqual(count_test, count_newline - count_log4cxx_test - count_java_eventchannel_messages)
    _obj.assertEqual(count_logger_test, count_logger_newline)


class _SandboxLogTestMixin(object):
    """readLogFile() and tearDown() shared by the sandbox logging test cases.

    List this class before scatest.CorbaTestCase in the bases so that its
    tearDown() is the one that runs.
    """

    def readLogFile(self, filename):
        """Return the full contents of the text file *filename*."""
        with open(filename, 'r') as fp:
            return fp.read()

    def tearDown(self):
        sb.release()
        # Remove the log files and directories; not every test creates them
        # all.
        _remove_quietly(os.remove, _COMPONENT_LOG)
        _remove_quietly(os.rmdir, 'foo/bar')
        _remove_quietly(os.rmdir, 'foo')
        _remove_quietly(os.remove, 'logger_test.log')
        # Do all application shutdown before calling the base class tearDown,
        # or failures will probably occur.
        scatest.CorbaTestCase.tearDown(self)


class PyHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    """Sandbox logging tests for the 'logger_py' component."""

    def setUp(self):
        self.cname = "logger_py"

    def test_py_all_log_levels(self):
        all_log_levels(self)

    def test_py_reset_logger(self):
        reset_logger(self)

    def test_py_single_log_level(self):
        single_log_level(self)

    def test_py_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireJava
class JavaHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    """Sandbox logging tests for the 'logger_java' component."""

    def setUp(self):
        self.cname = "logger_java"

    def test_java_all_log_levels(self):
        all_log_levels(self)

    def test_java_reset_logger(self):
        reset_logger(self)

    def test_java_single_log_level(self):
        single_log_level(self)

    def test_java_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireLog4cxx
class CppHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    """Sandbox logging tests for the 'logger' component."""

    def setUp(self):
        self.cname = "logger"

    def test_access_log4cxx(self):
        # The '.l4.access' logger must be listed and, once set to 'info',
        # must write its message to the log file.
        logger_name = self.cname + '_1.l4.access'
        self.comp = sb.launch(self.cname, properties={'LOGGING_CONFIG_URI': _log_config_uri('high_thresh.cfg')})
        self.comp.start()
        loggers = self.comp.getNamedLoggers()
        self.assertTrue(logger_name in loggers)
        self.comp.setLogLevel(logger_name, 'info')
        time.sleep(0.5)
        self.comp.stop()
        content = self.readLogFile(_COMPONENT_LOG)
        self.assertNotEqual(content.find('this is the log4cxx logger'), -1)

    def test_all_log_levels(self):
        all_log_levels(self)

    def test_reset_logger(self):
        reset_logger(self)

    def test_single_log_level(self):
        single_log_level(self)

    def test_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireLog4cxx
class CppDeviceHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    """Sandbox logging tests for 'log_test_cpp'."""

    def setUp(self):
        self.cname = "log_test_cpp"

    def test_cpp_dev_all_log_levels(self):
        all_log_levels(self)

    def test_cpp_dev_reset_logger(self):
        reset_logger(self)

    def test_cpp_dev_single_log_level(self):
        single_log_level(self)

    def test_cpp_dev_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireJava
class JavaDeviceHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    """Sandbox logging tests for 'log_test_java'."""

    def setUp(self):
        self.cname = "log_test_java"

    def test_java_dev_all_log_levels(self):
        all_log_levels(self)

    def test_java_dev_reset_logger(self):
        reset_logger(self)

    def test_java_dev_single_log_level(self):
        single_log_level(self)

    def test_java_dev_selective_log_setting(self):
        selective_log_setting(self)


class PyDeviceHierarchicalLogging(_SandboxLogTestMixin, scatest.CorbaTestCase):
    """Sandbox logging tests for 'log_test_py'."""

    def setUp(self):
        self.cname = "log_test_py"

    def test_py_dev_all_log_levels(self):
        all_log_levels(self)

    def test_py_dev_reset_logger(self):
        reset_logger(self)

    def test_py_dev_single_log_level(self):
        single_log_level(self)

    def test_py_dev_selective_log_setting(self):
        selective_log_setting(self)


@scatest.requireLog4cxx
class CppDeviceHierarchicalDomainLogging(scatest.CorbaTestCase):
    def setUp(self):
        domBooter, self._domMgr = self.launchDomainManager()
    def tearDown(self):
        scatest.CorbaTestCase.tearDown(self)
        # need to let event service clean up event channels
        # cycle period is 10 milliseconds
        time.sleep(0.1)
        try:
            os.remove('sdr/dev/nodes/log_test_cpp_override_node/tmp.dcd.xml')
        except:
            pass
    def test_domMgr_log_level(self):
        self._rhDom = redhawk.attach(scatest.getTestDomainName())
        current_level = self._rhDom._get_log_level()
        self.assertNotEqual(current_level, 5000)
        self.assertEquals(self._rhDom.getLogLevel('DomainManager'),current_level)
        self.assertEquals(self._rhDom.getLogLevel('DomainManager.AllocationManager'),current_level)
        self.assertEquals(self._rhDom.getLogLevel('DomainManager.ConnectionManager'),current_level)
        self.assertEquals(self._rhDom.getLogLevel('DomainManager.File'),current_level)
        self.assertEquals(self._rhDom.getLogLevel('DomainManager.FileManager'),current_level)
        self.assertEquals(self._rhDom.getLogLevel('DomainManager.proputils'),current_level)
self.assertEquals(self._rhDom.getLogLevel('DomainManager.EventChannelManager'),current_level)736 self._rhDom._set_log_level(5000)737 current_level = self._rhDom._get_log_level()738 self.assertEquals(self._rhDom.getLogLevel('DomainManager'),current_level)739 self.assertEquals(self._rhDom.getLogLevel('DomainManager.AllocationManager'),current_level)740 self.assertEquals(self._rhDom.getLogLevel('DomainManager.ConnectionManager'),current_level)741 self.assertEquals(self._rhDom.getLogLevel('DomainManager.File'),current_level)742 self.assertEquals(self._rhDom.getLogLevel('DomainManager.FileManager'),current_level)743 self.assertEquals(self._rhDom.getLogLevel('DomainManager.proputils'),current_level)744 self.assertEquals(self._rhDom.getLogLevel('DomainManager.EventChannelManager'),current_level)745 def test_devMgr_cpp_access(self):746 self.cname = "log_test_cpp"747 self.devMgrAccess("/nodes/log_test_cpp_node/DeviceManager.dcd.xml")748 def test_devMgr_py_access(self):749 self.cname = "log_test_py"750 self.devMgrAccess("/nodes/log_test_py_node/DeviceManager.dcd.xml")751 @scatest.requireJava752 def test_devMgr_java_access(self):753 self.cname = "log_test_java"754 self.devMgrAccess("/nodes/log_test_java_node/DeviceManager.dcd.xml")755 def devMgrAccess(self, dcdfile):756 devBooter, self._devMgr = self.launchDeviceManager(dcdfile)757 self._rhDom = redhawk.attach(scatest.getTestDomainName())758 self.assertEquals(len(self._rhDom.devMgrs), 1)759 # Create Application from $SDRROOT path760 devMgr = self._rhDom.devMgrs[0]761 loggers = devMgr.getNamedLoggers()762 orig_loggers = {}763 orig_loggers[self.cname+'_1'] = devMgr.getLogLevel(self.cname+'_1')764 orig_loggers[self.cname+'_1.lower'] = devMgr.getLogLevel(self.cname+'_1.lower')765 orig_loggers[self.cname+'_1.namespace.lower'] = devMgr.getLogLevel(self.cname+'_1.namespace.lower')766 orig_loggers[self.cname+'_1.user.more_stuff'] = devMgr.getLogLevel(self.cname+'_1.user.more_stuff')767 orig_loggers[self.cname+'_1.user.some_stuff'] = 
devMgr.getLogLevel(self.cname+'_1.user.some_stuff')768 self.assertTrue(self.cname+'_1' in loggers)769 self.assertTrue(self.cname+'_1.lower' in loggers)770 self.assertTrue(self.cname+'_1.namespace.lower' in loggers)771 self.assertTrue(self.cname+'_1.system.PortSupplier' in loggers)772 self.assertTrue(self.cname+'_1.system.PropertySet' in loggers)773 self.assertTrue(self.cname+'_1.system.Resource' in loggers)774 self.assertTrue(self.cname+'_1.user.more_stuff' in loggers)775 self.assertTrue(self.cname+'_1.user.some_stuff' in loggers)776 self.assertTrue(self.cname+'_2' in loggers)777 self.assertTrue(self.cname+'_2.lower' in loggers)778 self.assertTrue(self.cname+'_2.namespace.lower' in loggers)779 self.assertTrue(self.cname+'_2.system.PortSupplier' in loggers)780 self.assertTrue(self.cname+'_2.system.PropertySet' in loggers)781 self.assertTrue(self.cname+'_2.system.Resource' in loggers)782 self.assertTrue(self.cname+'_2.user.more_stuff' in loggers)783 self.assertTrue(self.cname+'_2.user.some_stuff' in loggers)784 self.assertRaises(CF.UnknownIdentifier, devMgr.setLogLevel, self.cname+'_1.foo', 'all')785 self.assertRaises(CF.UnknownIdentifier, devMgr.getLogLevel, self.cname+'_1.foo')786 devMgr.setLogLevel(self.cname+'_1', 'all')787 self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.ALL)788 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.ALL)789 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), CF.LogLevels.ALL)790 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.ALL)791 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.ALL)792 devMgr.setLogLevel(self.cname+'_1', 'off')793 self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.OFF)794 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.OFF)795 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), CF.LogLevels.OFF)796 
self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.OFF)797 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.OFF)798 # break the level inheritance799 devMgr.setLogLevel(self.cname+'_1.user', 'trace')800 devMgr.setLogLevel(self.cname+'_1', 'all')801 self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.ALL)802 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.ALL)803 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), CF.LogLevels.ALL)804 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.TRACE)805 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.TRACE)806 # set the log with a value rather than the string807 devMgr.setLogLevel(self.cname+'_1', CF.LogLevels.DEBUG)808 self.assertEquals(devMgr.getLogLevel(self.cname+'_1'), CF.LogLevels.DEBUG)809 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.lower'), CF.LogLevels.DEBUG)810 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.namespace.lower'), CF.LogLevels.DEBUG)811 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.more_stuff'), CF.LogLevels.TRACE)812 self.assertEquals(devMgr.getLogLevel(self.cname+'_1.user.some_stuff'), CF.LogLevels.TRACE)813 devMgr.resetLog()814 self.assertEquals(orig_loggers[self.cname+'_1'], devMgr.getLogLevel(self.cname+'_1'))815 self.assertEquals(orig_loggers[self.cname+'_1.lower'], devMgr.getLogLevel(self.cname+'_1.lower'))816 self.assertEquals(orig_loggers[self.cname+'_1.namespace.lower'], devMgr.getLogLevel(self.cname+'_1.namespace.lower'))817 self.assertEquals(orig_loggers[self.cname+'_1.user.more_stuff'], devMgr.getLogLevel(self.cname+'_1.user.more_stuff'))818 self.assertEquals(orig_loggers[self.cname+'_1.user.some_stuff'], devMgr.getLogLevel(self.cname+'_1.user.some_stuff'))819 # verify that inheritance is re-established820 devMgr.setLogLevel(self.cname+'_1', 'all')821 
self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1'))822 self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.lower'))823 self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.namespace.lower'))824 self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.user.more_stuff'))825 self.assertEquals(CF.LogLevels.ALL, devMgr.getLogLevel(self.cname+'_1.user.some_stuff'))826 def test_devMgr_overload(self):827 fp = open('sdr/dev/nodes/log_test_cpp_override_node/DeviceManager.dcd.xml','r')828 dcd_contents = fp.read()829 fp.close()830 dcd_contents = dcd_contents.replace('@@@CWD@@@', os.getcwd())831 fp = open('sdr/dev/nodes/log_test_cpp_override_node/tmp.dcd.xml','w')832 fp.write(dcd_contents)833 fp.close()834 devBooter, self._devMgr = self.launchDeviceManager('/nodes/log_test_cpp_override_node/tmp.dcd.xml')835 devBooter_2, self._devMgr_2 = self.launchDeviceManager('/nodes/log_test_cpp_node/DeviceManager.dcd.xml')836 self._rhDom = redhawk.attach(scatest.getTestDomainName())837 self.assertEquals(len(self._rhDom.devMgrs), 2)838 # Create Application from $SDRROOT path839 _devMgr_o = None840 _devMgr = None841 for _d in self._rhDom.devMgrs:842 if _d.name == 'log_test_cpp_override_node':843 _devMgr_o = _d844 if _d.name == 'log_test_cpp_node':845 _devMgr = _d846 self.assertNotEqual(_devMgr, None)847 self.assertNotEqual(_devMgr_o, None)848 self.assertEquals(_devMgr_o.getLogLevel('log_test_cpp_1'), 40000)849 self.assertEquals(_devMgr_o.getLogLevel('log_test_cpp_2'), 30000)850 self.assertEquals(_devMgr.getLogLevel('log_test_cpp_1'), 40000)851 self.assertEquals(_devMgr.getLogLevel('log_test_cpp_2'), 40000)852 def test_devMgr_level_trace(self):853 devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=5)854 self._rhDom = redhawk.attach(scatest.getTestDomainName())855 self.assertEquals(len(self._rhDom.devMgrs), 1)856 
self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.ALL) # CF ALL and CF TRACE overlap for compatibility reasons857 self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.ALL)858 def test_devMgr_level_debug(self):859 devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=4)860 self._rhDom = redhawk.attach(scatest.getTestDomainName())861 self.assertEquals(len(self._rhDom.devMgrs), 1)862 self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.DEBUG)863 self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.DEBUG)864 def test_devMgr_level_info(self):865 devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=3)866 self._rhDom = redhawk.attach(scatest.getTestDomainName())867 self.assertEquals(len(self._rhDom.devMgrs), 1)868 self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.INFO)869 self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.INFO)870 def test_devMgr_level_warn(self):871 devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=2)872 self._rhDom = redhawk.attach(scatest.getTestDomainName())873 self.assertEquals(len(self._rhDom.devMgrs), 1)874 self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.WARN)875 self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.WARN)876 def test_devMgr_level_error(self):877 devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=1)878 self._rhDom = redhawk.attach(scatest.getTestDomainName())879 self.assertEquals(len(self._rhDom.devMgrs), 1)880 self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.ERROR)881 self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.ERROR)882 def 
test_devMgr_level_fatal(self):883 devBooter, self._devMgr = self.launchDeviceManager('/nodes/node_device_deps/DeviceManager.dcd.xml.cpp', debug=0)884 self._rhDom = redhawk.attach(scatest.getTestDomainName())885 self.assertEquals(len(self._rhDom.devMgrs), 1)886 self.assertEquals(self._rhDom.devMgrs[0]._get_log_level(), CF.LogLevels.FATAL)887 self.assertEquals(self._rhDom.devMgrs[0].devs[0]._get_log_level(), CF.LogLevels.FATAL)888if __name__ == "__main__":889 # Run the unittests...
dlm.py
Source:dlm.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Reader and writer for zlib-compressed DLM map files.

``DLM`` is the entry point: ``DLM(stream, key).read()`` returns the map as a
nested ``OrderedDict`` and ``DLM(stream, key).write(obj)`` serializes such a
dict back to ``stream``.  Every other class parses/serializes one record type
of the map body and reaches the underlying binary stream through its parent.

NOTE(review): block nesting was reconstructed from a listing that had lost
its indentation; it follows the only reading in which every name is defined
before use, but should be confirmed against the upstream file.
"""
import io
import tempfile
import zlib
from collections import OrderedDict

from ._binarystream import _BinaryStream

# Type tags stored in front of every cell element.
_ELEMENT_GRAPHICAL = 2
_ELEMENT_SOUND = 33

# Floor value produced by read() when the stored floor byte is -128; such a
# cell carries no further data.
_EMPTY_CELL_FLOOR = -1280


def _xor_with_key(data, key):
    """Return *data* XOR-ed byte by byte with the repeating string *key*."""
    key_len = len(key)
    return bytes(byte ^ ord(key[i % key_len]) for i, byte in enumerate(data))


class InvalidDLMFile(Exception):
    """Raised when a DLM file (or the object to serialize) is not usable."""

    def __init__(self, message):
        super(InvalidDLMFile, self).__init__(message)
        self.message = message


class DLM:
    """Read or write one compressed DLM map through a binary file object."""

    def __init__(self, stream, key=None):
        """Bind to *stream* and remember the XOR *key* (a ``str``).

        Raises:
            InvalidDLMFile: if *key* is ``None``.
        """
        if key is None:
            raise InvalidDLMFile("Map decryption key is empty.")
        self._stream = stream
        self._key = key

    def read(self):
        """Decompress the whole stream, parse it and return the map dict."""
        # The context manager closes the temp file even when parsing fails.
        with tempfile.TemporaryFile() as uncompressed:
            uncompressed.write(zlib.decompress(self._stream.read()))
            uncompressed.seek(0)
            dlm_map = Map(_BinaryStream(uncompressed, True), self._key)
            dlm_map.read()
            return dlm_map.getObj()

    def write(self, obj):
        """Serialize *obj* (as returned by ``read``) and write it compressed.

        Note that ``Map.setObj`` replaces the nested record dicts inside
        *obj* with wrapper objects, so *obj* is modified in place.
        """
        with tempfile.TemporaryFile() as buffer:
            dlm_map = Map(_BinaryStream(buffer, True), self._key)
            dlm_map.setObj(obj)
            dlm_map.write()
            buffer.seek(0)
            self._stream.write(zlib.compress(buffer.read()))


class Map:
    """Top-level map record: header, colours, fixtures, layers and cells."""

    def __init__(self, raw, key):
        self._raw = raw
        self._key = key
        self._obj = OrderedDict()
        # Ids of cells flagged with an arrow; filled by CellData.read().
        self.topArrowCell = []
        self.bottomArrowCell = []
        self.leftArrowCell = []
        self.rightArrowCell = []

    def raw(self):
        """Return the binary stream currently being read or written."""
        return self._raw

    def read(self):
        """Parse the map from the stream into the internal dict.

        Raises:
            InvalidDLMFile: if the encrypted payload is shorter than its
                declared length, or an element has an unknown type tag.
        """
        obj = self._obj
        obj["header"] = self.raw().read_char()
        obj["mapVersion"] = self.raw().read_char()
        obj["mapId"] = self.raw().read_uint32()
        version = obj["mapVersion"]

        if version >= 7:
            obj["encrypted"] = self.raw().read_bool()
            obj["encryptionVersion"] = self.raw().read_char()
            self.dataLen = self.raw().read_int32()
            if obj["encrypted"]:
                self.encryptedData = self.raw().read_bytes(self.dataLen)
                if len(self.encryptedData) != self.dataLen:
                    raise InvalidDLMFile("Encrypted map data is truncated.")
                clean_data = io.BytesIO(
                    _xor_with_key(self.encryptedData, self._key))
                # Everything below is parsed from the decrypted copy.
                self._raw = _BinaryStream(clean_data, True)

        # Bound only now: decryption above may have swapped the stream.
        raw = self.raw()
        obj["relativeId"] = raw.read_uint32()
        obj["mapType"] = raw.read_char()
        obj["subareaId"] = raw.read_int32()
        obj["topNeighbourId"] = raw.read_int32()
        obj["bottomNeighbourId"] = raw.read_int32()
        obj["leftNeighbourId"] = raw.read_int32()
        obj["rightNeighbourId"] = raw.read_int32()
        obj["shadowBonusOnEntities"] = raw.read_uint32()

        if version >= 9:
            read_color = raw.read_int32()
            # The alpha byte sits in bits 24-31; shifting by 32 (as the code
            # used to) always produced 0 for a 32-bit value.
            obj["backgroundAlpha"] = (read_color & 0xFF000000) >> 24
            obj["backgroundRed"] = (read_color & 0x00FF0000) >> 16
            obj["backgroundGreen"] = (read_color & 0x0000FF00) >> 8
            obj["backgroundBlue"] = read_color & 0x000000FF
            read_color = raw.read_uint32()
            grid_alpha = (read_color & 0xFF000000) >> 24
            grid_red = (read_color & 0x00FF0000) >> 16
            grid_green = (read_color & 0x0000FF00) >> 8
            grid_blue = read_color & 0x000000FF
            obj["gridColor"] = (grid_alpha << 24 | grid_red << 16
                                | grid_green << 8 | grid_blue)
        elif version >= 3:
            obj["backgroundRed"] = raw.read_char()
            obj["backgroundGreen"] = raw.read_char()
            obj["backgroundBlue"] = raw.read_char()
            obj["backgroundColor"] = (
                (obj["backgroundRed"] & 255) << 16
                | (obj["backgroundGreen"] & 255) << 8
                | obj["backgroundBlue"] & 255)

        if version >= 4:
            # Stored as hundredths.
            obj["zoomScale"] = raw.read_uint16() / 100
            obj["zoomOffsetX"] = raw.read_int16()
            obj["zoomOffsetY"] = raw.read_int16()
            if obj["zoomScale"] < 1:
                obj["zoomScale"] = 1
                obj["zoomOffsetX"] = 0
                obj["zoomOffsetY"] = 0

        if version > 10:
            # NOTE(review): this int32 is consumed but not kept, and write()
            # does not emit it, so maps with version > 10 cannot be written
            # back faithfully.
            raw.read_int32()

        obj["useLowPassFilter"] = raw.read_bool()
        obj["useReverb"] = raw.read_bool()
        obj["presetId"] = raw.read_int32() if obj["useReverb"] else -1

        obj["backgroundsCount"] = raw.read_char()
        obj["backgroundFixtures"] = self._read_fixtures(
            obj["backgroundsCount"])
        obj["foregroundsCount"] = raw.read_char()
        obj["foregroundsFixtures"] = self._read_fixtures(
            obj["foregroundsCount"])

        # Kept because write() serializes this field; it used to be thrown
        # away here, which made write() fail with a KeyError.
        obj["unknown_1"] = raw.read_int32()
        obj["groundCRC"] = raw.read_int32()

        obj["layersCount"] = raw.read_char()
        obj["layers"] = []
        for _ in range(obj["layersCount"]):
            layer = Layer(self, version)
            layer.read()
            obj["layers"].append(layer.getObj())

        obj["cellsCount"] = 560  # MAP_CELLS_COUNT
        obj["cells"] = []
        for cell_id in range(obj["cellsCount"]):
            cell_data = CellData(self, cell_id, version)
            cell_data.read()
            obj["cells"].append(cell_data.getObj())

    def _read_fixtures(self, count):
        """Read *count* consecutive Fixture records and return their dicts."""
        fixtures = []
        for _ in range(count):
            fixture = Fixture(self)
            fixture.read()
            fixtures.append(fixture.getObj())
        return fixtures

    def write(self):
        """Serialize the map to the stream given at construction.

        ``setObj`` must have been called first: the nested records are
        written through their wrapper objects, which only ``setObj`` creates.
        """
        output_stream = self._raw
        body_buffer = io.BytesIO()
        # Child records write through self.raw(), so point it at the body
        # buffer while the body is produced, and always restore it.
        self._raw = _BinaryStream(body_buffer, True)
        try:
            self._write_body()
        finally:
            self._raw = output_stream
        payload = body_buffer.getvalue()

        obj = self._obj
        raw = self.raw()
        raw.write_char(obj["header"])
        raw.write_char(obj["mapVersion"])
        raw.write_uint32(obj["mapId"])  # read() uses read_uint32
        if obj["mapVersion"] >= 7:
            raw.write_bool(obj["encrypted"])
            raw.write_char(obj["encryptionVersion"])
            # Mirror read(): the body is XOR-ed only when flagged encrypted.
            if obj["encrypted"]:
                payload = _xor_with_key(payload, self._key)
            raw.write_int32(len(payload))
        raw.write_bytes(payload)

    def _write_body(self):
        """Write everything that follows the header to ``self.raw()``."""
        obj = self._obj
        raw = self.raw()
        version = obj["mapVersion"]

        raw.write_uint32(obj["relativeId"])
        raw.write_char(obj["mapType"])
        raw.write_int32(obj["subareaId"])
        raw.write_int32(obj["topNeighbourId"])
        raw.write_int32(obj["bottomNeighbourId"])
        raw.write_int32(obj["leftNeighbourId"])
        raw.write_int32(obj["rightNeighbourId"])
        raw.write_uint32(obj["shadowBonusOnEntities"])  # read as uint32

        if version >= 9:
            write_color = ((obj["backgroundAlpha"] << 24) & 0xFF000000
                           | (obj["backgroundRed"] << 16) & 0x00FF0000
                           | (obj["backgroundGreen"] << 8) & 0x0000FF00
                           | obj["backgroundBlue"] & 0x000000FF)
            # Unsigned writer: with a real alpha byte the packed value can
            # exceed the signed 32-bit range; the bytes on disk are the same.
            raw.write_uint32(write_color)
            raw.write_uint32(obj["gridColor"])
        elif version >= 3:
            raw.write_char(obj["backgroundRed"])
            raw.write_char(obj["backgroundGreen"])
            raw.write_char(obj["backgroundBlue"])

        if version >= 4:
            # read() divides by 100, so scale back to hundredths.
            raw.write_int16(int(round(obj["zoomScale"] * 100)))
            raw.write_int16(obj["zoomOffsetX"])
            raw.write_int16(obj["zoomOffsetY"])

        raw.write_bool(obj["useLowPassFilter"])
        raw.write_bool(obj["useReverb"])
        if obj["useReverb"]:
            raw.write_int32(obj["presetId"])

        raw.write_char(obj["backgroundsCount"])
        for i in range(obj["backgroundsCount"]):
            obj["backgroundFixtures"][i].write()
        raw.write_char(obj["foregroundsCount"])
        for i in range(obj["foregroundsCount"]):
            obj["foregroundsFixtures"][i].write()

        # Dicts produced before read() kept this field do not have it;
        # fall back to 0 for those instead of failing.
        raw.write_int32(obj.get("unknown_1", 0))
        raw.write_int32(obj["groundCRC"])

        raw.write_char(obj["layersCount"])
        for i in range(obj["layersCount"]):
            obj["layers"][i].write()
        for i in range(obj["cellsCount"]):
            obj["cells"][i].write()

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        """Adopt *obj* as the map dict, wrapping nested records for write().

        The fixture, layer and cell dicts inside *obj* are replaced in place
        by ``Fixture``/``Layer``/``CellData`` objects holding those dicts.
        """
        self._obj = obj
        version = obj["mapVersion"]
        for i in range(obj["backgroundsCount"]):
            bg = Fixture(self)
            # Was ``ce.setObj(...)``: ``ce`` is not bound at this point.
            bg.setObj(obj["backgroundFixtures"][i])
            obj["backgroundFixtures"][i] = bg
        for i in range(obj["foregroundsCount"]):
            fg = Fixture(self)
            fg.setObj(obj["foregroundsFixtures"][i])
            obj["foregroundsFixtures"][i] = fg
        for i in range(obj["layersCount"]):
            la = Layer(self, version)
            la.setObj(obj["layers"][i])
            obj["layers"][i] = la
        for i in range(obj["cellsCount"]):
            ce = CellData(self, i, version)
            ce.setObj(obj["cells"][i])
            obj["cells"][i] = ce


class Fixture:
    """One background/foreground fixture record."""

    def __init__(self, parrent):
        self._parrent = parrent
        self._obj = OrderedDict()

    def raw(self):
        return self._parrent.raw()

    def read(self):
        raw = self.raw()
        obj = self._obj
        obj["fixtureId"] = raw.read_int32()
        obj["offsetX"] = raw.read_int16()
        obj["offsetY"] = raw.read_int16()
        obj["rotation"] = raw.read_int16()
        obj["xScale"] = raw.read_int16()
        obj["yScale"] = raw.read_int16()
        obj["redMultiplier"] = raw.read_char()
        obj["greenMultiplier"] = raw.read_char()
        obj["blueMultiplier"] = raw.read_char()
        # Derived value; it is not written back by write().
        obj["hue"] = (obj["redMultiplier"] | obj["greenMultiplier"]
                      | obj["blueMultiplier"])
        obj["alpha"] = raw.read_uchar()

    def write(self):
        raw = self.raw()
        obj = self._obj
        raw.write_int32(obj["fixtureId"])
        raw.write_int16(obj["offsetX"])
        raw.write_int16(obj["offsetY"])
        raw.write_int16(obj["rotation"])
        raw.write_int16(obj["xScale"])
        raw.write_int16(obj["yScale"])
        raw.write_char(obj["redMultiplier"])
        raw.write_char(obj["greenMultiplier"])
        raw.write_char(obj["blueMultiplier"])
        raw.write_uchar(obj["alpha"])

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        self._obj = obj


class Layer:
    """One layer record: an id followed by a list of Cell records."""

    def __init__(self, parrent, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.mapVersion = mapVersion

    def raw(self):
        return self._parrent.raw()

    def read(self):
        raw = self.raw()
        obj = self._obj
        if self.mapVersion >= 9:
            obj["layerId"] = raw.read_char()
        else:
            obj["layerId"] = raw.read_int32()
        obj["cellsCount"] = raw.read_int16()
        obj["cells"] = []
        for _ in range(obj["cellsCount"]):
            cell = Cell(self, self.mapVersion)
            cell.read()
            obj["cells"].append(cell.getObj())

    def write(self):
        raw = self.raw()
        obj = self._obj
        if self.mapVersion >= 9:
            # Counterpart of the read_char() in read() (was write_byte).
            raw.write_char(obj["layerId"])
        else:
            raw.write_int32(obj["layerId"])
        raw.write_int16(obj["cellsCount"])
        for i in range(obj["cellsCount"]):
            obj["cells"][i].write()

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        """Adopt *obj*, replacing its cell dicts in place by Cell objects."""
        self._obj = obj
        for i in range(obj["cellsCount"]):
            ce = Cell(self, self.mapVersion)
            ce.setObj(obj["cells"][i])
            obj["cells"][i] = ce


class Cell:
    """One layer cell: a cell id followed by a list of typed elements."""

    def __init__(self, parrent, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.mapVersion = mapVersion

    def raw(self):
        return self._parrent.raw()

    def read(self):
        """Parse the cell; raises InvalidDLMFile on an unknown element tag."""
        raw = self.raw()
        obj = self._obj
        obj["cellId"] = raw.read_int16()
        obj["elementsCount"] = raw.read_int16()
        obj["elements"] = []
        for _ in range(obj["elementsCount"]):
            el = BasicElement().GetElementFromType(
                self, raw.read_char(), self.mapVersion)
            el.read()
            obj["elements"].append(el.getObj())

    def write(self):
        """Write the cell; raises InvalidDLMFile on an unknown element name."""
        raw = self.raw()
        obj = self._obj
        raw.write_int16(obj["cellId"])
        raw.write_int16(obj["elementsCount"])
        for i in range(obj["elementsCount"]):
            element = obj["elements"][i]
            name = element._obj["elementName"]
            if name == "Graphical":
                raw.write_char(_ELEMENT_GRAPHICAL)
            elif name == "Sound":
                raw.write_char(_ELEMENT_SOUND)
            else:
                raise InvalidDLMFile("Invalid element type.")
            element.write()

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        """Adopt *obj*, replacing its element dicts by element objects."""
        self._obj = obj
        for i in range(obj["elementsCount"]):
            name = obj["elements"][i]["elementName"]
            if name == "Graphical":
                el = GraphicalElement(self, self.mapVersion)
            elif name == "Sound":
                el = SoundElement(self, self.mapVersion)
            else:
                raise InvalidDLMFile("Invalid element type.")
            el.setObj(obj["elements"][i])
            obj["elements"][i] = el


class CellData:
    """Per-cell data record (floor, flag bits, speed, zones)."""

    def __init__(self, parrent, id, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.cellId = id
        self.mapVersion = mapVersion

    def raw(self):
        return self._parrent.raw()

    def read(self):
        raw = self.raw()
        obj = self._obj
        parent = self._parrent
        obj["floor"] = raw.read_char() * 10
        if obj["floor"] == _EMPTY_CELL_FLOOR:
            return

        if self.mapVersion >= 9:
            tmp_bytes = raw.read_int16()
            # "mov" and "los" are true when their bit is CLEAR.
            obj["mov"] = (tmp_bytes & 1) == 0
            obj["nonWalkableDuringFight"] = (tmp_bytes & 2) != 0
            obj["nonWalkableDuringRP"] = (tmp_bytes & 4) != 0
            obj["los"] = (tmp_bytes & 8) == 0
            obj["blue"] = (tmp_bytes & 16) != 0
            obj["red"] = (tmp_bytes & 32) != 0
            obj["visible"] = (tmp_bytes & 64) != 0
            obj["farmCell"] = (tmp_bytes & 128) != 0
            if self.mapVersion >= 10:
                obj["havenbagCell"] = (tmp_bytes & 256) != 0
                top_arrow = (tmp_bytes & 512) != 0
                bottom_arrow = (tmp_bytes & 1024) != 0
                right_arrow = (tmp_bytes & 2048) != 0
                left_arrow = (tmp_bytes & 4096) != 0
            else:
                top_arrow = (tmp_bytes & 256) != 0
                bottom_arrow = (tmp_bytes & 512) != 0
                right_arrow = (tmp_bytes & 1024) != 0
                left_arrow = (tmp_bytes & 2048) != 0
            if top_arrow:
                parent.topArrowCell.append(self.cellId)
            if bottom_arrow:
                parent.bottomArrowCell.append(self.cellId)
            if right_arrow:
                parent.rightArrowCell.append(self.cellId)
            if left_arrow:
                parent.leftArrowCell.append(self.cellId)
        else:
            losmov = raw.read_uchar()
            obj["losmov"] = losmov
            obj["los"] = (losmov & 2) >> 1 == 1
            obj["mov"] = (losmov & 1) == 1
            obj["visible"] = (losmov & 64) >> 6 == 1
            obj["farmCell"] = (losmov & 32) >> 5 == 1
            obj["blue"] = (losmov & 16) >> 4 == 1
            obj["red"] = (losmov & 8) >> 3 == 1
            obj["nonWalkableDuringRP"] = (losmov & 128) >> 7 == 1
            # Unlike its siblings this stays the masked int (0 or 4).
            obj["nonWalkableDuringFight"] = (losmov & 4)

        obj["speed"] = raw.read_char()
        obj["mapChangeData"] = raw.read_char()
        if self.mapVersion > 5:
            obj["moveZone"] = raw.read_uchar()
        if self.mapVersion > 10 and (self.hasLinkedZoneFight()
                                     or self.hasLinkedZoneRp()):
            # NOTE(review): consumed but not kept; write() does not emit it.
            raw.read_uchar()
        if self.mapVersion > 7 and self.mapVersion < 9:
            obj["tmpBits"] = raw.read_char()
            self.arrow = 15 & obj["tmpBits"]
            if self.useTopArrow():
                parent.topArrowCell.append(self.cellId)
            if self.useBottomArrow():
                parent.bottomArrowCell.append(self.cellId)
            if self.useLeftArrow():
                parent.leftArrowCell.append(self.cellId)
            if self.useRightArrow():
                parent.rightArrowCell.append(self.cellId)

    def write(self):
        """Write the record so that read() parses back the same dict.

        NOTE(review): for map versions >= 9 the arrow bits are not part of
        the dict (read() only records them on the parent map), so they are
        written as 0.
        """
        raw = self.raw()
        obj = self._obj
        # read() multiplies the stored byte by 10 and consumes it even for
        # an empty cell, so always write it, scaled back down.
        raw.write_char(obj["floor"] // 10)
        if obj["floor"] == _EMPTY_CELL_FLOOR:
            return

        if self.mapVersion >= 9:
            # Start from 0 (this used to read from the output stream).
            tmp_bytes = 0
            # Inverted on purpose: read() treats a clear bit as True.
            tmp_bytes |= 0 if obj["mov"] else 1
            tmp_bytes |= 2 if obj["nonWalkableDuringFight"] else 0
            tmp_bytes |= 4 if obj["nonWalkableDuringRP"] else 0
            tmp_bytes |= 0 if obj["los"] else 8
            tmp_bytes |= 16 if obj["blue"] else 0
            tmp_bytes |= 32 if obj["red"] else 0
            tmp_bytes |= 64 if obj["visible"] else 0
            tmp_bytes |= 128 if obj["farmCell"] else 0
            if self.mapVersion >= 10:
                tmp_bytes |= 256 if obj["havenbagCell"] else 0
            raw.write_int16(tmp_bytes)
        else:
            raw.write_uchar(obj["losmov"])

        raw.write_char(obj["speed"])
        raw.write_char(obj["mapChangeData"])  # read() uses read_char
        if self.mapVersion > 5:
            raw.write_uchar(obj["moveZone"])
        if self.mapVersion > 7 and self.mapVersion < 9:
            raw.write_char(obj["tmpBits"])

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        self._obj = obj

    # The four helpers below test bits of ``self.arrow``, which read() sets
    # only for map version 8.
    def useTopArrow(self):
        return (self.arrow & 1) != 0

    def useBottomArrow(self):
        return (self.arrow & 2) != 0

    def useLeftArrow(self):
        return (self.arrow & 4) != 0

    def useRightArrow(self):
        return (self.arrow & 8) != 0

    def hasLinkedZoneRp(self):
        return bool(self._obj["mov"] and not self._obj["farmCell"])

    def hasLinkedZoneFight(self):
        return bool(self._obj["mov"]
                    and not self._obj["nonWalkableDuringFight"]
                    and not self._obj["farmCell"]
                    and self._obj["havenbagCell"])


class BasicElement:
    """Factory mapping an element type tag to its element class."""

    def GetElementFromType(self, parrent, type, mapVersion):
        """Return a new element for tag *type* (2 graphical, 33 sound).

        Raises:
            InvalidDLMFile: for any other tag.
        """
        if type == _ELEMENT_GRAPHICAL:
            return GraphicalElement(parrent, mapVersion)
        elif type == _ELEMENT_SOUND:
            return SoundElement(parrent, mapVersion)
        else:
            raise InvalidDLMFile("Invalid element type.")


class GraphicalElement:
    """Graphical cell element record."""

    def __init__(self, parrent, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.mapVersion = mapVersion
        self._obj["elementName"] = "Graphical"

    def raw(self):
        return self._parrent.raw()

    def read(self):
        raw = self.raw()
        obj = self._obj
        obj["elementId"] = raw.read_uint32()
        # hue
        obj["hue_1"] = raw.read_char()
        obj["hue_2"] = raw.read_char()
        obj["hue_3"] = raw.read_char()
        # shadow
        obj["shadow_1"] = raw.read_char()
        obj["shadow_2"] = raw.read_char()
        obj["shadow_3"] = raw.read_char()
        # Offsets are one byte wide up to map version 4, two bytes after.
        if self.mapVersion <= 4:
            obj["offsetX"] = raw.read_char()
            obj["offsetY"] = raw.read_char()
        else:
            obj["offsetX"] = raw.read_int16()
            obj["offsetY"] = raw.read_int16()
        obj["altitude"] = raw.read_char()
        obj["identifier"] = raw.read_uint32()

    def write(self):
        raw = self.raw()
        obj = self._obj
        raw.write_uint32(obj["elementId"])
        raw.write_char(obj["hue_1"])
        raw.write_char(obj["hue_2"])
        raw.write_char(obj["hue_3"])
        raw.write_char(obj["shadow_1"])
        raw.write_char(obj["shadow_2"])
        raw.write_char(obj["shadow_3"])
        if self.mapVersion <= 4:
            raw.write_char(obj["offsetX"])
            raw.write_char(obj["offsetY"])
        else:
            raw.write_int16(obj["offsetX"])
            raw.write_int16(obj["offsetY"])
        raw.write_char(obj["altitude"])
        raw.write_uint32(obj["identifier"])

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        self._obj = obj


class SoundElement:
    """Sound cell element record."""

    def __init__(self, parrent, mapVersion):
        self._parrent = parrent
        self._obj = OrderedDict()
        self.mapVersion = mapVersion
        self._obj["elementName"] = "Sound"

    def raw(self):
        return self._parrent.raw()

    def read(self):
        raw = self.raw()
        obj = self._obj
        obj["soundId"] = raw.read_int32()
        obj["baseVolume"] = raw.read_int16()
        obj["fullVolumeDistance"] = raw.read_int32()
        obj["nullVolumeDistance"] = raw.read_int32()
        obj["minDelayBetweenLoops"] = raw.read_int16()
        obj["maxDelayBetweenLoops"] = raw.read_int16()

    def write(self):
        raw = self.raw()
        obj = self._obj
        raw.write_int32(obj["soundId"])
        raw.write_int16(obj["baseVolume"])
        raw.write_int32(obj["fullVolumeDistance"])
        raw.write_int32(obj["nullVolumeDistance"])
        raw.write_int16(obj["minDelayBetweenLoops"])
        raw.write_int16(obj["maxDelayBetweenLoops"])

    def getObj(self):
        return self._obj

    def setObj(self, obj):
        # NOTE(review): the listing is cut off right after this signature;
        # the body mirrors the other leaf records (Fixture,
        # GraphicalElement) -- confirm against the upstream file.
        self._obj = obj
Learn to run automation tests from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!