Best Python code snippet using lemoncheesecake
dumping_wrapper_test.py
Source:dumping_wrapper_test.py
1# Copyright 2016 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ==============================================================================15"""Unit Tests for classes in dumping_wrapper.py."""16from __future__ import absolute_import17from __future__ import division18from __future__ import print_function19import glob20import os21import tempfile22import threading23from tensorflow.python.client import session24from tensorflow.python.debug.lib import debug_data25from tensorflow.python.debug.wrappers import dumping_wrapper26from tensorflow.python.debug.wrappers import framework27from tensorflow.python.debug.wrappers import hooks28from tensorflow.python.framework import constant_op29from tensorflow.python.framework import dtypes30from tensorflow.python.framework import ops31from tensorflow.python.framework import test_util32from tensorflow.python.lib.io import file_io33from tensorflow.python.ops import array_ops34from tensorflow.python.ops import state_ops35from tensorflow.python.ops import variables36from tensorflow.python.platform import gfile37from tensorflow.python.platform import googletest38from tensorflow.python.training import monitored_session39@test_util.run_v1_only("b/120545219")40class DumpingDebugWrapperSessionTest(test_util.TensorFlowTestCase):41 def setUp(self):42 self.session_root = tempfile.mkdtemp()43 self.v = variables.VariableV1(10.0, dtype=dtypes.float32, name="v")44 self.delta = constant_op.constant(1.0, 
dtype=dtypes.float32, name="delta")45 self.eta = constant_op.constant(-1.4, dtype=dtypes.float32, name="eta")46 self.inc_v = state_ops.assign_add(self.v, self.delta, name="inc_v")47 self.dec_v = state_ops.assign_add(self.v, self.eta, name="dec_v")48 self.ph = array_ops.placeholder(dtypes.float32, shape=(), name="ph")49 self.inc_w_ph = state_ops.assign_add(self.v, self.ph, name="inc_w_ph")50 self.sess = session.Session()51 self.sess.run(self.v.initializer)52 def tearDown(self):53 ops.reset_default_graph()54 if os.path.isdir(self.session_root):55 file_io.delete_recursively(self.session_root)56 def _assert_correct_run_subdir_naming(self, run_subdir):57 self.assertStartsWith(run_subdir, "run_")58 self.assertEqual(2, run_subdir.count("_"))59 self.assertGreater(int(run_subdir.split("_")[1]), 0)60 def testConstructWrapperWithExistingNonEmptyRootDirRaisesException(self):61 dir_path = os.path.join(self.session_root, "foo")62 os.mkdir(dir_path)63 self.assertTrue(os.path.isdir(dir_path))64 with self.assertRaisesRegexp(65 ValueError, "session_root path points to a non-empty directory"):66 dumping_wrapper.DumpingDebugWrapperSession(67 session.Session(), session_root=self.session_root, log_usage=False)68 def testConstructWrapperWithExistingFileDumpRootRaisesException(self):69 file_path = os.path.join(self.session_root, "foo")70 open(file_path, "a").close() # Create the file71 self.assertTrue(gfile.Exists(file_path))72 self.assertFalse(gfile.IsDirectory(file_path))73 with self.assertRaisesRegexp(ValueError,74 "session_root path points to a file"):75 dumping_wrapper.DumpingDebugWrapperSession(76 session.Session(), session_root=file_path, log_usage=False)77 def testConstructWrapperWithNonexistentSessionRootCreatesDirectory(self):78 new_dir_path = os.path.join(tempfile.mkdtemp(), "new_dir")79 dumping_wrapper.DumpingDebugWrapperSession(80 session.Session(), session_root=new_dir_path, log_usage=False)81 self.assertTrue(gfile.IsDirectory(new_dir_path))82 # Cleanup.83 
gfile.DeleteRecursively(new_dir_path)84 def testDumpingOnASingleRunWorks(self):85 sess = dumping_wrapper.DumpingDebugWrapperSession(86 self.sess, session_root=self.session_root, log_usage=False)87 sess.run(self.inc_v)88 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))89 self.assertEqual(1, len(dump_dirs))90 self._assert_correct_run_subdir_naming(os.path.basename(dump_dirs[0]))91 dump = debug_data.DebugDumpDir(dump_dirs[0])92 self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))93 self.assertEqual(repr(self.inc_v), dump.run_fetches_info)94 self.assertEqual(repr(None), dump.run_feed_keys_info)95 def testDumpingOnASingleRunWorksWithRelativePathForDebugDumpDir(self):96 sess = dumping_wrapper.DumpingDebugWrapperSession(97 self.sess, session_root=self.session_root, log_usage=False)98 sess.run(self.inc_v)99 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))100 cwd = os.getcwd()101 try:102 os.chdir(self.session_root)103 dump = debug_data.DebugDumpDir(104 os.path.relpath(dump_dirs[0], self.session_root))105 self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))106 finally:107 os.chdir(cwd)108 def testDumpingOnASingleRunWithFeedDictWorks(self):109 sess = dumping_wrapper.DumpingDebugWrapperSession(110 self.sess, session_root=self.session_root, log_usage=False)111 feed_dict = {self.ph: 3.2}112 sess.run(self.inc_w_ph, feed_dict=feed_dict)113 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))114 self.assertEqual(1, len(dump_dirs))115 self._assert_correct_run_subdir_naming(os.path.basename(dump_dirs[0]))116 dump = debug_data.DebugDumpDir(dump_dirs[0])117 self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))118 self.assertEqual(repr(self.inc_w_ph), dump.run_fetches_info)119 self.assertEqual(repr(feed_dict.keys()), dump.run_feed_keys_info)120 def testDumpingOnMultipleRunsWorks(self):121 sess = dumping_wrapper.DumpingDebugWrapperSession(122 self.sess, session_root=self.session_root, 
log_usage=False)123 for _ in range(3):124 sess.run(self.inc_v)125 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))126 dump_dirs = sorted(127 dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))128 self.assertEqual(3, len(dump_dirs))129 for i, dump_dir in enumerate(dump_dirs):130 self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))131 dump = debug_data.DebugDumpDir(dump_dir)132 self.assertAllClose([10.0 + 1.0 * i],133 dump.get_tensors("v", 0, "DebugIdentity"))134 self.assertEqual(repr(self.inc_v), dump.run_fetches_info)135 self.assertEqual(repr(None), dump.run_feed_keys_info)136 def testUsingNonCallableAsWatchFnRaisesTypeError(self):137 bad_watch_fn = "bad_watch_fn"138 with self.assertRaisesRegexp(TypeError, "watch_fn is not callable"):139 dumping_wrapper.DumpingDebugWrapperSession(140 self.sess,141 session_root=self.session_root,142 watch_fn=bad_watch_fn,143 log_usage=False)144 def testDumpingWithLegacyWatchFnOnFetchesWorks(self):145 """Use a watch_fn that returns different whitelists for different runs."""146 def watch_fn(fetches, feeds):147 del feeds148 # A watch_fn that picks fetch name.149 if fetches.name == "inc_v:0":150 # If inc_v, watch everything.151 return "DebugIdentity", r".*", r".*"152 else:153 # If dec_v, watch nothing.154 return "DebugIdentity", r"$^", r"$^"155 sess = dumping_wrapper.DumpingDebugWrapperSession(156 self.sess,157 session_root=self.session_root,158 watch_fn=watch_fn,159 log_usage=False)160 for _ in range(3):161 sess.run(self.inc_v)162 sess.run(self.dec_v)163 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))164 dump_dirs = sorted(165 dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))166 self.assertEqual(6, len(dump_dirs))167 for i, dump_dir in enumerate(dump_dirs):168 self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))169 dump = debug_data.DebugDumpDir(dump_dir)170 if i % 2 == 0:171 self.assertGreater(dump.size, 0)172 self.assertAllClose([10.0 - 0.4 * (i / 
2)],173 dump.get_tensors("v", 0, "DebugIdentity"))174 self.assertEqual(repr(self.inc_v), dump.run_fetches_info)175 self.assertEqual(repr(None), dump.run_feed_keys_info)176 else:177 self.assertEqual(0, dump.size)178 self.assertEqual(repr(self.dec_v), dump.run_fetches_info)179 self.assertEqual(repr(None), dump.run_feed_keys_info)180 def testDumpingWithLegacyWatchFnWithNonDefaultDebugOpsWorks(self):181 """Use a watch_fn that specifies non-default debug ops."""182 def watch_fn(fetches, feeds):183 del fetches, feeds184 return ["DebugIdentity", "DebugNumericSummary"], r".*", r".*"185 sess = dumping_wrapper.DumpingDebugWrapperSession(186 self.sess,187 session_root=self.session_root,188 watch_fn=watch_fn,189 log_usage=False)190 sess.run(self.inc_v)191 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))192 self.assertEqual(1, len(dump_dirs))193 dump = debug_data.DebugDumpDir(dump_dirs[0])194 self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))195 self.assertEqual(14,196 len(dump.get_tensors("v", 0, "DebugNumericSummary")[0]))197 def testDumpingWithWatchFnWithNonDefaultDebugOpsWorks(self):198 """Use a watch_fn that specifies non-default debug ops."""199 def watch_fn(fetches, feeds):200 del fetches, feeds201 return framework.WatchOptions(202 debug_ops=["DebugIdentity", "DebugNumericSummary"],203 node_name_regex_whitelist=r"^v.*",204 op_type_regex_whitelist=r".*",205 tensor_dtype_regex_whitelist=".*_ref")206 sess = dumping_wrapper.DumpingDebugWrapperSession(207 self.sess,208 session_root=self.session_root,209 watch_fn=watch_fn,210 log_usage=False)211 sess.run(self.inc_v)212 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))213 self.assertEqual(1, len(dump_dirs))214 dump = debug_data.DebugDumpDir(dump_dirs[0])215 self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))216 self.assertEqual(14,217 len(dump.get_tensors("v", 0, "DebugNumericSummary")[0]))218 dumped_nodes = [dump.node_name for dump in 
dump.dumped_tensor_data]219 self.assertNotIn("inc_v", dumped_nodes)220 self.assertNotIn("delta", dumped_nodes)221 def testDumpingDebugHookWithoutWatchFnWorks(self):222 dumping_hook = hooks.DumpingDebugHook(self.session_root, log_usage=False)223 mon_sess = monitored_session._HookedSession(self.sess, [dumping_hook])224 mon_sess.run(self.inc_v)225 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))226 self.assertEqual(1, len(dump_dirs))227 self._assert_correct_run_subdir_naming(os.path.basename(dump_dirs[0]))228 dump = debug_data.DebugDumpDir(dump_dirs[0])229 self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))230 self.assertEqual(repr(self.inc_v), dump.run_fetches_info)231 self.assertEqual(repr(None), dump.run_feed_keys_info)232 def testDumpingDebugHookWithStatefulWatchFnWorks(self):233 watch_fn_state = {"run_counter": 0}234 def counting_watch_fn(fetches, feed_dict):235 del fetches, feed_dict236 watch_fn_state["run_counter"] += 1237 if watch_fn_state["run_counter"] % 2 == 1:238 # If odd-index run (1-based), watch every ref-type tensor.239 return framework.WatchOptions(240 debug_ops="DebugIdentity",241 tensor_dtype_regex_whitelist=".*_ref")242 else:243 # If even-index run, watch nothing.244 return framework.WatchOptions(245 debug_ops="DebugIdentity",246 node_name_regex_whitelist=r"^$",247 op_type_regex_whitelist=r"^$")248 dumping_hook = hooks.DumpingDebugHook(249 self.session_root, watch_fn=counting_watch_fn, log_usage=False)250 mon_sess = monitored_session._HookedSession(self.sess, [dumping_hook])251 for _ in range(4):252 mon_sess.run(self.inc_v)253 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))254 dump_dirs = sorted(255 dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))256 self.assertEqual(4, len(dump_dirs))257 for i, dump_dir in enumerate(dump_dirs):258 self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))259 dump = debug_data.DebugDumpDir(dump_dir)260 if i % 2 == 0:261 
self.assertAllClose([10.0 + 1.0 * i],262 dump.get_tensors("v", 0, "DebugIdentity"))263 self.assertNotIn("delta",264 [datum.node_name for datum in dump.dumped_tensor_data])265 else:266 self.assertEqual(0, dump.size)267 self.assertEqual(repr(self.inc_v), dump.run_fetches_info)268 self.assertEqual(repr(None), dump.run_feed_keys_info)269 def testDumpingDebugHookWithStatefulLegacyWatchFnWorks(self):270 watch_fn_state = {"run_counter": 0}271 def counting_watch_fn(fetches, feed_dict):272 del fetches, feed_dict273 watch_fn_state["run_counter"] += 1274 if watch_fn_state["run_counter"] % 2 == 1:275 # If odd-index run (1-based), watch everything.276 return "DebugIdentity", r".*", r".*"277 else:278 # If even-index run, watch nothing.279 return "DebugIdentity", r"$^", r"$^"280 dumping_hook = hooks.DumpingDebugHook(281 self.session_root, watch_fn=counting_watch_fn, log_usage=False)282 mon_sess = monitored_session._HookedSession(self.sess, [dumping_hook])283 for _ in range(4):284 mon_sess.run(self.inc_v)285 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))286 dump_dirs = sorted(287 dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))288 self.assertEqual(4, len(dump_dirs))289 for i, dump_dir in enumerate(dump_dirs):290 self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))291 dump = debug_data.DebugDumpDir(dump_dir)292 if i % 2 == 0:293 self.assertAllClose([10.0 + 1.0 * i],294 dump.get_tensors("v", 0, "DebugIdentity"))295 else:296 self.assertEqual(0, dump.size)297 self.assertEqual(repr(self.inc_v), dump.run_fetches_info)298 self.assertEqual(repr(None), dump.run_feed_keys_info)299 def testDumpingFromMultipleThreadsObeysThreadNameFilter(self):300 sess = dumping_wrapper.DumpingDebugWrapperSession(301 self.sess, session_root=self.session_root, log_usage=False,302 thread_name_filter=r"MainThread$")303 self.assertAllClose(1.0, sess.run(self.delta))304 child_thread_result = []305 def child_thread_job():306 
child_thread_result.append(sess.run(self.eta))307 thread = threading.Thread(name="ChildThread", target=child_thread_job)308 thread.start()309 thread.join()310 self.assertAllClose([-1.4], child_thread_result)311 dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))312 self.assertEqual(1, len(dump_dirs))313 dump = debug_data.DebugDumpDir(dump_dirs[0])314 self.assertEqual(1, dump.size)315 self.assertEqual("delta", dump.dumped_tensor_data[0].node_name)316 def testDumpingWrapperWithEmptyFetchWorks(self):317 sess = dumping_wrapper.DumpingDebugWrapperSession(318 self.sess, session_root=self.session_root, log_usage=False)319 sess.run([])320if __name__ == "__main__":...
session_debug_test.py
Source:session_debug_test.py
1# Copyright 2015 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ==============================================================================15"""Tests for debugger functionalities in tf.Session."""16from __future__ import absolute_import17from __future__ import division18from __future__ import print_function19import glob20import os21import shutil22import tempfile23import numpy as np24from six.moves import xrange # pylint: disable=redefined-builtin25from tensorflow.core.protobuf import config_pb226from tensorflow.core.util import event_pb227from tensorflow.python.client import session28from tensorflow.python.framework import constant_op29from tensorflow.python.framework import tensor_util30from tensorflow.python.framework import test_util31from tensorflow.python.ops import control_flow_ops32from tensorflow.python.ops import math_ops33from tensorflow.python.ops import state_ops34from tensorflow.python.ops import variables35from tensorflow.python.platform import googletest36class SessionDebugTest(test_util.TensorFlowTestCase):37 def setUp(self):38 self.dump_root_ = tempfile.mkdtemp()39 def tearDown(self):40 # Tear down temporary dump directory.41 shutil.rmtree(self.dump_root_)42 def _addDebugTensorWatch(self,43 run_opts,44 node_name,45 output_slot,46 debug_op="DebugIdentity",47 debug_urls=None):48 watch_opts = run_opts.debug_tensor_watch_opts49 # Add debug tensor watch for u.50 watch = watch_opts.add()51 watch.node_name = 
node_name52 watch.output_slot = 053 watch.debug_ops.append(debug_op)54 if debug_urls:55 for debug_url in debug_urls:56 watch.debug_urls.append(debug_url)57 def _verifyTensorDumpFile(self, dump_file, expected_tensor_name, debug_op,58 wall_time_lower_bound, expected_tensor_val):59 """Helper method: Verify a Tensor debug dump file and its content.60 Args:61 dump_file: Path to the dump file.62 expected_tensor_name: Expected name of the tensor, e.g., node_a:0.63 debug_op: Name of the debug Op, e.g., DebugIdentity.64 wall_time_lower_bound: Lower bound of the wall time.65 expected_tensor_val: Expected tensor value, as a numpy array.66 """67 self.assertTrue(os.path.isfile(dump_file))68 event = event_pb2.Event()69 f = open(dump_file, "rb")70 event.ParseFromString(f.read())71 wall_time = event.wall_time72 debg_node_name = event.summary.value[0].node_name73 tensor_value = tensor_util.MakeNdarray(event.summary.value[0].tensor)74 self.assertGreater(wall_time, wall_time_lower_bound)75 self.assertEqual("%s:%s" % (expected_tensor_name, debug_op), debg_node_name)76 if expected_tensor_val.dtype.type is np.string_:77 self.assertEqual(str(expected_tensor_val), str(tensor_value))78 else:79 self.assertAllClose(expected_tensor_val, tensor_value)80 def testDumpToFileOverlaoppinpParentDir(self):81 with session.Session() as sess:82 u_init_val = np.array([[5.0, 3.0], [-1.0, 0.0]])83 v_init_val = np.array([[2.0], [-1.0]])84 # Use node names with overlapping namespace (i.e., parent directory) to85 # test concurrent, non-racing directory creation.86 u_name = "testDumpToFile/u"87 v_name = "testDumpToFile/v"88 u_init = constant_op.constant(u_init_val, shape=[2, 2])89 u = variables.Variable(u_init, name=u_name)90 v_init = constant_op.constant(v_init_val, shape=[2, 1])91 v = variables.Variable(v_init, name=v_name)92 w = math_ops.matmul(u, v, name="testDumpToFile/matmul")93 u.initializer.run()94 v.initializer.run()95 run_options = config_pb2.RunOptions()96 debug_url = "file://%s" % self.dump_root_97 
# Add debug tensor watch for u.98 self._addDebugTensorWatch(99 run_options, "%s/read" % u_name, 0, debug_urls=[debug_url])100 # Add debug tensor watch for v.101 self._addDebugTensorWatch(102 run_options, "%s/read" % v_name, 0, debug_urls=[debug_url])103 run_metadata = config_pb2.RunMetadata()104 # Invoke Session.run().105 sess.run(w, options=run_options, run_metadata=run_metadata)106 # Verify the dump file for u.107 dump_files = os.listdir(os.path.join(self.dump_root_, u_name))108 self.assertEqual(1, len(dump_files))109 self.assertTrue(dump_files[0].startswith("read_0_"))110 dump_file = os.path.join(self.dump_root_, u_name, dump_files[0])111 self._verifyTensorDumpFile(dump_file, "%s/read:0" % u_name,112 "DebugIdentity", 0, u_init_val)113 # Verify the dump file for v.114 dump_files = os.listdir(os.path.join(self.dump_root_, v_name))115 self.assertEqual(1, len(dump_files))116 self.assertTrue(dump_files[0].startswith("read_0_"))117 dump_file = os.path.join(self.dump_root_, v_name, dump_files[0])118 self._verifyTensorDumpFile(dump_file, "%s/read:0" % v_name,119 "DebugIdentity", 0, v_init_val)120 def testDumpStringTensorsToFileSystem(self):121 with session.Session() as sess:122 str1_init_val = np.array(b"abc")123 str2_init_val = np.array(b"def")124 str1_init = constant_op.constant(str1_init_val)125 str2_init = constant_op.constant(str2_init_val)126 str1_name = "str1"127 str2_name = "str2"128 str1 = variables.Variable(str1_init, name=str1_name)129 str2 = variables.Variable(str2_init, name=str2_name)130 # Concatenate str1 and str2131 str_concat = math_ops.add(str1, str2, name="str_concat")132 str1.initializer.run()133 str2.initializer.run()134 run_options = config_pb2.RunOptions()135 debug_url = "file://%s" % self.dump_root_136 # Add debug tensor watch for u.137 self._addDebugTensorWatch(138 run_options, "%s/read" % str1_name, 0, debug_urls=[debug_url])139 # Add debug tensor watch for v.140 self._addDebugTensorWatch(141 run_options, "%s/read" % str2_name, 0, 
debug_urls=[debug_url])142 run_metadata = config_pb2.RunMetadata()143 # Invoke Session.run().144 sess.run(str_concat, options=run_options, run_metadata=run_metadata)145 # Verify the dump file for str1.146 dump_files = os.listdir(os.path.join(self.dump_root_, str1_name))147 self.assertEqual(1, len(dump_files))148 self.assertTrue(dump_files[0].startswith("read_0_"))149 dump_file = os.path.join(self.dump_root_, str1_name, dump_files[0])150 self._verifyTensorDumpFile(dump_file, "%s/read:0" % str1_name,151 "DebugIdentity", 0, str1_init_val)152 # Verify the dump file for str2.153 dump_files = os.listdir(os.path.join(self.dump_root_, str2_name))154 self.assertEqual(1, len(dump_files))155 self.assertTrue(dump_files[0].startswith("read_0_"))156 dump_file = os.path.join(self.dump_root_, str2_name, dump_files[0])157 self._verifyTensorDumpFile(dump_file, "%s/read:0" % str2_name,158 "DebugIdentity", 0, str2_init_val)159 def testDumpToFileWhileLoop(self):160 with session.Session() as sess:161 num_iter = 10162 # "u" is the Variable being updated in the loop.163 u_name = "testDumpToFileWhileLoop/u"164 u_namespace = u_name.split("/")[0]165 u_init_val = np.array(11.0)166 u_init = constant_op.constant(u_init_val)167 u = variables.Variable(u_init, name=u_name)168 # "v" is the increment.169 v_name = "testDumpToFileWhileLoop/v"170 v_namespace = v_name.split("/")[0]171 v_init_val = np.array(2.0)172 v_init = constant_op.constant(v_init_val)173 v = variables.Variable(v_init, name=v_name)174 u.initializer.run()175 v.initializer.run()176 i = constant_op.constant(0, name="testDumpToFileWhileLoop/i")177 def cond(i):178 return math_ops.less(i, num_iter)179 def body(i):180 new_u = state_ops.assign_add(u, v)181 new_i = math_ops.add(i, 1)182 op = control_flow_ops.group(new_u)183 new_i = control_flow_ops.with_dependencies([op], new_i)184 return [new_i]185 loop = control_flow_ops.while_loop(cond, body, [i], parallel_iterations=1)186 # Create RunOptions for debug-watching tensors187 run_options = 
config_pb2.RunOptions()188 debug_url = "file://%s" % self.dump_root_189 # Add debug tensor watch for u.190 self._addDebugTensorWatch(run_options, u_name, 0, debug_urls=[debug_url])191 # Add debug tensor watch for v.192 self._addDebugTensorWatch(193 run_options, "%s/read" % v_name, 0, debug_urls=[debug_url])194 # Add debug tensor watch for while/Identity.195 self._addDebugTensorWatch(196 run_options, "while/Identity", 0, debug_urls=[debug_url])197 run_metadata = config_pb2.RunMetadata()198 r = sess.run(loop, options=run_options, run_metadata=run_metadata)199 self.assertEqual(num_iter, r)200 u_val_final = sess.run(u)201 self.assertAllClose(u_init_val + num_iter * v_init_val, u_val_final)202 # Verify dump files203 self.assertTrue(os.path.isdir(self.dump_root_))204 self.assertTrue(os.path.isdir(os.path.join(self.dump_root_, u_namespace)))205 self.assertTrue(206 os.path.isdir(os.path.join(self.dump_root_, v_namespace, "v")))207 # Verify the dump file for tensor "u".208 dump_files = glob.glob(209 os.path.join(self.dump_root_, u_namespace, "u_0_*"))210 self.assertEqual(1, len(dump_files))211 dump_file = os.path.join(self.dump_root_, u_namespace, dump_files[0])212 self.assertTrue(os.path.isfile(dump_file))213 self._verifyTensorDumpFile(dump_file, "%s:0" % u_name, "DebugIdentity", 0,214 u_init_val)215 # Verify the dump file for tensor "v".216 dump_files = os.listdir(os.path.join(self.dump_root_, v_name))217 self.assertEqual(1, len(dump_files))218 self.assertTrue(dump_files[0].startswith("read_0_"))219 dump_file = os.path.join(self.dump_root_, v_name, dump_files[0])220 self._verifyTensorDumpFile(dump_file, "%s/read:0" % v_name,221 "DebugIdentity", 0, v_init_val)222 # Verify the dump files for tensor while/Identity223 while_identity_dump_files = sorted(224 os.listdir(os.path.join(self.dump_root_, "while")))225 self.assertEqual(num_iter, len(while_identity_dump_files))226 # Verify the content of the individual227 for k in xrange(len(while_identity_dump_files)):228 
dump_file_path = os.path.join(self.dump_root_, "while",229 while_identity_dump_files[k])230 self._verifyTensorDumpFile(dump_file_path, "while/Identity:0",231 "DebugIdentity", 0, np.array(k))232if __name__ == "__main__":...
test_postgresql.py
Source:test_postgresql.py
...18 self.connector.settings['HOST'] = 'hostname'19 def test_user_password_uses_special_characters(self, mock_dump_cmd):20 self.connector.settings['PASSWORD'] = '@!'21 self.connector.settings['USER'] = '@'22 self.connector.create_dump()23 self.assertIn('postgresql://%40:%40%21@hostname/dbname', mock_dump_cmd.call_args[0][0])24 def test_create_dump(self, mock_dump_cmd):25 dump = self.connector.create_dump()26 # Test dump27 dump_content = dump.read()28 self.assertTrue(dump_content)29 self.assertEqual(dump_content, b'foo')30 # Test cmd31 self.assertTrue(mock_dump_cmd.called)32 def test_create_dump_without_host_raises_error(self, mock_dump_cmd):33 self.connector.settings.pop('HOST', None)34 with self.assertRaises(DumpError):35 self.connector.create_dump()36 def test_password_but_no_user(self, mock_dump_cmd):37 self.connector.settings.pop('USER', None)38 self.connector.settings['PASSWORD'] = 'hello'39 self.connector.create_dump()40 self.assertIn('postgresql://hostname/dbname', mock_dump_cmd.call_args[0][0])41 def test_create_dump_host(self, mock_dump_cmd):42 # With43 self.connector.settings['HOST'] = 'foo'44 self.connector.create_dump()45 self.assertIn('postgresql://foo/dbname', mock_dump_cmd.call_args[0][0])46 def test_create_dump_port(self, mock_dump_cmd):47 # Without48 self.connector.settings.pop('PORT', None)49 self.connector.create_dump()50 self.assertIn('postgresql://hostname/dbname', mock_dump_cmd.call_args[0][0])51 # With52 self.connector.settings['PORT'] = 4253 self.connector.create_dump()54 self.assertIn('postgresql://hostname:42/dbname', mock_dump_cmd.call_args[0][0])55 def test_create_dump_user(self, mock_dump_cmd):56 # Without57 self.connector.settings.pop('USER', None)58 self.connector.create_dump()59 self.assertIn('postgresql://hostname/dbname', mock_dump_cmd.call_args[0][0])60 # With61 self.connector.settings['USER'] = 'foo'62 self.connector.create_dump()63 self.assertIn('postgresql://foo@hostname/dbname', mock_dump_cmd.call_args[0][0])64 def 
test_create_dump_exclude(self, mock_dump_cmd):65 # Without66 self.connector.create_dump()67 self.assertNotIn(' --exclude-table-data=', mock_dump_cmd.call_args[0][0])68 # With69 self.connector.exclude = ('foo',)70 self.connector.create_dump()71 self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])72 # With serveral73 self.connector.exclude = ('foo', 'bar')74 self.connector.create_dump()75 self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])76 self.assertIn(' --exclude-table-data=bar', mock_dump_cmd.call_args[0][0])77 def test_create_dump_drop(self, mock_dump_cmd):78 # Without79 self.connector.drop = False80 self.connector.create_dump()81 self.assertNotIn(' --clean', mock_dump_cmd.call_args[0][0])82 # With83 self.connector.drop = True84 self.connector.create_dump()85 self.assertIn(' --clean', mock_dump_cmd.call_args[0][0])86 @patch('dbbackup.db.postgresql.PgDumpConnector.run_command',87 return_value=(BytesIO(), BytesIO()))88 def test_restore_dump(self, mock_dump_cmd, mock_restore_cmd):89 dump = self.connector.create_dump()90 self.connector.restore_dump(dump)91 # Test cmd92 self.assertTrue(mock_restore_cmd.called)93 @patch('dbbackup.db.postgresql.PgDumpConnector.run_command',94 return_value=(BytesIO(), BytesIO()))95 def test_restore_dump_user(self, mock_dump_cmd, mock_restore_cmd):96 dump = self.connector.create_dump()97 # Without98 self.connector.settings.pop('USER', None)99 self.connector.restore_dump(dump)100 self.assertIn(101 'postgresql://hostname/dbname',102 mock_restore_cmd.call_args[0][0]103 )104 self.assertNotIn(' --username=', mock_restore_cmd.call_args[0][0])105 # With106 self.connector.settings['USER'] = 'foo'107 self.connector.restore_dump(dump)108 self.assertIn(109 'postgresql://foo@hostname/dbname',110 mock_restore_cmd.call_args[0][0]111 )112@patch('dbbackup.db.postgresql.PgDumpBinaryConnector.run_command',113 return_value=(BytesIO(b'foo'), BytesIO()))114class PgDumpBinaryConnectorTest(TestCase):115 def 
setUp(self):116 self.connector = PgDumpBinaryConnector()117 self.connector.settings['HOST'] = 'hostname'118 self.connector.settings['ENGINE'] = 'django.db.backends.postgresql'119 self.connector.settings['NAME'] = 'dbname'120 def test_create_dump(self, mock_dump_cmd):121 dump = self.connector.create_dump()122 # Test dump123 dump_content = dump.read()124 self.assertTrue(dump_content)125 self.assertEqual(dump_content, b'foo')126 # Test cmd127 self.assertTrue(mock_dump_cmd.called)128 self.assertIn('--format=custom', mock_dump_cmd.call_args[0][0])129 def test_create_dump_exclude(self, mock_dump_cmd):130 # Without131 self.connector.create_dump()132 self.assertNotIn(' --exclude-table-data=', mock_dump_cmd.call_args[0][0])133 # With134 self.connector.exclude = ('foo',)135 self.connector.create_dump()136 self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])137 # With serveral138 self.connector.exclude = ('foo', 'bar')139 self.connector.create_dump()140 self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])141 self.assertIn(' --exclude-table-data=bar', mock_dump_cmd.call_args[0][0])142 def test_create_dump_drop(self, mock_dump_cmd):143 # Without144 self.connector.drop = False145 self.connector.create_dump()146 self.assertNotIn(' --clean', mock_dump_cmd.call_args[0][0])147 # Binary drop at restore level148 self.connector.drop = True149 self.connector.create_dump()150 self.assertNotIn(' --clean', mock_dump_cmd.call_args[0][0])151 @patch('dbbackup.db.postgresql.PgDumpBinaryConnector.run_command',152 return_value=(BytesIO(), BytesIO()))153 def test_restore_dump(self, mock_dump_cmd, mock_restore_cmd):154 dump = self.connector.create_dump()155 self.connector.restore_dump(dump)156 # Test cmd157 self.assertTrue(mock_restore_cmd.called)158@patch('dbbackup.db.postgresql.PgDumpGisConnector.run_command',159 return_value=(BytesIO(b'foo'), BytesIO()))160class PgDumpGisConnectorTest(TestCase):161 def setUp(self):162 self.connector = 
PgDumpGisConnector()163 self.connector.settings['HOST'] = 'hostname'164 @patch('dbbackup.db.postgresql.PgDumpGisConnector.run_command',165 return_value=(BytesIO(b'foo'), BytesIO()))166 def test_restore_dump(self, mock_dump_cmd, mock_restore_cmd):167 dump = self.connector.create_dump()168 # Without ADMINUSER169 self.connector.settings.pop('ADMIN_USER', None)170 self.connector.restore_dump(dump)171 self.assertTrue(mock_restore_cmd.called)172 # With173 self.connector.settings['ADMIN_USER'] = 'foo'174 self.connector.restore_dump(dump)175 self.assertTrue(mock_restore_cmd.called)176 def test_enable_postgis(self, mock_dump_cmd):177 self.connector.settings['ADMIN_USER'] = 'foo'178 self.connector._enable_postgis()179 self.assertIn('"CREATE EXTENSION IF NOT EXISTS postgis;"', mock_dump_cmd.call_args[0][0])180 self.assertIn('--username=foo', mock_dump_cmd.call_args[0][0])181 def test_enable_postgis_host(self, mock_dump_cmd):182 self.connector.settings['ADMIN_USER'] = 'foo'183 # Without184 self.connector.settings.pop('HOST', None)185 self.connector._enable_postgis()186 self.assertNotIn(' --host=', mock_dump_cmd.call_args[0][0])187 # With188 self.connector.settings['HOST'] = 'foo'189 self.connector._enable_postgis()190 self.assertIn(' --host=foo', mock_dump_cmd.call_args[0][0])191 def test_enable_postgis_port(self, mock_dump_cmd):192 self.connector.settings['ADMIN_USER'] = 'foo'193 # Without194 self.connector.settings.pop('PORT', None)195 self.connector._enable_postgis()196 self.assertNotIn(' --port=', mock_dump_cmd.call_args[0][0])197 # With198 self.connector.settings['PORT'] = 42199 self.connector._enable_postgis()200 self.assertIn(' --port=42', mock_dump_cmd.call_args[0][0])201@patch('dbbackup.db.base.Popen', **{202 'return_value.wait.return_value': True,203 'return_value.poll.return_value': False,204})205class PgDumpConnectorRunCommandTest(TestCase):206 def test_run_command(self, mock_popen):207 connector = PgDumpConnector()208 connector.settings['HOST'] = 'hostname'209 
connector.create_dump()210 self.assertEqual(mock_popen.call_args[0][0][0], 'pg_dump')211 def test_run_command_with_password(self, mock_popen):212 connector = PgDumpConnector()213 connector.settings['HOST'] = 'hostname'214 connector.settings['PASSWORD'] = 'foo'215 connector.create_dump()216 self.assertEqual(mock_popen.call_args[0][0][0], 'pg_dump')217 def test_run_command_with_password_and_other(self, mock_popen):218 connector = PgDumpConnector(env={'foo': 'bar'})219 connector.settings['HOST'] = 'hostname'220 connector.settings['PASSWORD'] = 'foo'221 connector.create_dump()222 self.assertEqual(mock_popen.call_args[0][0][0], 'pg_dump')223 self.assertIn('foo', mock_popen.call_args[1]['env'])...
elf-dump
Source:elf-dump
...49 self.sh_link = f.read32()50 self.sh_info = f.read32()51 self.sh_addralign = f.readWord()52 self.sh_entsize = f.readWord()53 def dump(self, shstrtab, f, strtab, dumpdata):54 print " (('sh_name', %s)" % common_dump.HexDump(self.sh_name), "# %r" % shstrtab[self.sh_name[0]]55 print " ('sh_type', %s)" % common_dump.HexDump(self.sh_type)56 print " ('sh_flags', %s)" % common_dump.HexDump(self.sh_flags)57 print " ('sh_addr', %s)" % common_dump.HexDump(self.sh_addr)58 print " ('sh_offset', %s)" % common_dump.HexDump(self.sh_offset)59 print " ('sh_size', %s)" % common_dump.HexDump(self.sh_size)60 print " ('sh_link', %s)" % common_dump.HexDump(self.sh_link)61 print " ('sh_info', %s)" % common_dump.HexDump(self.sh_info)62 print " ('sh_addralign', %s)" % common_dump.HexDump(self.sh_addralign)63 print " ('sh_entsize', %s)" % common_dump.HexDump(self.sh_entsize)64 if self.sh_type[0] == 2: # SHT_SYMTAB65 print " ('_symbols', ["66 dumpSymtab(f, self, strtab)67 print " ])"68 elif self.sh_type[0] == 4 or self.sh_type[0] == 9: # SHT_RELA / SHT_REL69 print " ('_relocations', ["70 dumpRel(f, self, self.sh_type[0] == 4)71 print " ])"72 elif dumpdata:73 f.seek(self.sh_offset[0])74 if self.sh_type != 8: # != SHT_NOBITS75 data = f.read(self.sh_size[0])76 print " ('_section_data', '%s')" % common_dump.dataToHex(data)77 else:78 print " ('_section_data', '')" 79 print " ),"80def dumpSymtab(f, section, strtab):81 entries = section.sh_size[0] // section.sh_entsize[0]82 for index in range(entries):83 f.seek(section.sh_offset[0] + index * section.sh_entsize[0])84 print " # Symbol %s" % index85 name = f.read32()86 print " (('st_name', %s)" % common_dump.HexDump(name), "# %r" % strtab[name[0]]87 if not f.is64Bit:88 print " ('st_value', %s)" % common_dump.HexDump(f.read32())89 print " ('st_size', %s)" % common_dump.HexDump(f.read32())90 st_info = f.read8()[0]91 st_bind = (st_info >> 4, 4)92 st_type = (st_info & 0xf, 4)93 print " ('st_bind', %s)" % common_dump.HexDump(st_bind)94 print " 
('st_type', %s)" % common_dump.HexDump(st_type)95 print " ('st_other', %s)" % common_dump.HexDump(f.read8())96 print " ('st_shndx', %s)" % common_dump.HexDump(f.read16())97 if f.is64Bit:98 print " ('st_value', %s)" % common_dump.HexDump(f.read64())99 print " ('st_size', %s)" % common_dump.HexDump(f.read64())100 print " ),"101def dumpRel(f, section, dumprela = False):102 entries = section.sh_size[0] // section.sh_entsize[0]103 for index in range(entries):104 f.seek(section.sh_offset[0] + index * section.sh_entsize[0])105 print " # Relocation %s" % index106 print " (('r_offset', %s)" % common_dump.HexDump(f.readWord())107 r_info = f.readWord()[0]108 if f.is64Bit:109 r_sym = (r_info >> 32, 32)110 r_type = (r_info & 0xffffffff, 32)111 else:112 r_sym = (r_info >> 8, 24)113 r_type = (r_info & 0xff, 8)114 print " ('r_sym', %s)" % common_dump.HexDump(r_sym)115 print " ('r_type', %s)" % common_dump.HexDump(r_type)116 if dumprela:117 print " ('r_addend', %s)" % common_dump.HexDump(f.readWord())118 print " ),"119def dumpELF(path, opts):120 f = Reader(path)121 magic = f.read(4)122 assert magic == '\x7FELF'123 fileclass = f.read8()124 if fileclass[0] == 1: # ELFCLASS32125 f.is64Bit = False126 elif fileclass[0] == 2: # ELFCLASS64127 f.is64Bit = True128 else:129 raise ValueError, "Unknown file class %s" % common_dump.HexDump(fileclass)130 print "('e_indent[EI_CLASS]', %s)" % common_dump.HexDump(fileclass)131 byteordering = f.read8()132 if byteordering[0] == 1: # ELFDATA2LSB133 f.isLSB = True134 elif byteordering[0] == 2: # ELFDATA2MSB135 f.isLSB = False136 else:137 raise ValueError, "Unknown byte ordering %s" % common_dump.HexDump(byteordering)138 print "('e_indent[EI_DATA]', %s)" % common_dump.HexDump(byteordering)139 print "('e_indent[EI_VERSION]', %s)" % common_dump.HexDump(f.read8())140 print "('e_indent[EI_OSABI]', %s)" % common_dump.HexDump(f.read8())141 print "('e_indent[EI_ABIVERSION]', %s)" % common_dump.HexDump(f.read8())142 f.seek(16) # Seek to end of e_ident.143 print 
"('e_type', %s)" % common_dump.HexDump(f.read16())144 print "('e_machine', %s)" % common_dump.HexDump(f.read16())145 print "('e_version', %s)" % common_dump.HexDump(f.read32())146 print "('e_entry', %s)" % common_dump.HexDump(f.readWord())147 print "('e_phoff', %s)" % common_dump.HexDump(f.readWord())148 e_shoff = f.readWord()149 print "('e_shoff', %s)" % common_dump.HexDump(e_shoff)150 print "('e_flags', %s)" % common_dump.HexDump(f.read32())151 print "('e_ehsize', %s)" % common_dump.HexDump(f.read16())152 print "('e_phentsize', %s)" % common_dump.HexDump(f.read16())153 print "('e_phnum', %s)" % common_dump.HexDump(f.read16())154 e_shentsize = f.read16()155 print "('e_shentsize', %s)" % common_dump.HexDump(e_shentsize)156 e_shnum = f.read16()157 print "('e_shnum', %s)" % common_dump.HexDump(e_shnum)158 e_shstrndx = f.read16()159 print "('e_shstrndx', %s)" % common_dump.HexDump(e_shstrndx)160 # Read all section headers161 sections = []162 for index in range(e_shnum[0]):163 f.seek(e_shoff[0] + index * e_shentsize[0])164 s = Section(f)165 sections.append(s)166 # Read .shstrtab so we can resolve section names167 f.seek(sections[e_shstrndx[0]].sh_offset[0])168 shstrtab = StringTable(f.read(sections[e_shstrndx[0]].sh_size[0]))169 # Get the symbol string table170 strtab = None171 for section in sections:172 if shstrtab[section.sh_name[0]] == ".strtab":173 f.seek(section.sh_offset[0])174 strtab = StringTable(f.read(section.sh_size[0]))175 break176 print "('_sections', ["177 for index in range(e_shnum[0]):178 print " # Section %s" % index179 sections[index].dump(shstrtab, f, strtab, opts.dumpSectionData)180 print "])"181if __name__ == "__main__":182 from optparse import OptionParser, OptionGroup183 parser = OptionParser("usage: %prog [options] {files}")184 parser.add_option("", "--dump-section-data", dest="dumpSectionData",185 help="Dump the contents of sections",186 action="store_true", default=False)187 (opts, args) = parser.parse_args()188 if not args:189 
args.append('-')190 for arg in args:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!