Best Python code snippet using ATX
test_training.py
Source:test_training.py
# Reconstructed from an extraction-garbled scrape of Blocks' test_training.py
# (original source line numbers were fused into the text; line structure restored).
from tempfile import NamedTemporaryFile
import numpy
from numpy.testing import assert_allclose
import theano
from fuel.datasets import IterableDataset
from theano import tensor
from blocks.algorithms import GradientDescent, Scale
from blocks.config import config
from blocks.extensions import FinishAfter, TrainingExtension
from blocks.extensions.saveload import Checkpoint
from blocks.extensions.training import SharedVariableModifier, TrackTheBest
from blocks.extensions.predicates import OnLogRecord
from blocks.main_loop import MainLoop
from blocks.serialization import load
from blocks.utils import shared_floatx
from blocks.utils.testing import MockMainLoop, skip_if_configuration_set


def test_shared_variable_modifier():
    """SharedVariableModifier with a 1-arg callback updates the learning rate.

    Runs one epoch of SGD over three examples; the callback receives the
    iteration count ``n``, so after the run the learning rate must equal
    ``10. / n_batches``.
    """
    weights = numpy.array([-1, 1], dtype=theano.config.floatX)
    features = [numpy.array(f, dtype=theano.config.floatX)
                for f in [[1, 2], [3, 4], [5, 6]]]
    targets = [(weights * f).sum() for f in features]
    n_batches = 3
    dataset = IterableDataset(dict(features=features, targets=targets))

    # Simple linear regression cost on a 2-element weight vector.
    x = tensor.vector('features')
    y = tensor.scalar('targets')
    W = shared_floatx([0, 0], name='W')
    cost = ((x * W).sum() - y) ** 2
    cost.name = 'cost'

    step_rule = Scale(0.001)
    sgd = GradientDescent(cost=cost, parameters=[W],
                          step_rule=step_rule)
    main_loop = MainLoop(
        model=None, data_stream=dataset.get_example_stream(),
        algorithm=sgd,
        extensions=[
            FinishAfter(after_n_epochs=1),
            SharedVariableModifier(
                step_rule.learning_rate,
                # 1-arg form: callback gets the number of iterations done.
                lambda n: numpy.cast[theano.config.floatX](10. / n))])

    main_loop.run()

    assert_allclose(step_rule.learning_rate.get_value(),
                    numpy.cast[theano.config.floatX](10. / n_batches))


def test_shared_variable_modifier_two_parameters():
    """SharedVariableModifier with a 2-arg callback sees the previous value.

    The callback receives ``(iterations_done, current_value)`` and scales the
    learning rate by 0.2 each batch, so after ``n_batches`` batches the value
    is ``0.001 * 0.2 ** n_batches``.
    """
    weights = numpy.array([-1, 1], dtype=theano.config.floatX)
    features = [numpy.array(f, dtype=theano.config.floatX)
                for f in [[1, 2], [3, 4], [5, 6]]]
    targets = [(weights * f).sum() for f in features]
    n_batches = 3
    dataset = IterableDataset(dict(features=features, targets=targets))

    x = tensor.vector('features')
    y = tensor.scalar('targets')
    W = shared_floatx([0, 0], name='W')
    cost = ((x * W).sum() - y) ** 2
    cost.name = 'cost'

    step_rule = Scale(0.001)
    sgd = GradientDescent(cost=cost, parameters=[W],
                          step_rule=step_rule)
    modifier = SharedVariableModifier(
        step_rule.learning_rate,
        # 2-arg form: callback gets (iterations_done, current value).
        lambda _, val: numpy.cast[theano.config.floatX](val * 0.2))
    main_loop = MainLoop(
        model=None, data_stream=dataset.get_example_stream(),
        algorithm=sgd,
        extensions=[FinishAfter(after_n_epochs=1), modifier])

    main_loop.run()

    new_value = step_rule.learning_rate.get_value()
    assert_allclose(new_value,
                    0.001 * 0.2 ** n_batches,
                    atol=1e-5)


