Source: parameter_server_optimizer.py
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
import re
import os
import platform

from paddle import fluid
from paddle.fluid import core

from .meta_optimizer_base import MetaOptimizerBase
from ..base.private_helper_function import wait_server_ready


class ParameterServerOptimizer(MetaOptimizerBase):
    """Fleet meta optimizer that rewrites the single-node program into
    parameter-server trainer/server programs (sync, async, or geo mode)."""

    def __init__(self, optimizer):
        super(ParameterServerOptimizer, self).__init__(optimizer)
        self.inner_opt = optimizer
        # we do not allow a meta optimizer to be the inner optimizer currently
        self.meta_optimizers_white_list = []

    def _is_graph_out(self):
        return False

    def _can_apply(self):
        if self.role_maker._is_collective:
            return False

        # k_steps >= 0 means a_sync_configs has been set for this strategy
        k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
        return k_steps >= 0

    def _get_distributed_strategy(self):
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory

        k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
        strategy = None

        # a_sync=False, k_steps=0  -> sync mode
        # a_sync=True,  k_steps=0  -> async mode
        # a_sync=True,  k_steps>0  -> geo mode (communicate every k_steps)
        if not self.user_defined_strategy.a_sync and k_steps == 0:
            strategy = StrategyFactory.create_sync_strategy()

        if self.user_defined_strategy.a_sync and k_steps == 0:
            strategy = StrategyFactory.create_async_strategy()

        if self.user_defined_strategy.a_sync and k_steps > 0:
            strategy = StrategyFactory.create_geo_strategy(k_steps)

        if not strategy:
            raise ValueError("k_steps is an invalid value, please check")

        return strategy

    def _build_trainer_programs(self, compiled_config):
        from paddle.fluid.incubate.fleet.parameter_server.ir import trainer_pass as worker

        _main = compiled_config.origin_main_program.clone()
        _startup = compiled_config.origin_startup_program.clone()

        if not compiled_config.is_geo_mode():
            # for main program
            _main = worker.delete_optimizer_pass(_main, compiled_config)
            _main = worker.distributed_ops_pass(_main, compiled_config)
            _main = worker.append_send_ops_pass(_main, compiled_config)

            # for startup program
            _startup = worker.fake_init_ops_pass(_startup, compiled_config)
            _startup = worker.init_from_server_pass(_startup, compiled_config)
            _startup = worker.delet_extra_optimizes_pass(_startup,
                                                         compiled_config)

            compiled_config.set_origin_ps_main_program(_main)
            compiled_config.set_origin_ps_startup_program(_startup)

            # for heter program
            if self.role_maker._is_heter_parameter_server_mode:
                from paddle.fluid.incubate.fleet.parameter_server.ir import heter_trainer_pass as heter_worker
                if self.role_maker._is_heter_worker():
                    # for heter worker
                    _main = heter_worker.split_heter_worker_ops_pass(
                        _main, compiled_config)
                else:
                    # for default worker
                    _main = heter_worker.split_trainer_ops_pass(
                        _main, compiled_config)

                # for startup change
                _startup = heter_worker.delete_startup_useless_ops_var_pass(
                    _startup, _main, compiled_config)
        else:
            _main = worker.append_send_ops_pass(_main, compiled_config)
            compiled_config.set_origin_ps_main_program(_main)
            compiled_config.set_origin_ps_startup_program(_startup)

        launch_barrier = self.user_defined_strategy.a_sync_configs[
            "launch_barrier"]
        launch_barrier_flag = int(os.getenv("FLAGS_LAUNCH_BARRIER", "1"))
        if launch_barrier and launch_barrier_flag:
            # trainers wait until all servers are ready
            wait_server_ready(self.role_maker._get_pserver_endpoints())

            # for ps-heter mode, also wait until heter workers are ready
            if self.role_maker._is_heter_parameter_server_mode and \
                    self.role_maker._is_worker():
                wait_server_ready(
                    self.role_maker._get_heter_worker_endpoints())

        return _main, _startup

    def _build_pserver_programs(self, compiled_config):
        from paddle.fluid.incubate.fleet.parameter_server.ir import pserver_pass as server

        _main = fluid.Program()
        _startup = fluid.Program()

        if not compiled_config.is_geo_mode():
            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_optimizer_pass(_main, compiled_config)
            _main = server.large_scale_sparse_pass(_main, _main,
                                                   compiled_config, False)
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config)
            _startup = server.large_scale_sparse_pass(_startup, _main,
                                                      compiled_config, True)

            if not compiled_config.is_sync_mode():
                _main = server.delete_unused_in_main_pass(_main,
                                                          compiled_config)

            _startup = server.delete_unused_in_startup_pass(_startup, _main,
                                                            compiled_config)
        else:
            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_geo_optimizer_pass(_main, compiled_config)
            _main = server.large_scale_sparse_pass(_main, _main,
                                                   compiled_config, False)
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config)
            _startup = server.large_scale_sparse_pass(_startup, _main,
                                                      compiled_config, True)
            _startup = server.delete_unused_in_startup_pass(_startup, _main,
                                                            compiled_config)

        return _main, _startup

    def _can_apply_geo(self, dist_strategy, program):
        def get_sys_free_mem():
            plat = platform.system()
            if plat == "Darwin":
                # vm_stat prints page counts; decode bytes before splitting
                vm = subprocess.Popen(
                    ['vm_stat'],
                    stdout=subprocess.PIPE).communicate()[0].decode()
                vm_lines = vm.split('\n')
                sep = re.compile(r':[\s]+')
                vm_stats = {}
                for row in range(1, len(vm_lines) - 2):
                    row_text = vm_lines[row].strip()
                    row_elements = sep.split(row_text)
                    # page size on macOS is 4096 bytes
                    vm_stats[row_elements[0]] = int(
                        row_elements[1].strip('.')) * 4096
                return vm_stats["Pages free"]
            elif plat == "Linux":
                mems = {}
                with open('/proc/meminfo', 'rb') as f:
                    for line in f:
                        fields = line.split()
                        mems[fields[0]] = int(fields[1]) * 1024
                return mems[b'MemFree:']
            else:
                raise ValueError(
                    "%s platform is unsupported in parameter server optimizer"
                    % plat)

        # geo mode only supports plain SGD as the inner optimizer
        if not isinstance(self.inner_opt, fluid.optimizer.SGDOptimizer):
            return False

        free = get_sys_free_mem()

        from paddle.fluid.incubate.fleet.parameter_server.ir import vars_metatools

        # estimate an upper bound on memory use: 5x the persistable
        # parameter size plus all temporary LoD tensors at a batch of 1024
        processed_var_names = set(["@EMPTY@"])
        param_memory_size = 0
        for varname in program.global_block().vars:
            var = program.global_block().vars[varname]
            if not var.persistable or \
                    var.desc.type() != core.VarDesc.VarType.LOD_TENSOR:
                continue
            param = vars_metatools.create_var_struct(var)
            param_memory_size += param.m_size
            processed_var_names.add(varname)

        upper_mem_use = param_memory_size * 5.0

        program_tmp_vars = dict()
        eval_batch_size = 1024
        for op in program.global_block().ops:
            for var_name in op.output_arg_names:
                if var_name in processed_var_names:
                    continue
                processed_var_names.add(var_name)
                var = program.global_block().vars[var_name]

                if var.desc.type() != core.VarDesc.VarType.LOD_TENSOR:
                    continue

                data_count = 1
                neg_dim_count = 0
                for x in var.shape:
                    if x < 0:
                        if neg_dim_count >= 1:
                            raise ValueError(
                                "Var %s has more than one negative dim." %
                                var_name)
                        neg_dim_count += 1
                        data_count *= (-x)
                    else:
                        data_count *= x
                program_tmp_vars[var_name] = (
                    data_count, neg_dim_count,
                    vars_metatools.dtype_to_size[var.dtype])

        for varname in program_tmp_vars:
            data_count, neg_dim_count, type_size = program_tmp_vars[varname]
            if neg_dim_count == 1:
                # the negative dim is the batch dimension
                data_count *= eval_batch_size
            var_memory = data_count * type_size
            upper_mem_use += var_memory

        return upper_mem_use < free

    def minimize_impl(self,
                      loss,
                      startup_program=None,
                      parameter_list=None,
                      no_grad_set=None):
        self.inner_opt.minimize(loss, startup_program, parameter_list,
                                no_grad_set)
        strategy = self._get_distributed_strategy()

        _origin_main_program = loss.block.program
        _origin_startup_program = startup_program
        from paddle.fluid.incubate.fleet.parameter_server.ir import public

        compiled_config = public.CompileTimeStrategy(_origin_main_program,
                                                     _origin_startup_program,
                                                     strategy,
                                                     self.role_maker)
        compiled_config.strategy = strategy

        if self.role_maker._is_worker() or self.role_maker._is_heter_worker():
            main_program, startup_program = self._build_trainer_programs(
                compiled_config)
        elif self.role_maker._is_server():
            main_program, startup_program = self._build_pserver_programs(
                compiled_config)

        loss.block.program = main_program
        fluid.framework.switch_startup_program(startup_program)

        return None, None

    def _disable_strategy(self, dist_strategy):
        dist_strategy.a_sync = False
        a_sync_configs = dist_strategy.a_sync_configs
        a_sync_configs["k_steps"] = -1
        dist_strategy.a_sync_configs = a_sync_configs

    def _enable_strategy(self, dist_strategy, context):
        a_sync_configs = dist_strategy.a_sync_configs
        if a_sync_configs["k_steps"] >= 0:
            return

        dist_strategy.a_sync = True
        a_sync_configs = dist_strategy.a_sync_configs

        is_geo = self._can_apply_geo(dist_strategy,
                                     context["origin_main_program"])

        if is_geo:
            # geo mode: communicate with servers every 800 steps
            a_sync_configs["k_steps"] = 800
        else:
            a_sync_configs["k_steps"] = 0
        ...
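For context, here is a minimal sketch of how this meta optimizer is typically reached from user code. The `paddle.distributed.fleet` calls are the public API this class plugs into; the model itself (the `x`/`y` inputs and the fc layer) is purely illustrative. Setting `a_sync` and `a_sync_configs["k_steps"]` on the strategy drives the sync/async/geo selection seen in `_get_distributed_strategy` above.

import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init(is_collective=False)  # parameter-server (non-collective) mode

# hypothetical model: a single fc layer over a dense input
x = paddle.static.data(name="x", shape=[None, 13], dtype="float32")
y = paddle.static.data(name="y", shape=[None, 1], dtype="float32")
pred = paddle.static.nn.fc(x, size=1)
cost = paddle.nn.functional.square_error_cost(input=pred, label=y)
avg_cost = paddle.mean(cost)

strategy = fleet.DistributedStrategy()
strategy.a_sync = True  # async mode; a_sync_configs = {"k_steps": 800} would select geo mode

optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)  # dispatches into ParameterServerOptimizer.minimize_impl

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
elif fleet.is_worker():
    exe = paddle.static.Executor(paddle.CPUPlace())
    exe.run(paddle.static.default_startup_program())
    fleet.init_worker()
    # the training loop over feed data would go here
    fleet.stop_worker()

Under this usage, `fleet.distributed_optimizer` wraps the inner SGD optimizer, and `minimize` lands in `minimize_impl`, which rewrites the main/startup programs differently for workers and servers. The processes are normally started with `python -m paddle.distributed.launch` (or `fleetrun`) so that each role receives the right endpoints through environment variables.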