def test_track_the_best():
    """TrackTheBest records a new best only on strict improvement.

    Drives the extension by hand through four epochs with costs 5, 6, 5, 4:
    the best-cost status updates (and the ``cost_best_so_far`` log record
    appears) only when the cost strictly beats the previous best.
    """
    main_loop = MockMainLoop()
    extension = TrackTheBest("cost")
    extension.main_loop = main_loop

    # Epoch 1: cost 5 is the first value seen, so it becomes the best.
    main_loop.status['epochs_done'] += 1
    main_loop.status['iterations_done'] += 10
    main_loop.log.current_row['cost'] = 5
    extension.dispatch('after_epoch')
    assert main_loop.status['best_cost'] == 5
    assert main_loop.log.current_row['cost_best_so_far']

    # Epoch 2: cost 6 is worse — best unchanged, no notification.
    main_loop.status['epochs_done'] += 1
    main_loop.status['iterations_done'] += 10
    main_loop.log.current_row['cost'] = 6
    extension.dispatch('after_epoch')
    assert main_loop.status['best_cost'] == 5
    assert main_loop.log.current_row.get('cost_best_so_far', None) is None

    # Epoch 3: cost 5 merely ties the best — still no notification.
    main_loop.status['epochs_done'] += 1
    main_loop.status['iterations_done'] += 10
    main_loop.log.current_row['cost'] = 5
    extension.dispatch('after_epoch')
    assert main_loop.status['best_cost'] == 5
    assert main_loop.log.current_row.get('cost_best_so_far', None) is None

    # Epoch 4: cost 4 strictly improves — new best plus notification.
    main_loop.status['epochs_done'] += 1
    main_loop.status['iterations_done'] += 10
    main_loop.log.current_row['cost'] = 4
    extension.dispatch('after_epoch')
    assert main_loop.status['best_cost'] == 4
    assert main_loop.log.current_row['cost_best_so_far']


class WriteCostExtension(TrainingExtension):
    """Writes a deterministic 'cost' (``|iterations_done - 5| + 3``) each batch.

    The cost therefore reaches its minimum at iteration 5, which lets
    ``test_save_the_best`` predict exactly which batches trigger a
    best-checkpoint save.
    """

    def after_batch(self, batch):
        self.main_loop.log.current_row['cost'] = abs(
            self.main_loop.log.status['iterations_done'] - 5) + 3


def test_save_the_best():
    """Checkpoint saves to the extra 'best' path only on new-best records."""
    skip_if_configuration_set('log_backend', 'sqlite',
                              "Known to be flaky with SQLite log backend.")
    with NamedTemporaryFile(dir=config.temp_dir) as dst,\
            NamedTemporaryFile(dir=config.temp_dir) as dst_best:
        track_cost = TrackTheBest("cost", after_epoch=False, after_batch=True)
        main_loop = MockMainLoop(
            extensions=[FinishAfter(after_n_epochs=1),
                        WriteCostExtension(),
                        track_cost,
                        Checkpoint(dst.name, after_batch=True,
                                   save_separately=['log'])
                        .add_condition(
                            ["after_batch"],
                            OnLogRecord(track_cost.notification_name),
                            (dst_best.name,))])
        main_loop.run()

        # Costs decrease up to iteration 5 (new best each time), then rise.
        assert main_loop.log[4]['saved_to'] == (dst.name, dst_best.name)
        assert main_loop.log[5]['saved_to'] == (dst.name, dst_best.name)
        assert main_loop.log[6]['saved_to'] == (dst.name,)
        with open(dst_best.name, 'rb') as src:
            # TODO(review): the original source was truncated at this point;
            # presumably it deserializes the best checkpoint and inspects it.
            load(src)
connection.py
Source:connection.py
1# SPDX-License-Identifier: BSD-3-Clause2# Handle NM.RemoteConnection3from __future__ import absolute_import, division, print_function4__metaclass__ = type5import logging6# Relative import is not support by ansible 2.8 yet7# pylint: disable=import-error, no-name-in-module8from ansible_collections.fedora.linux_system_roles.plugins.module_utils.network_lsr.nm import client # noqa:E5019from ansible_collections.fedora.linux_system_roles.plugins.module_utils.network_lsr.nm import error # noqa:E50110# pylint: enable=import-error, no-name-in-module11def delete_remote_connection(nm_profile, timeout, check_mode):12 if not nm_profile:13 logging.info("NULL NM.RemoteConnection, no need to delete")14 return False15 if not check_mode:16 main_loop = client.get_mainloop(timeout)17 user_data = main_loop18 nm_profile.delete_async(19 main_loop.cancellable,20 _nm_profile_delete_call_back,21 user_data,22 )23 logging.debug(24 "Deleting profile %s/%s with timeout %s",25 nm_profile.get_id(),26 nm_profile.get_uuid(),27 timeout,28 )29 main_loop.run()30 return True31def _nm_profile_delete_call_back(nm_profile, result, user_data):32 main_loop = user_data33 if main_loop.is_cancelled:34 return35 try:36 success = nm_profile.delete_finish(result)37 except Exception as e:38 main_loop.fail(39 error.LsrNetworkNmError(40 "Connection deletion aborted on {id}/{uuid}: error={error}".format(41 id=nm_profile.get_id(), uuid=nm_profile.get_uuid(), error=e42 )43 )44 )45 if success:46 main_loop.quit()47 else:48 main_loop.fail(49 error.LsrNetworkNmError(50 "Connection deletion aborted on {id}/{uuid}: error=unknown".format(51 id=nm_profile.get_id(), uuid=nm_profile.get_uuid()52 )53 )54 )55def volatilize_remote_connection(nm_profile, timeout, check_mode):56 if not nm_profile:57 logging.info("NULL NM.RemoteConnection, no need to volatilize")58 return False59 if not check_mode:60 main_loop = client.get_mainloop(timeout)61 user_data = main_loop62 nm_profile.update2(63 None, # settings64 
client.NM.SettingsUpdate2Flags.IN_MEMORY_ONLY65 | client.NM.SettingsUpdate2Flags.VOLATILE,66 None, # args67 main_loop.cancellable,68 _nm_profile_volatile_update2_call_back,69 user_data,70 )71 logging.debug(72 "Volatilizing profile %s/%s with timeout %s",73 nm_profile.get_id(),74 nm_profile.get_uuid(),75 timeout,76 )77 main_loop.run()78 return True79def _nm_profile_volatile_update2_call_back(nm_profile, result, user_data):80 main_loop = user_data81 if main_loop.is_cancelled:82 return83 try:84 success = nm_profile.update2_finish(result)85 except Exception as e:86 main_loop.fail(87 error.LsrNetworkNmError(88 "Connection volatilize aborted on {id}/{uuid}: error={error}".format(89 id=nm_profile.get_id(), uuid=nm_profile.get_uuid(), error=e90 )91 )92 )93 if success:94 main_loop.quit()95 else:96 main_loop.fail(97 error.LsrNetworkNmError(98 "Connection volatilize aborted on {id}/{uuid}: error=unknown".format(99 id=nm_profile.get_id(), uuid=nm_profile.get_uuid()100 )101 )...
test_main_loop.py
Source:test_main_loop.py
# Reconstructed from an extraction-garbled scrape of Blocks' test_main_loop.py.
# NOTE(review): the class header below was elided in the source; the name
# WriteBatchExtension is grounded by its use inside test_main_loop, but the
# base class is presumed from the sibling test file — confirm against upstream.
class WriteBatchExtension(TrainingExtension):
    """Writes data saved by MockAlgorithm to the log."""

    def after_batch(self, _):
        self.main_loop.log.current_row['batch'] = \
            self.main_loop.algorithm.batch


def test_main_loop():
    """Two epochs over 10 examples: 20 iterations, profiled, no model set."""
    old_config_profile_value = config.profile
    config.profile = True

    main_loop = MainLoop(
        MockAlgorithm(), IterableDataset(range(10)).get_example_stream(),
        extensions=[WriteBatchExtension(), FinishAfter(after_n_epochs=2)])
    main_loop.run()

    # No model was given, so accessing it must raise.
    assert_raises(AttributeError, getattr, main_loop, 'model')
    assert main_loop.log.status['iterations_done'] == 20
    assert main_loop.log.status['_epoch_ends'] == [10, 20]
    assert len(main_loop.log) == 20
    # Each logged batch wraps one dataset example, cycling 0..9 twice.
    for i in range(20):
        assert main_loop.log[i + 1]['batch'] == {'data': i % 10}

    # Restore global config so other tests are unaffected.
    config.profile = old_config_profile_value


def test_training_resumption():
    # TODO(review): body truncated in the scraped source.
    ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!