How to use the env method in fMBT

Best Python code snippet using fMBT_python

DRYP_groundwater_EFD.py

Source:DRYP_groundwater_EFD.py Github

copy

Full Screen

import os
import numpy as np
from landlab import RasterModelGrid
from landlab.grid.mappers import (
    map_mean_of_link_nodes_to_link,
    map_max_of_node_links_to_node,
    map_max_of_link_nodes_to_link,
    map_min_of_link_nodes_to_link)
from landlab.io import read_esri_ascii

# Global variables
REG_FACTOR = 0.001  # Regularisation factor
COURANT_2D = 0.25  # Courant number, 2D flow
COURANT_1D = 0.50  # Courant number, 1D flow
STR_RIVER = 0.001  # Riverbed storage factor

# provisional
a_faq = 150
b_faq = 131


class gwflow_EFD(object):
    """Groundwater flow component working on ``env_state.SZgrid``.

    Heads are advanced with explicit sub-steps whose size is limited by
    ``time_step`` / ``time_step_confined`` (Courant-number criteria).
    ``env_state.func`` selects the transmissivity model:

    * ``1`` -- transmissivity varying with water-table depth,
    * ``2`` -- constant transmissivity,
    * anything else -- unconfined aquifer (variable saturated thickness).
    """

    def __init__(self, env_state, data_in):
        """Create the node fields and pre-compute link/river coefficients.

        Parameters:
            env_state: model state; must expose ``grid``, ``SZgrid`` and
                ``func``.
            data_in: not referenced by this constructor.
        """
        env_state.SZgrid.add_zeros('node', 'recharge', dtype=float)
        env_state.SZgrid.add_zeros('node', 'discharge', dtype=float)
        env_state.SZgrid.add_zeros('node', 'river_stage__elevation', dtype=float)
        env_state.SZgrid.add_zeros('node', 'water_storage_anomaly', dtype=float)

        # The saturated-zone grid works on its own copy of the surface grid
        # topography.
        env_state.SZgrid.at_node['topographic__elevation'] = np.array(
            env_state.grid.at_node['topographic__elevation'])

        act_links = env_state.SZgrid.active_links

        Kmax = map_max_of_link_nodes_to_link(env_state.SZgrid, 'Hydraulic_Conductivity')
        Kmin = map_min_of_link_nodes_to_link(env_state.SZgrid, 'Hydraulic_Conductivity')
        Ksl = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'Hydraulic_Conductivity')

        # Link conductivity = max*min/mean of the two end nodes; inactive
        # links keep a conductivity of zero.
        self.Ksat = np.zeros(len(Ksl))
        self.Ksat[act_links] = Kmax[act_links]*Kmin[act_links]/Ksl[act_links]

        self.hriv = np.array(env_state.SZgrid.at_node['water_table__elevation'])
        self.wte_dt = np.array(env_state.SZgrid.at_node['water_table__elevation'])
        self.faq = np.ones_like(Ksl)
        self.faq_node = np.ones_like(self.hriv)

        if env_state.func == 1:
            print('Transmissivity function for variable WT depth')
        elif env_state.func == 2:
            print('Constant transmissivity')
        else:
            print('Unconfined conditions: variable thickness')
            print('Change approach in setting_file: line 26')

        if env_state.func == 1 or env_state.func == 2:
            dzdl = env_state.SZgrid.calc_grad_at_link(
                env_state.SZgrid.at_node['topographic__elevation'])
            # These locals intentionally shadow the provisional module-level
            # a_faq / b_faq defaults with per-link values from the grid.
            a_faq = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'SZ_a_aq')
            b_faq = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'SZ_b_aq')
            self.faq = a_faq/(1+b_faq*np.abs(dzdl))
            self.faq_node = map_max_of_node_links_to_node(env_state.SZgrid, self.faq)

        # Set unconditionally: run_one_step_gw() always hands self.zm to
        # transmissivity(), whatever env_state.func is.
        self.zm = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'topographic__elevation')
        self.flux_out = 0
        self.act_fix_link = 0

        A = np.power(env_state.SZgrid.dx, 2)
        Ariv = (env_state.grid.at_node['river_width']
                * env_state.grid.at_node['river_length'])
        self.W = Ariv / env_state.SZgrid.dx

        # 1/Ariv on river cells, 0 elsewhere.  Built in a separate array:
        # the previous ``kriv = Ariv`` aliased the two names, so the in-place
        # inversion also overwrote Ariv and made kAriv below 1/(Ariv*A)
        # instead of Ariv/A.
        river_cells = Ariv != 0
        kriv = np.zeros_like(Ariv, dtype=float)
        kriv[river_cells] = 1/Ariv[river_cells]
        self.kriv = kriv
        self.kaq = 1/A
        self.kAriv = Ariv*self.kaq
        self.f = 30
        self.dh = np.zeros_like(Ariv)

        if len(env_state.SZgrid.open_boundary_nodes) > 0:
            self.fixed_links = env_state.SZgrid.links_at_node[
                env_state.SZgrid.open_boundary_nodes]
            self.act_fix_link = 1

    def add_second_layer_gw(self, env_state, thickness, Ksat, Sy, Ss):
        """Add the node fields and link conductivity of a second layer.

        Parameters:
            thickness: Thickness of the deep aquifer
            Ksat: Saturated hydraulic conductivity of the deep aquifer
            Sy: Specific yield of the second layer
            Ss: Specific storage of the second layer
        """
        env_state.SZgrid.add_zeros('node', 'Ksat_2', dtype=float)
        env_state.SZgrid.add_zeros('node', 'BOT_2', dtype=float)
        env_state.SZgrid.add_zeros('node', 'Sy_2', dtype=float)
        env_state.SZgrid.add_zeros('node', 'Ss_2', dtype=float)
        env_state.SZgrid.add_zeros('node', 'HEAD_2', dtype=float)

        # BOT_2 sits ``thickness`` above BOT.
        env_state.SZgrid.at_node['BOT_2'] = np.array(
            env_state.SZgrid.at_node['BOT']
            + thickness
            )

        self.thickness = thickness

        env_state.SZgrid.at_node['Sy_2'][:] = Sy
        env_state.SZgrid.at_node['Ss_2'][:] = Ss
        env_state.SZgrid.at_node['Ksat_2'][:] = Ksat

        act_links = env_state.SZgrid.active_links
        Kmax = map_max_of_link_nodes_to_link(env_state.SZgrid, 'Ksat_2')
        Kmin = map_min_of_link_nodes_to_link(env_state.SZgrid, 'Ksat_2')
        Ksl = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'Ksat_2')

        # Same max*min/mean link averaging as used for the first layer.
        self.Ksat_2 = np.zeros(len(Ksl))
        self.Ksat_2[act_links] = Kmax[act_links]*Kmin[act_links]/Ksl[act_links]

        # The second-layer head starts from the current water table.
        env_state.SZgrid.at_node['HEAD_2'][:] = np.array(
            env_state.SZgrid.at_node['water_table__elevation'])

    def run_one_step_gw_2Layer(self, env_state, dt, tht_dt, rtht_dt, Droot):
        """Advance the two-layer system over ``dt`` using sub-steps.

        Requires ``add_second_layer_gw`` to have been called first (it
        creates ``HEAD_2``, ``BOT_2``, ``Ksat_2`` ... and ``self.thickness``).

        Parameters:
            env_state: model state (``grid`` and ``SZgrid`` fields are read
                and updated in place)
            dt: length of the outer time step
            tht_dt, rtht_dt, Droot: accepted but not referenced in this
                method

        Raises:
            Exception: if a computed sub-step is not positive.
        """
        sz = env_state.SZgrid
        core = sz.core_nodes

        dts = time_step(COURANT_2D, sz.at_node['SZ_Sy'],
                        sz.at_node['Hydraulic_Conductivity'],
                        sz.at_node['water_table__elevation'],
                        sz.at_node['BOT'],
                        sz.dx,
                        core)

        act_links = sz.active_links
        A = np.power(sz.dx, 2)
        dtp = np.nanmin([dt, dts])
        dtsp = dtp
        sz.at_node['discharge'][:] = 0.0

        while dtp <= dt:

            # Keep the water table at or above the aquifer bottom ...
            sz.at_node['water_table__elevation'][:] = np.maximum(
                sz.at_node['water_table__elevation'],
                sz.at_node['BOT']
                )
            # ... and at or below the surface elevation.
            sz.at_node['water_table__elevation'][:] = np.minimum(
                sz.at_node['topographic__elevation'],
                sz.at_node['water_table__elevation']
                )

            # p = 1 where the second-layer head is above BOT_2, else 0.
            p = np.where((sz.at_node['HEAD_2']
                          - sz.at_node['BOT_2']) > 0, 1, 0)

            dh_1 = (sz.at_node['water_table__elevation']
                    - sz.at_node['BOT_2']
                    )*p

            dh_2 = (sz.at_node['HEAD_2']
                    - sz.at_node['BOT']
                    )*(1-p)

            # Vertical resistance between the two layers.
            Cz = (np.where(dh_1 > 0, 1/(sz.at_node['Hydraulic_Conductivity']
                                        / (0.5*dh_1)), 0)
                  + 1/(sz.at_node['Ksat_2']
                       / (0.5*self.thickness)))

            aux_h = np.where((sz.at_node['HEAD_2']
                              - sz.at_node['BOT_2']) > 0,
                             sz.at_node['water_table__elevation'],
                             sz.at_node['HEAD_2'])

            dhdz = np.where((dh_2+dh_1) > 0,
                            (aux_h - sz.at_node['HEAD_2'])
                            / np.power(self.thickness+dh_1, 2), 0)

            dqz = - (1/Cz)*dhdz

            # Calculate the hydraulic gradients
            dhdl_1 = sz.calc_grad_at_link(sz.at_node['water_table__elevation'])
            dhdl_2 = sz.calc_grad_at_link(sz.at_node['HEAD_2'])

            qxy_1 = np.zeros(len(self.Ksat))
            qxy_2 = np.zeros(len(self.Ksat_2))

            # Mean head and bottom at each link, layer 1
            hm = map_mean_of_link_nodes_to_link(sz, 'water_table__elevation')
            bm = map_mean_of_link_nodes_to_link(sz, 'BOT_2')

            # Calculate flux per unit length at each face, layer 1
            qxy_1[act_links] = (-self.Ksat[act_links]
                                * (hm[act_links]-bm[act_links])
                                * dhdl_1[act_links])

            # Calculate flux per unit length at each face, layer 2
            hm = map_mean_of_link_nodes_to_link(sz, 'HEAD_2')
            bm = map_mean_of_link_nodes_to_link(sz, 'BOT')

            # Saturated thickness of layer 2 is capped at the layer thickness.
            # (was ``Self.thickness`` -> NameError)
            b2 = np.minimum(hm-bm, self.thickness)
            qxy_2[act_links] = -self.Ksat_2[act_links]*b2[act_links]*dhdl_2[act_links]

            # Calculate the flux gradient; recharge goes to layer 1 where
            # p == 1 and to layer 2 elsewhere.
            dqxy_1 = (-sz.calc_flux_div_at_node(qxy_1) + dqz
                      + p*sz.at_node['recharge']/dt)
            dqxy_2 = (-sz.calc_flux_div_at_node(qxy_2) - dqz
                      + (1-p)*sz.at_node['recharge']/dt)

            CRIV = smoth_func(sz.at_node['water_table__elevation'],
                              env_state.grid.at_node['river_topo_elevation'], 0.00001,
                              dqxy_1, core)*(1/A)*env_state.grid.at_node['SS_loss']

            FSy, FSs = smoth_func_L2(sz.at_node['HEAD_2'][:],
                                     sz.at_node['BOT'][:],
                                     self.thickness, REG_FACTOR, core)

            alpha = 1/(sz.at_node['SZ_Sy'] + dtsp*CRIV)

            # Update water table upper layer
            sz.at_node['water_table__elevation'][core] = (
                dtsp*alpha[core]
                * ((p*dqxy_1)[core]
                   + CRIV[core]*env_state.grid.at_node['river_topo_elevation'][core])
                + sz.at_node['SZ_Sy'][core]*alpha[core]
                * sz.at_node['water_table__elevation'][core])

            # Update water table lower layer
            sz.at_node['HEAD_2'][core] += (
                dqxy_2[core]*dtsp
                * (FSy[core]/sz.at_node['Sy_2'][core]
                   + FSs[core]/sz.at_node['Ss_2'][core])
                )

            sz.at_node['water_table__elevation'][core] = np.maximum(
                sz.at_node['water_table__elevation'][core],
                sz.at_node['BOT_2'][core])

            # Accumulate the river exchange into the surface-grid base flow
            env_state.grid.at_node['Base_flow'][core] += (
                CRIV[core]
                * np.abs(sz.at_node['water_table__elevation'][core]
                         - env_state.grid.at_node['river_topo_elevation'][core])
                )

            dtsp = time_step(COURANT_2D, sz.at_node['SZ_Sy'],
                             sz.at_node['Hydraulic_Conductivity'],
                             sz.at_node['water_table__elevation'],
                             sz.at_node['BOT'],
                             sz.dx,
                             core
                             )

            if dtsp <= 0:
                raise Exception("invalid time step", dtsp)
            if dtp == dt:
                # Last sub-step done: push dtp past dt to leave the loop.
                dtp += dtsp
            elif (dtp + dtsp) > dt:
                # Shorten the final sub-step so it lands exactly on dt.
                dtsp = dt - dtp
                dtp += dtsp
            else:
                dtp += dtsp

        self.wte_dt = np.array(sz.at_node['water_table__elevation'])

        sz.at_node['discharge'][:] *= (1/dt)

        env_state.grid.at_node['riv_sat_deficit'][:] = np.power(env_state.grid.dx, 2)\
            * np.array(env_state.grid.at_node['topographic__elevation']
                       - sz.at_node['water_table__elevation'][:])

    def run_one_step_gw(self, env_state, dt, tht_dt, Droot):
        """Advance the single-layer water table over ``dt`` using sub-steps.

        Parameters:
            env_state: model state; grid fields are read and updated in
                place (z: topographic elevation, h: water table,
                Sy: specific yield, dq: water storage anomaly)
            dt: length of the outer time step
            tht_dt: Water content at time t [-]
            Droot: Rooting depth (passed on to ``fun_update_UZ_SZ_depth``)

        Raises:
            Exception: if a computed sub-step is not positive.
        """
        act_links = env_state.SZgrid.active_links

        T = transmissivity(env_state, self.Ksat, act_links, self.faq, self.zm)

        dts = time_step_confined(COURANT_2D, env_state.SZgrid.at_node['SZ_Sy'],
                                 map_max_of_node_links_to_node(env_state.SZgrid, T),
                                 env_state.SZgrid.dx, env_state.SZgrid.core_nodes
                                 )

        stage = env_state.grid.at_node['Q_ini'] * self.kriv

        # aux_riv is 0 where the river carries water (stage > 0), 1 elsewhere.
        aux_riv = np.ones(len(stage))
        aux_riv[stage > 0] = 0

        Tch = exponential_T(env_state.grid.at_node['SS_loss'], STR_RIVER,
                            env_state.grid.at_node['river_topo_elevation'], self.hriv)

        # Computed but currently not applied (see the disabled line below).
        dts_riv = time_step_confined(COURANT_1D, env_state.SZgrid.at_node['SZ_Sy'],
                                     Tch/self.W, 0.25*(env_state.SZgrid.dx - self.W),
                                     env_state.riv_nodes)

        #dtp = np.nanmin([dt, dts, dts_riv])
        dtp = np.nanmin([dt, dts])

        dtsp = dtp

        env_state.SZgrid.at_node['discharge'][:] = 0.0
        self.flux_out = 0
        self.dh[:] = 0

        while dtp <= dt:

            # Make water table always below or equal to surface elevation
            env_state.SZgrid.at_node['water_table__elevation'][:] = np.minimum(
                env_state.SZgrid.at_node['topographic__elevation'],
                env_state.SZgrid.at_node['water_table__elevation']
                )

            # Make water table always above the bottom elevation
            if env_state.func == 2:
                env_state.SZgrid.at_node['water_table__elevation'][:] = np.maximum(
                    env_state.SZgrid.at_node['water_table__elevation'],
                    env_state.SZgrid.at_node['BOT']
                    )

            # Make river water table always below or equal surface elevation
            self.hriv = np.minimum(self.hriv,
                                   env_state.grid.at_node['river_topo_elevation']
                                   )

            # Calculate transmissivity
            T = transmissivity(env_state, self.Ksat, act_links, self.faq, self.zm)

            # Calculate the hydraulic gradients
            dhdl = env_state.SZgrid.calc_grad_at_link(
                env_state.SZgrid.at_node['water_table__elevation'])

            # Calculate flux per unit length at each face
            qs = np.zeros(len(self.Ksat))
            qs[act_links] = -T[act_links]*dhdl[act_links]

            if self.act_fix_link == 1:
                self.flux_out += (np.sum(qs[self.fixed_links])/env_state.SZgrid.dx)*dtsp

            # Calculate flux head boundary conditions
            dfhbc = exponential_T(env_state.SZgrid.at_node['SZ_FHB'], 60,
                                  env_state.SZgrid.at_node['topographic__elevation'],
                                  env_state.SZgrid.at_node['water_table__elevation']
                                  )
            # NOTE(review): the original indentation is not recoverable here;
            # this accumulation is assumed to be unconditional -- confirm.
            self.flux_out += np.sum(dfhbc)

            # Calculate flux gradient
            dqsdxy = (-env_state.SZgrid.calc_flux_div_at_node(qs)
                      - dfhbc + env_state.SZgrid.at_node['recharge']/dt)

            # Calculate river cell flux
            Tch = exponential_T(env_state.grid.at_node['SS_loss'], STR_RIVER,
                                env_state.grid.at_node['river_topo_elevation'], self.hriv)
            diff_stage = (env_state.SZgrid.at_node['water_table__elevation']
                          - self.hriv)

            stage_aux = np.array(stage)
            stage_aux[diff_stage < 0] = 0

            qs_riv = -(Tch*(diff_stage-stage_aux)*50 / (env_state.SZgrid.dx - self.W))
            # Negative river fluxes are cancelled on cells where stage > 0.
            qs_riv[qs_riv < 0] = qs_riv[qs_riv < 0]*aux_riv[qs_riv < 0]

            dqsdxy += self.kaq*qs_riv

            # Regularization approach for aquifer cells
            if env_state.func == 1 or env_state.func == 2:
                dqs = regularization_T(
                    env_state.SZgrid.at_node['topographic__elevation'],
                    env_state.SZgrid.at_node['water_table__elevation'],
                    self.faq_node, dqsdxy, REG_FACTOR
                    )

            else:
                dqs = regularization(
                    env_state.SZgrid.at_node['topographic__elevation'],
                    env_state.SZgrid.at_node['water_table__elevation'],
                    env_state.SZgrid.at_node['BOT'],
                    dqsdxy, REG_FACTOR)

            # Regularization approach for river cells
            dqs_riv = regularization_T(
                env_state.grid.at_node['river_topo_elevation'],
                self.hriv, self.f, -self.kriv*qs_riv, REG_FACTOR
                )

            # Update the head elevations
            env_state.SZgrid.at_node['water_table__elevation'] += (
                (dqsdxy-dqs)
                * dtsp / env_state.SZgrid.at_node['SZ_Sy'])

            self.hriv += (-self.kriv*qs_riv - dqs_riv)*dtsp/env_state.SZgrid.at_node['SZ_Sy']

            # Update storage change for soil-gw interactions
            env_state.SZgrid.at_node['water_storage_anomaly'][:] = (dqsdxy-dqs) * dtsp
            fun_update_UZ_SZ_depth(env_state, tht_dt, Droot)

            # Calculate total discharge
            env_state.SZgrid.at_node['discharge'][:] += (dqs + dqs_riv*self.kAriv)*dtsp

            # Calculate maximum time step
            dtsp = time_step_confined(COURANT_2D, env_state.SZgrid.at_node['SZ_Sy'],
                                      map_max_of_node_links_to_node(env_state.SZgrid, T),
                                      env_state.SZgrid.dx, env_state.SZgrid.core_nodes
                                      )

            # Computed but currently not applied (see the disabled line below).
            dtsp_riv = time_step_confined(COURANT_1D, env_state.SZgrid.at_node['SZ_Sy'],
                                          Tch/self.W, 0.02*(env_state.SZgrid.dx - self.W),
                                          env_state.riv_nodes)

            #dtsp = np.min([dtsp, dtsp_riv])

            # Update time step
            if dtsp <= 0:
                raise Exception("invalid time step", dtsp)
            if dtp == dt:
                # Last sub-step done: push dtp past dt to leave the loop.
                dtp += dtsp
            elif (dtp + dtsp) > dt:
                # Shorten the final sub-step so it lands exactly on dt.
                dtsp = dt - dtp
                dtp += dtsp
            else:
                dtp += dtsp

        # Update state variables
        # NOTE(review): placed after the sub-step loop so that dh is the head
        # change over the whole dt; original indentation was lost -- confirm.
        self.dh = np.array(env_state.SZgrid.at_node['water_table__elevation'])-self.wte_dt
        self.wte_dt = np.array(env_state.SZgrid.at_node['water_table__elevation'])

        env_state.SZgrid.at_node['discharge'][:] *= (1/dt)

        env_state.grid.at_node['riv_sat_deficit'][:] = (
            np.power(env_state.grid.dx, 2)
            * np.array(env_state.grid.at_node['river_topo_elevation'][:]
                       - env_state.SZgrid.at_node['water_table__elevation'][:])
            )

        # A water table above the river bed means no deficit.
        env_state.grid.at_node['riv_sat_deficit'][
            env_state.grid.at_node['riv_sat_deficit'][:] < 0] = 0.0

        if self.act_fix_link == 1:
            self.flux_out *= 1/dt

    def SZ_potential_ET(self, env_state, pet_sz):
        """Scale ``pet_sz`` by the fraction of the root zone below the water table.

        The fraction is ``(h - z + Droot*0.001)*1000 / Droot`` with negative
        values set to zero, using ``env_state.Droot``.
        """
        SZ_aet = (env_state.SZgrid.at_node['water_table__elevation']
                  - env_state.grid.at_node['topographic__elevation']
                  + env_state.Droot*0.001)*1000
        SZ_aet[SZ_aet < 0] = 0
        f = SZ_aet/env_state.Droot
        return f*pet_sz


def transmissivity(env_state, Ksat, act_links, f, zm):
    """Return the transmissivity at every link for ``env_state.func``.

    Parameters:
        env_state: model state (``func`` and ``SZgrid`` are read)
        Ksat: link hydraulic conductivity
        act_links: indices of the active links
        f: per-link factor passed to ``exponential_T`` (func == 1 only)
        zm: mean surface elevation at links (func == 1 only)
    """
    T = np.zeros(len(Ksat))

    # The mean head at links is only needed by the head-dependent models.
    if env_state.func != 2:
        hm = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'water_table__elevation')

    if env_state.func == 1:  # Variable transmissivity
        T = exponential_T(Ksat, f, zm, hm)
    elif env_state.func == 2:  # Constant transmissivity
        T[act_links] = Ksat[act_links]
    else:  # Unconfined aquifer
        bm = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'BOT')
        T[act_links] = Ksat[act_links]*(hm[act_links]-bm[act_links])

    return T


def fun_update_UZ_SZ_depth(env_state, tht_dt, Droot):
    """Update the water table from the current water storage anomaly.

    The head change uses the specific yield below the root zone and the
    available pore space (saturated water content minus ``tht_dt``) inside
    it.  Water that would rise above the surface is added to ``discharge``
    and the water table is capped at the surface.

    Parameters:
        env_state: model state; ``water_table__elevation`` and
            ``discharge`` on ``SZgrid`` are updated
            (z: topographic elevation, h: water table,
            fs: saturated water content, fc: field capacity,
            Sy: specific yield, dq: water storage anomaly)
        tht_dt: Water content at time t [-]
        Droot: Rooting depth, subtracted directly from the surface
            elevation (so presumably in the same length unit -- confirm)
    """
    # Head before the storage anomaly was applied.
    h0 = env_state.SZgrid.at_node['water_table__elevation'] - \
        env_state.SZgrid.at_node['water_storage_anomaly'] / \
        env_state.SZgrid.at_node['SZ_Sy']

    # Where storage decreased, field capacity is used as water content.
    tht_dt = np.where(env_state.SZgrid.at_node['water_storage_anomaly'][:] < 0.0,
                      env_state.fc, tht_dt)

    dtht = env_state.grid.at_node['saturated_water_content'] - tht_dt

    # Distance from the bottom of the root zone down to h0.
    ruz = (env_state.grid.at_node['topographic__elevation']-Droot) - h0

    dh = np.where((h0-(env_state.grid.at_node['topographic__elevation']-Droot)) > 0.0,
                  env_state.SZgrid.at_node['water_storage_anomaly']/dtht,
                  env_state.SZgrid.at_node['water_storage_anomaly'] /
                  env_state.SZgrid.at_node['SZ_Sy'])

    h_aux = h0+dh-(env_state.grid.at_node['topographic__elevation']-Droot)

    dhr_aux = np.abs(ruz)-np.abs(dh)

    dh_aux = np.where(dh >= 0, 1, -1)

    # alpha/beta/gama select which storage coefficient applies to which
    # part of the head change.
    alpha = -1.*np.less_equal(dhr_aux, 0)*dh_aux*np.less_equal(ruz, 0)

    beta = np.less_equal(dhr_aux, 0)*dh_aux*np.less_equal(ruz, 0)

    gama = np.where(dhr_aux > 0, 1, 0)

    gama = np.where(dh > 0, gama, 1-gama)*np.less(h_aux, 0)

    gama[h_aux < 0] = 1

    dht = ((env_state.SZgrid.at_node['water_storage_anomaly']
            + ruz*(alpha*env_state.SZgrid.at_node['SZ_Sy'] + beta*dtht))
           / ((gama*env_state.SZgrid.at_node['SZ_Sy'] + (1-gama)*dtht)))

    env_state.SZgrid.at_node['water_table__elevation'] = h0 + dht

    # Head above the surface is converted into discharge ...
    env_state.SZgrid.at_node['discharge'][:] += np.where((
        env_state.SZgrid.at_node['topographic__elevation']
        - env_state.SZgrid.at_node['water_table__elevation']) <= 0.0,
        - (env_state.SZgrid.at_node['topographic__elevation']
           - env_state.SZgrid.at_node['water_table__elevation'])*dtht, 0.0)

    # ... and the water table is capped at the surface.
    env_state.SZgrid.at_node['water_table__elevation'][:] = np.minimum(
        env_state.SZgrid.at_node['water_table__elevation'],
        env_state.SZgrid.at_node['topographic__elevation'])


def storage(env_state):
    """Return the sum over core nodes of ``(h - BOT) * Sy``."""
    storage = np.sum((env_state.SZgrid.at_node['water_table__elevation'][env_state.SZgrid.core_nodes] -
                      env_state.SZgrid.at_node['BOT'][env_state.SZgrid.core_nodes]) *
                     env_state.SZgrid.at_node['SZ_Sy'][env_state.SZgrid.core_nodes])
    return storage


def storage_2layer(env_state):
    """Return the summed storage of both layers over the core nodes.

    Layer 1 contributes ``max(h, BOT_2) - BOT_2`` times ``SZ_Sy``.  Layer 2
    contributes ``min(HEAD_2, BOT_2) - BOT`` times ``Sy_2`` plus
    ``max(HEAD_2, BOT_2) - BOT_2`` times ``Ss_2``.
    """
    head_Sy = (np.maximum(env_state.SZgrid.at_node['water_table__elevation'],
                          env_state.SZgrid.at_node['BOT_2'])
               - env_state.SZgrid.at_node['BOT_2'])
    # Was ``head_Ss[...]``, a name not yet assigned at this point.
    storage_1 = np.sum(head_Sy[env_state.SZgrid.core_nodes]
                       * env_state.SZgrid.at_node['SZ_Sy'][env_state.SZgrid.core_nodes])

    # Was ``np.minimun`` (AttributeError).
    head_Sy = (np.minimum(env_state.SZgrid.at_node['HEAD_2'],
                          env_state.SZgrid.at_node['BOT_2'])
               - env_state.SZgrid.at_node['BOT'])

    head_Ss = (np.maximum(env_state.SZgrid.at_node['HEAD_2'],
                          env_state.SZgrid.at_node['BOT_2'])
               - env_state.SZgrid.at_node['BOT_2'])

    storage_2 = np.sum((head_Sy*env_state.SZgrid.at_node['Sy_2']
                        + head_Ss*env_state.SZgrid.at_node['Ss_2'])[env_state.SZgrid.core_nodes]
                       )
    return storage_1+storage_2


def storage_uz_sz(env_state, tht, dh):
    """Storage change associated with a head change ``dh``.

    Parameters:
        env_state: state variables and model parameters
        tht: water content; NOTE: modified in place (entries where
            ``dh < 0`` are overwritten with ``env_state.fc``)
        dh: head change at every node
    Output:
        total: sum over core nodes of the root-zone and saturated-zone
            storage changes
    """
    # Height of the water table inside the root zone, after the change ...
    str_uz1 = (env_state.SZgrid.at_node['water_table__elevation']
               - (env_state.grid.at_node['topographic__elevation']
                  - env_state.Droot*0.001)
               )

    str_uz1[str_uz1 < 0] = 0.0

    # ... and before it.
    str_uz0 = (env_state.SZgrid.at_node['water_table__elevation'] - dh
               - (env_state.grid.at_node['topographic__elevation']
                  - env_state.Droot*0.001)
               )

    str_uz0[str_uz0 < 0] = 0.0

    tht[dh < 0] = env_state.fc[dh < 0]

    dtht = env_state.grid.at_node['saturated_water_content'] - tht

    str_uz = dtht*(str_uz1-str_uz0)

    str_sz = (dh-str_uz1+str_uz0)*env_state.SZgrid.at_node['SZ_Sy']

    total = np.sum(str_sz[env_state.SZgrid.core_nodes]
                   + str_uz[env_state.SZgrid.core_nodes]
                   )
    return total


def smoth_func_L1(h, hr, r, dq, *nodes):
    """Return ``(h-hr)**3/r`` clipped to [0, 1], forced to 1 where ``dq > 0``.

    If ``nodes`` is given, every entry outside those indices is zeroed.
    """
    aux = np.power(h-hr, 3)/r
    aux = np.where(aux > 0, aux, 0)
    aux = np.where(aux > 1, 1, aux)
    aux = np.where(dq > 0, 1, aux)

    if nodes:
        p = np.zeros(len(aux))
        p[nodes] = 1
        aux *= p

    return aux


def smoth_func_L2(h, hr, d, r, *nodes):
    """Return the smoothing factors ``(FSy, FSs)`` for the second layer.

    With ``u = (h-hr)/d``: ``FSy`` is ``1 - exp(-(1-u)/r)`` where ``u < 1``
    and 0 elsewhere; ``FSs`` is ``1 - exp((1-u)/r)`` where ``u >= 1`` and 0
    elsewhere.  If ``nodes`` is given, entries outside those indices are
    zeroed.
    """
    u = (h-hr)/d  # was the undefined name ``D``
    aux = (1-u)/r
    FSy = 1 - np.exp(-aux)
    FSs = 1 - np.exp(aux)
    FSy = np.where(u >= 1, 0, FSy)  # was the undefined name ``fSy``
    FSs = np.where(u >= 1, FSs, 0)  # was the undefined name ``fSs``

    if nodes:
        aux = np.zeros(len(aux))
        aux[nodes] = 1
        FSs *= aux
        FSy *= aux

    return FSy, FSs


def river_flux(h, hriv, C, A, *nodes):
    """Return ``(h-hriv)*C/A``, zeroed outside ``nodes`` when given."""
    q_riv = (h-hriv)*C/A
    if nodes:
        # The mask used to be built from / applied to an undefined ``aux``.
        p = np.zeros(len(q_riv))
        p[nodes] = 1
        q_riv = q_riv*p
    return q_riv


def smoth_func_T(h, hriv, r, f, dq, *nodes):
    """Smoothing factor based on ``(h-hriv+f)/f``; ``dq`` is not used.

    If ``nodes`` is given, entries outside those indices are zeroed.
    """
    SF = (h-hriv+f)/f
    SF = np.where(SF > 1, SF, 0)
    SF = np.where(SF > 0, 1 - np.exp(SF/r), 0)
    if nodes:
        p = np.zeros(len(SF))
        p[nodes] = 1
        SF *= p
    return SF


def smoth_func(h, hriv, r, dq, *nodes):
    """Return ``(h-hriv)**3/r`` clipped to [0, 1]; ``dq`` is not used.

    If ``nodes`` is given, entries outside those indices are zeroed.
    """
    aux = np.power(h-hriv, 3)/r
    aux = np.where(aux > 0, aux, 0)
    aux = np.where(aux > 1, 1, aux)
    if nodes:
        p = np.zeros(len(aux))
        p[nodes] = 1
        aux *= p
    return np.where(aux <= 0, 0, aux)


# regularization function for unconfined
def regularization(zm, hm, bm, dq, r):
    """Regularized discharge for unconfined aquifers.

    Parameters:
        zm: surface elevation
        hm: hydraulic head
        bm: bottom elevation aquifer
        dq: flux per unit area
        r: regularization factor
    Only positive ``dq`` contributes; the result is 0 where ``dq <= 0``.
    """
    aux = (hm-bm)/(zm-bm)
    aux = np.where((aux-1) > 0, 1, aux)
    return np.exp((aux-1)/r)*dq*np.where(dq > 0, 1, 0)


# regularization function for confined aquifers
def regularization_T(zm, hm, f, dq, r):
    """Regularized discharge for confined aquifers.

    Parameters:
        zm: surface elevation
        hm: hydraulic head
        f: e-folding depth
        dq: flux per unit area
        r: regularization factor
    Only positive ``dq`` contributes; the result is 0 where ``dq <= 0``.
    """
    aux = (hm-zm)/f+1
    aux = np.where(aux > 0, aux, 0)
    return np.exp((aux-1)/r)*dq*np.where(dq > 0, 1, 0)


def exponential_T(Ksat, f, z, h):
    """Return ``Ksat*f*exp(-max(z-h, 0)/f)``."""
    return Ksat*f*np.exp(-np.maximum(z-h, 0)/f)


# Maximum time step for unconfined aquifers
def time_step(D, Sy, Ksat, h, zb, dx, *nodes):
    """Return the smallest positive ``D*Sy*dx**2/(4*(h-zb)*Ksat)``.

    D is the Courant number.  When ``nodes`` is given only those indices
    are considered.  ``np.nanmin`` raises ValueError if no entry is
    positive.
    """
    T = (h - zb)*Ksat
    dt = D*Sy*np.power(dx, 2)/(4*T)
    if nodes:
        dt = np.nanmin((dt[nodes])[dt[nodes] > 0])
    else:
        dt = np.nanmin(dt[dt > 0])
    return dt


# Maximum time step for confined aquifers
def time_step_confined(D, Sy, T, dx, *nodes):
    """Return the smallest positive ``D*Sy*dx**2/(4*T)``.

    D is the Courant number.  When ``nodes`` is given only those indices
    are considered.  ``np.nanmin`` raises ValueError if no entry is
    positive.
    """
    dt = D*Sy*np.power(dx, 2)/(4*T)
    if nodes:
        dt = np.nanmin((dt[nodes])[dt[nodes] > 0])
    else:
        dt = np.nanmin(dt[dt > 0])
    # NOTE(review): the listing is cut off right after the line above; the
    # return mirrors time_step() and is what every call site relies on.
    return dt

Full Screen

Full Screen

test_lexnparse.py

Source:test_lexnparse.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3 jinja2.testsuite.lexnparse4 ~~~~~~~~~~~~~~~~~~~~~~~~~~5 All the unittests regarding lexing, parsing and syntax.6 :copyright: (c) 2017 by the Jinja Team.7 :license: BSD, see LICENSE for more details.8"""9import pytest10from jinja2 import Environment, Template, TemplateSyntaxError, \11 UndefinedError, nodes12from jinja2._compat import iteritems, text_type, PY213from jinja2.lexer import Token, TokenStream, TOKEN_EOF, \14 TOKEN_BLOCK_BEGIN, TOKEN_BLOCK_END15# how does a string look like in jinja syntax?16if PY2:17 def jinja_string_repr(string):18 return repr(string)[1:]19else:20 jinja_string_repr = repr21@pytest.mark.lexnparse22@pytest.mark.tokenstream23class TestTokenStream(object):24 test_tokens = [Token(1, TOKEN_BLOCK_BEGIN, ''),25 Token(2, TOKEN_BLOCK_END, ''),26 ]27 def test_simple(self, env):28 ts = TokenStream(self.test_tokens, "foo", "bar")29 assert ts.current.type is TOKEN_BLOCK_BEGIN30 assert bool(ts)31 assert not bool(ts.eos)32 next(ts)33 assert ts.current.type is TOKEN_BLOCK_END34 assert bool(ts)35 assert not bool(ts.eos)36 next(ts)37 assert ts.current.type is TOKEN_EOF38 assert not bool(ts)39 assert bool(ts.eos)40 def test_iter(self, env):41 token_types = [42 t.type for t in TokenStream(self.test_tokens, "foo", "bar")43 ]44 assert token_types == ['block_begin', 'block_end', ]45@pytest.mark.lexnparse46@pytest.mark.lexer47class TestLexer(object):48 def test_raw1(self, env):49 tmpl = env.from_string(50 '{% raw %}foo{% endraw %}|'51 '{%raw%}{{ bar }}|{% baz %}{% endraw %}')52 assert tmpl.render() == 'foo|{{ bar }}|{% baz %}'53 def test_raw2(self, env):54 tmpl = env.from_string('1 {%- raw -%} 2 {%- endraw -%} 3')55 assert tmpl.render() == '123'56 def test_balancing(self, env):57 env = Environment('{%', '%}', '${', '}')58 tmpl = env.from_string('''{% for item in seq59 %}${{'foo': item}|upper}{% endfor %}''')60 assert tmpl.render(seq=list(range(3))) \61 == "{'FOO': 0}{'FOO': 1}{'FOO': 2}"62 def test_comments(self, env):63 env = 
Environment('<!--', '-->', '{', '}')64 tmpl = env.from_string('''\65<ul>66<!--- for item in seq -->67 <li>{item}</li>68<!--- endfor -->69</ul>''')70 assert tmpl.render(seq=list(range(3))) \71 == ("<ul>\n <li>0</li>\n ""<li>1</li>\n <li>2</li>\n</ul>")72 def test_string_escapes(self, env):73 for char in u'\0', u'\u2668', u'\xe4', u'\t', u'\r', u'\n':74 tmpl = env.from_string('{{ %s }}' % jinja_string_repr(char))75 assert tmpl.render() == char76 assert env.from_string('{{ "\N{HOT SPRINGS}" }}').render() == u'\u2668'77 def test_bytefallback(self, env):78 from pprint import pformat79 tmpl = env.from_string(u'''{{ 'foo'|pprint }}|{{ 'bär'|pprint }}''')80 assert tmpl.render() == pformat('foo') + '|' + pformat(u'bär')81 def test_operators(self, env):82 from jinja2.lexer import operators83 for test, expect in iteritems(operators):84 if test in '([{}])':85 continue86 stream = env.lexer.tokenize('{{ %s }}' % test)87 next(stream)88 assert stream.current.type == expect89 def test_normalizing(self, env):90 for seq in '\r', '\r\n', '\n':91 env = Environment(newline_sequence=seq)92 tmpl = env.from_string('1\n2\r\n3\n4\n')93 result = tmpl.render()94 assert result.replace(seq, 'X') == '1X2X3X4'95 def test_trailing_newline(self, env):96 for keep in [True, False]:97 env = Environment(keep_trailing_newline=keep)98 for template, expected in [99 ('', {}),100 ('no\nnewline', {}),101 ('with\nnewline\n', {False: 'with\nnewline'}),102 ('with\nseveral\n\n\n', {False: 'with\nseveral\n\n'}),103 ]:104 tmpl = env.from_string(template)105 expect = expected.get(keep, template)106 result = tmpl.render()107 assert result == expect, (keep, template, result, expect)108@pytest.mark.lexnparse109@pytest.mark.parser110class TestParser(object):111 def test_php_syntax(self, env):112 env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->')113 tmpl = env.from_string('''\114<!-- I'm a comment, I'm not interesting -->\115<? 
for item in seq -?>116 <?= item ?>117<?- endfor ?>''')118 assert tmpl.render(seq=list(range(5))) == '01234'119 def test_erb_syntax(self, env):120 env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>')121 tmpl = env.from_string('''\122<%# I'm a comment, I'm not interesting %>\123<% for item in seq -%>124 <%= item %>125<%- endfor %>''')126 assert tmpl.render(seq=list(range(5))) == '01234'127 def test_comment_syntax(self, env):128 env = Environment('<!--', '-->', '${', '}', '<!--#', '-->')129 tmpl = env.from_string('''\130<!--# I'm a comment, I'm not interesting -->\131<!-- for item in seq --->132 ${item}133<!--- endfor -->''')134 assert tmpl.render(seq=list(range(5))) == '01234'135 def test_balancing(self, env):136 tmpl = env.from_string('''{{{'foo':'bar'}.foo}}''')137 assert tmpl.render() == 'bar'138 def test_start_comment(self, env):139 tmpl = env.from_string('''{# foo comment140and bar comment #}141{% macro blub() %}foo{% endmacro %}142{{ blub() }}''')143 assert tmpl.render().strip() == 'foo'144 def test_line_syntax(self, env):145 env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%')146 tmpl = env.from_string('''\147<%# regular comment %>148% for item in seq:149 ${item}150% endfor''')151 assert [152 int(x.strip()) for x in tmpl.render(seq=list(range(5))).split()153 ] == list(range(5))154 env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##')155 tmpl = env.from_string('''\156<%# regular comment %>157% for item in seq:158 ${item} ## the rest of the stuff159% endfor''')160 assert [161 int(x.strip()) for x in tmpl.render(seq=list(range(5))).split()162 ] == list(range(5))163 def test_line_syntax_priority(self, env):164 # XXX: why is the whitespace there in front of the newline?165 env = Environment('{%', '%}', '${', '}', '/*', '*/', '##', '#')166 tmpl = env.from_string('''\167/* ignore me.168 I'm a multiline comment */169## for item in seq:170* ${item} # this is just extra stuff171## endfor''')172 assert tmpl.render(seq=[1, 2]).strip() == '* 1\n* 2'173 
env = Environment('{%', '%}', '${', '}', '/*', '*/', '#', '##')174 tmpl = env.from_string('''\175/* ignore me.176 I'm a multiline comment */177# for item in seq:178* ${item} ## this is just extra stuff179 ## extra stuff i just want to ignore180# endfor''')181 assert tmpl.render(seq=[1, 2]).strip() == '* 1\n\n* 2'182 def test_error_messages(self, env):183 def assert_error(code, expected):184 try:185 Template(code)186 except TemplateSyntaxError as e:187 assert str(e) == expected, 'unexpected error message'188 else:189 assert False, 'that was supposed to be an error'190 assert_error('{% for item in seq %}...{% endif %}',191 "Encountered unknown tag 'endif'. Jinja was looking "192 "for the following tags: 'endfor' or 'else'. The "193 "innermost block that needs to be closed is 'for'.")194 assert_error(195 '{% if foo %}{% for item in seq %}...{% endfor %}{% endfor %}',196 "Encountered unknown tag 'endfor'. Jinja was looking for "197 "the following tags: 'elif' or 'else' or 'endif'. The "198 "innermost block that needs to be closed is 'if'.")199 assert_error('{% if foo %}',200 "Unexpected end of template. Jinja was looking for the "201 "following tags: 'elif' or 'else' or 'endif'. The "202 "innermost block that needs to be closed is 'if'.")203 assert_error('{% for item in seq %}',204 "Unexpected end of template. Jinja was looking for the "205 "following tags: 'endfor' or 'else'. 
The innermost block "206 "that needs to be closed is 'for'.")207 assert_error(208 '{% block foo-bar-baz %}',209 "Block names in Jinja have to be valid Python identifiers "210 "and may not contain hyphens, use an underscore instead.")211 assert_error('{% unknown_tag %}',212 "Encountered unknown tag 'unknown_tag'.")213@pytest.mark.lexnparse214@pytest.mark.syntax215class TestSyntax(object):216 def test_call(self, env):217 env = Environment()218 env.globals['foo'] = lambda a, b, c, e, g: a + b + c + e + g219 tmpl = env.from_string(220 "{{ foo('a', c='d', e='f', *['b'], **{'g': 'h'}) }}"221 )222 assert tmpl.render() == 'abdfh'223 def test_slicing(self, env):224 tmpl = env.from_string('{{ [1, 2, 3][:] }}|{{ [1, 2, 3][::-1] }}')225 assert tmpl.render() == '[1, 2, 3]|[3, 2, 1]'226 def test_attr(self, env):227 tmpl = env.from_string("{{ foo.bar }}|{{ foo['bar'] }}")228 assert tmpl.render(foo={'bar': 42}) == '42|42'229 def test_subscript(self, env):230 tmpl = env.from_string("{{ foo[0] }}|{{ foo[-1] }}")231 assert tmpl.render(foo=[0, 1, 2]) == '0|2'232 def test_tuple(self, env):233 tmpl = env.from_string('{{ () }}|{{ (1,) }}|{{ (1, 2) }}')234 assert tmpl.render() == '()|(1,)|(1, 2)'235 def test_math(self, env):236 tmpl = env.from_string('{{ (1 + 1 * 2) - 3 / 2 }}|{{ 2**3 }}')237 assert tmpl.render() == '1.5|8'238 def test_div(self, env):239 tmpl = env.from_string('{{ 3 // 2 }}|{{ 3 / 2 }}|{{ 3 % 2 }}')240 assert tmpl.render() == '1|1.5|1'241 def test_unary(self, env):242 tmpl = env.from_string('{{ +3 }}|{{ -3 }}')243 assert tmpl.render() == '3|-3'244 def test_concat(self, env):245 tmpl = env.from_string("{{ [1, 2] ~ 'foo' }}")246 assert tmpl.render() == '[1, 2]foo'247 def test_compare(self, env):248 tmpl = env.from_string('{{ 1 > 0 }}|{{ 1 >= 1 }}|{{ 2 < 3 }}|'249 '{{ 2 == 2 }}|{{ 1 <= 1 }}')250 assert tmpl.render() == 'True|True|True|True|True'251 def test_inop(self, env):252 tmpl = env.from_string('{{ 1 in [1, 2, 3] }}|{{ 1 not in [1, 2, 3] }}')253 assert tmpl.render() == 
'True|False'254 def test_literals(self, env):255 tmpl = env.from_string('{{ [] }}|{{ {} }}|{{ () }}')256 assert tmpl.render().lower() == '[]|{}|()'257 def test_bool(self, env):258 tmpl = env.from_string('{{ true and false }}|{{ false '259 'or true }}|{{ not false }}')260 assert tmpl.render() == 'False|True|True'261 def test_grouping(self, env):262 tmpl = env.from_string(263 '{{ (true and false) or (false and true) and not false }}')264 assert tmpl.render() == 'False'265 def test_django_attr(self, env):266 tmpl = env.from_string('{{ [1, 2, 3].0 }}|{{ [[1]].0.0 }}')267 assert tmpl.render() == '1|1'268 def test_conditional_expression(self, env):269 tmpl = env.from_string('''{{ 0 if true else 1 }}''')270 assert tmpl.render() == '0'271 def test_short_conditional_expression(self, env):272 tmpl = env.from_string('<{{ 1 if false }}>')273 assert tmpl.render() == '<>'274 tmpl = env.from_string('<{{ (1 if false).bar }}>')275 pytest.raises(UndefinedError, tmpl.render)276 def test_filter_priority(self, env):277 tmpl = env.from_string('{{ "foo"|upper + "bar"|upper }}')278 assert tmpl.render() == 'FOOBAR'279 def test_function_calls(self, env):280 tests = [281 (True, '*foo, bar'),282 (True, '*foo, *bar'),283 (True, '*foo, bar=42'),284 (True, '**foo, *bar'),285 (True, '**foo, bar'),286 (False, 'foo, bar'),287 (False, 'foo, bar=42'),288 (False, 'foo, bar=23, *args'),289 (False, 'a, b=c, *d, **e'),290 (False, '*foo, **bar')291 ]292 for should_fail, sig in tests:293 if should_fail:294 pytest.raises(TemplateSyntaxError,295 env.from_string, '{{ foo(%s) }}' % sig)296 else:297 env.from_string('foo(%s)' % sig)298 def test_tuple_expr(self, env):299 for tmpl in [300 '{{ () }}',301 '{{ (1, 2) }}',302 '{{ (1, 2,) }}',303 '{{ 1, }}',304 '{{ 1, 2 }}',305 '{% for foo, bar in seq %}...{% endfor %}',306 '{% for x in foo, bar %}...{% endfor %}',307 '{% for x in foo, %}...{% endfor %}'308 ]:309 assert env.from_string(tmpl)310 def test_trailing_comma(self, env):311 tmpl = env.from_string('{{ (1, 2,) 
}}|{{ [1, 2,] }}|{{ {1: 2,} }}')312 assert tmpl.render().lower() == '(1, 2)|[1, 2]|{1: 2}'313 def test_block_end_name(self, env):314 env.from_string('{% block foo %}...{% endblock foo %}')315 pytest.raises(TemplateSyntaxError, env.from_string,316 '{% block x %}{% endblock y %}')317 def test_constant_casing(self, env):318 for const in True, False, None:319 tmpl = env.from_string('{{ %s }}|{{ %s }}|{{ %s }}' % (320 str(const), str(const).lower(), str(const).upper()321 ))322 assert tmpl.render() == '%s|%s|' % (const, const)323 def test_test_chaining(self, env):324 pytest.raises(TemplateSyntaxError, env.from_string,325 '{{ foo is string is sequence }}')326 assert env.from_string(327 '{{ 42 is string or 42 is number }}'328 ).render() == 'True'329 def test_string_concatenation(self, env):330 tmpl = env.from_string('{{ "foo" "bar" "baz" }}')331 assert tmpl.render() == 'foobarbaz'332 def test_notin(self, env):333 bar = range(100)334 tmpl = env.from_string('''{{ not 42 in bar }}''')335 assert tmpl.render(bar=bar) == text_type(not 42 in bar)336 def test_operator_precedence(self, env):337 tmpl = env.from_string('''{{ 2 * 3 + 4 % 2 + 1 - 2 }}''')338 assert tmpl.render() == text_type(2 * 3 + 4 % 2 + 1 - 2)339 def test_implicit_subscribed_tuple(self, env):340 class Foo(object):341 def __getitem__(self, x):342 return x343 t = env.from_string('{{ foo[1, 2] }}')344 assert t.render(foo=Foo()) == u'(1, 2)'345 def test_raw2(self, env):346 tmpl = env.from_string('{% raw %}{{ FOO }} and {% BAR %}{% endraw %}')347 assert tmpl.render() == '{{ FOO }} and {% BAR %}'348 def test_const(self, env):349 tmpl = env.from_string(350 '{{ true }}|{{ false }}|{{ none }}|'351 '{{ none is defined }}|{{ missing is defined }}')352 assert tmpl.render() == 'True|False|None|True|False'353 def test_neg_filter_priority(self, env):354 node = env.parse('{{ -1|foo }}')355 assert isinstance(node.body[0].nodes[0], nodes.Filter)356 assert isinstance(node.body[0].nodes[0].node, nodes.Neg)357 def 
test_const_assign(self, env):358 constass1 = '''{% set true = 42 %}'''359 constass2 = '''{% for none in seq %}{% endfor %}'''360 for tmpl in constass1, constass2:361 pytest.raises(TemplateSyntaxError, env.from_string, tmpl)362 def test_localset(self, env):363 tmpl = env.from_string('''{% set foo = 0 %}\364{% for item in [1, 2] %}{% set foo = 1 %}{% endfor %}\365{{ foo }}''')366 assert tmpl.render() == '0'367 def test_parse_unary(self, env):368 tmpl = env.from_string('{{ -foo["bar"] }}')369 assert tmpl.render(foo={'bar': 42}) == '-42'370 tmpl = env.from_string('{{ -foo["bar"]|abs }}')371 assert tmpl.render(foo={'bar': 42}) == '42'372@pytest.mark.lexnparse373@pytest.mark.lstripblocks374class TestLstripBlocks(object):375 def test_lstrip(self, env):376 env = Environment(lstrip_blocks=True, trim_blocks=False)377 tmpl = env.from_string(''' {% if True %}\n {% endif %}''')378 assert tmpl.render() == "\n"379 def test_lstrip_trim(self, env):380 env = Environment(lstrip_blocks=True, trim_blocks=True)381 tmpl = env.from_string(''' {% if True %}\n {% endif %}''')382 assert tmpl.render() == ""383 def test_no_lstrip(self, env):384 env = Environment(lstrip_blocks=True, trim_blocks=False)385 tmpl = env.from_string(''' {%+ if True %}\n {%+ endif %}''')386 assert tmpl.render() == " \n "387 def test_lstrip_endline(self, env):388 env = Environment(lstrip_blocks=True, trim_blocks=False)389 tmpl = env.from_string(390 ''' hello{% if True %}\n goodbye{% endif %}''')391 assert tmpl.render() == " hello\n goodbye"392 def test_lstrip_inline(self, env):393 env = Environment(lstrip_blocks=True, trim_blocks=False)394 tmpl = env.from_string(''' {% if True %}hello {% endif %}''')395 assert tmpl.render() == 'hello '396 def test_lstrip_nested(self, env):397 env = Environment(lstrip_blocks=True, trim_blocks=False)398 tmpl = env.from_string(399 ''' {% if True %}a {% if True %}b {% endif %}c {% endif %}''')400 assert tmpl.render() == 'a b c '401 def test_lstrip_left_chars(self, env):402 env = 
Environment(lstrip_blocks=True, trim_blocks=False)403 tmpl = env.from_string(''' abc {% if True %}404 hello{% endif %}''')405 assert tmpl.render() == ' abc \n hello'406 def test_lstrip_embeded_strings(self, env):407 env = Environment(lstrip_blocks=True, trim_blocks=False)408 tmpl = env.from_string(''' {% set x = " {% str %} " %}{{ x }}''')409 assert tmpl.render() == ' {% str %} '410 def test_lstrip_preserve_leading_newlines(self, env):411 env = Environment(lstrip_blocks=True, trim_blocks=False)412 tmpl = env.from_string('''\n\n\n{% set hello = 1 %}''')413 assert tmpl.render() == '\n\n\n'414 def test_lstrip_comment(self, env):415 env = Environment(lstrip_blocks=True, trim_blocks=False)416 tmpl = env.from_string(''' {# if True #}417hello418 {#endif#}''')419 assert tmpl.render() == '\nhello\n'420 def test_lstrip_angle_bracket_simple(self, env):421 env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',422 lstrip_blocks=True, trim_blocks=True)423 tmpl = env.from_string(''' <% if True %>hello <% endif %>''')424 assert tmpl.render() == 'hello '425 def test_lstrip_angle_bracket_comment(self, env):426 env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',427 lstrip_blocks=True, trim_blocks=True)428 tmpl = env.from_string(''' <%# if True %>hello <%# endif %>''')429 assert tmpl.render() == 'hello '430 def test_lstrip_angle_bracket(self, env):431 env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',432 lstrip_blocks=True, trim_blocks=True)433 tmpl = env.from_string('''\434 <%# regular comment %>435 <% for item in seq %>436${item} ## the rest of the stuff437 <% endfor %>''')438 assert tmpl.render(seq=range(5)) == \439 ''.join('%s\n' % x for x in range(5))440 def test_lstrip_angle_bracket_compact(self, env):441 env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',442 lstrip_blocks=True, trim_blocks=True)443 tmpl = env.from_string('''\444 <%#regular comment%>445 <%for item in seq%>446${item} ## the rest of the stuff447 <%endfor%>''')448 
assert tmpl.render(seq=range(5)) == \449 ''.join('%s\n' % x for x in range(5))450 def test_php_syntax_with_manual(self, env):451 env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->',452 lstrip_blocks=True, trim_blocks=True)453 tmpl = env.from_string('''\454 <!-- I'm a comment, I'm not interesting -->455 <? for item in seq -?>456 <?= item ?>457 <?- endfor ?>''')458 assert tmpl.render(seq=range(5)) == '01234'459 def test_php_syntax(self, env):460 env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->',461 lstrip_blocks=True, trim_blocks=True)462 tmpl = env.from_string('''\463 <!-- I'm a comment, I'm not interesting -->464 <? for item in seq ?>465 <?= item ?>466 <? endfor ?>''')467 assert tmpl.render(seq=range(5)) \468 == ''.join(' %s\n' % x for x in range(5))469 def test_php_syntax_compact(self, env):470 env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->',471 lstrip_blocks=True, trim_blocks=True)472 tmpl = env.from_string('''\473 <!-- I'm a comment, I'm not interesting -->474 <?for item in seq?>475 <?=item?>476 <?endfor?>''')477 assert tmpl.render(seq=range(5)) \478 == ''.join(' %s\n' % x for x in range(5))479 def test_erb_syntax(self, env):480 env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>',481 lstrip_blocks=True, trim_blocks=True)482 # env.from_string('')483 # for n,r in env.lexer.rules.iteritems():484 # print n485 # print env.lexer.rules['root'][0][0].pattern486 # print "'%s'" % tmpl.render(seq=range(5))487 tmpl = env.from_string('''\488<%# I'm a comment, I'm not interesting %>489 <% for item in seq %>490 <%= item %>491 <% endfor %>492''')493 assert tmpl.render(seq=range(5)) \494 == ''.join(' %s\n' % x for x in range(5))495 def test_erb_syntax_with_manual(self, env):496 env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>',497 lstrip_blocks=True, trim_blocks=True)498 tmpl = env.from_string('''\499<%# I'm a comment, I'm not interesting %>500 <% for item in seq -%>501 <%= item %>502 <%- endfor %>''')503 assert tmpl.render(seq=range(5)) == 
'01234'504 def test_erb_syntax_no_lstrip(self, env):505 env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>',506 lstrip_blocks=True, trim_blocks=True)507 tmpl = env.from_string('''\508<%# I'm a comment, I'm not interesting %>509 <%+ for item in seq -%>510 <%= item %>511 <%- endfor %>''')512 assert tmpl.render(seq=range(5)) == ' 01234'513 def test_comment_syntax(self, env):514 env = Environment('<!--', '-->', '${', '}', '<!--#', '-->',515 lstrip_blocks=True, trim_blocks=True)516 tmpl = env.from_string('''\517<!--# I'm a comment, I'm not interesting -->\518<!-- for item in seq --->519 ${item}520<!--- endfor -->''')...

Full Screen

Full Screen

test_filters.py

Source:test_filters.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3 jinja2.testsuite.filters4 ~~~~~~~~~~~~~~~~~~~~~~~~5 Tests for the jinja filters.6 :copyright: (c) 2017 by the Jinja Team.7 :license: BSD, see LICENSE for more details.8"""9import pytest10from jinja2 import Markup, Environment11from jinja2._compat import text_type, implements_to_string12@pytest.mark.filter13class TestFilter(object):14 def test_filter_calling(self, env):15 rv = env.call_filter('sum', [1, 2, 3])16 assert rv == 617 def test_capitalize(self, env):18 tmpl = env.from_string('{{ "foo bar"|capitalize }}')19 assert tmpl.render() == 'Foo bar'20 def test_center(self, env):21 tmpl = env.from_string('{{ "foo"|center(9) }}')22 assert tmpl.render() == ' foo '23 def test_default(self, env):24 tmpl = env.from_string(25 "{{ missing|default('no') }}|{{ false|default('no') }}|"26 "{{ false|default('no', true) }}|{{ given|default('no') }}"27 )28 assert tmpl.render(given='yes') == 'no|False|no|yes'29 def test_dictsort(self, env):30 tmpl = env.from_string(31 '{{ foo|dictsort }}|'32 '{{ foo|dictsort(true) }}|'33 '{{ foo|dictsort(false, "value") }}'34 )35 out = tmpl.render(foo={"aa": 0, "b": 1, "c": 2, "AB": 3})36 assert out == ("[('aa', 0), ('AB', 3), ('b', 1), ('c', 2)]|"37 "[('AB', 3), ('aa', 0), ('b', 1), ('c', 2)]|"38 "[('aa', 0), ('b', 1), ('c', 2), ('AB', 3)]")39 def test_batch(self, env):40 tmpl = env.from_string("{{ foo|batch(3)|list }}|"41 "{{ foo|batch(3, 'X')|list }}")42 out = tmpl.render(foo=list(range(10)))43 assert out == ("[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]|"44 "[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 'X', 'X']]")45 def test_slice(self, env):46 tmpl = env.from_string('{{ foo|slice(3)|list }}|'47 '{{ foo|slice(3, "X")|list }}')48 out = tmpl.render(foo=list(range(10)))49 assert out == ("[[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]|"50 "[[0, 1, 2, 3], [4, 5, 6, 'X'], [7, 8, 9, 'X']]")51 def test_escape(self, env):52 tmpl = env.from_string('''{{ '<">&'|escape }}''')53 out = tmpl.render()54 assert out == '&lt;&#34;&gt;&amp;'55 def 
test_striptags(self, env):56 tmpl = env.from_string('''{{ foo|striptags }}''')57 out = tmpl.render(foo=' <p>just a small \n <a href="#">'58 'example</a> link</p>\n<p>to a webpage</p> '59 '<!-- <p>and some commented stuff</p> -->')60 assert out == 'just a small example link to a webpage'61 def test_filesizeformat(self, env):62 tmpl = env.from_string(63 '{{ 100|filesizeformat }}|'64 '{{ 1000|filesizeformat }}|'65 '{{ 1000000|filesizeformat }}|'66 '{{ 1000000000|filesizeformat }}|'67 '{{ 1000000000000|filesizeformat }}|'68 '{{ 100|filesizeformat(true) }}|'69 '{{ 1000|filesizeformat(true) }}|'70 '{{ 1000000|filesizeformat(true) }}|'71 '{{ 1000000000|filesizeformat(true) }}|'72 '{{ 1000000000000|filesizeformat(true) }}'73 )74 out = tmpl.render()75 assert out == (76 '100 Bytes|1.0 kB|1.0 MB|1.0 GB|1.0 TB|100 Bytes|'77 '1000 Bytes|976.6 KiB|953.7 MiB|931.3 GiB'78 )79 def test_filesizeformat_issue59(self, env):80 tmpl = env.from_string(81 '{{ 300|filesizeformat }}|'82 '{{ 3000|filesizeformat }}|'83 '{{ 3000000|filesizeformat }}|'84 '{{ 3000000000|filesizeformat }}|'85 '{{ 3000000000000|filesizeformat }}|'86 '{{ 300|filesizeformat(true) }}|'87 '{{ 3000|filesizeformat(true) }}|'88 '{{ 3000000|filesizeformat(true) }}'89 )90 out = tmpl.render()91 assert out == (92 '300 Bytes|3.0 kB|3.0 MB|3.0 GB|3.0 TB|300 Bytes|'93 '2.9 KiB|2.9 MiB'94 )95 def test_first(self, env):96 tmpl = env.from_string('{{ foo|first }}')97 out = tmpl.render(foo=list(range(10)))98 assert out == '0'99 def test_float(self, env):100 tmpl = env.from_string('{{ "42"|float }}|'101 '{{ "ajsghasjgd"|float }}|'102 '{{ "32.32"|float }}')103 out = tmpl.render()104 assert out == '42.0|0.0|32.32'105 def test_format(self, env):106 tmpl = env.from_string('''{{ "%s|%s"|format("a", "b") }}''')107 out = tmpl.render()108 assert out == 'a|b'109 def test_indent(self, env):110 tmpl = env.from_string('{{ foo|indent(2) }}|{{ foo|indent(2, true) }}')111 text = '\n'.join([' '.join(['foo', 'bar'] * 2)] * 2)112 out = 
tmpl.render(foo=text)113 assert out == ('foo bar foo bar\n foo bar foo bar| '114 'foo bar foo bar\n foo bar foo bar')115 def test_int(self, env):116 class IntIsh(object):117 def __int__(self):118 return 42119 tmpl = env.from_string('{{ "42"|int }}|{{ "ajsghasjgd"|int }}|'120 '{{ "32.32"|int }}|{{ "0x4d32"|int(0, 16) }}|'121 '{{ "011"|int(0, 8)}}|{{ "0x33FU"|int(0, 16) }}|'122 '{{ obj|int }}')123 out = tmpl.render(obj=IntIsh())124 assert out == '42|0|32|19762|9|0|42'125 def test_join(self, env):126 tmpl = env.from_string('{{ [1, 2, 3]|join("|") }}')127 out = tmpl.render()128 assert out == '1|2|3'129 env2 = Environment(autoescape=True)130 tmpl = env2.from_string(131 '{{ ["<foo>", "<span>foo</span>"|safe]|join }}')132 assert tmpl.render() == '&lt;foo&gt;<span>foo</span>'133 def test_join_attribute(self, env):134 class User(object):135 def __init__(self, username):136 self.username = username137 tmpl = env.from_string('''{{ users|join(', ', 'username') }}''')138 assert tmpl.render(users=map(User, ['foo', 'bar'])) == 'foo, bar'139 def test_last(self, env):140 tmpl = env.from_string('''{{ foo|last }}''')141 out = tmpl.render(foo=list(range(10)))142 assert out == '9'143 def test_length(self, env):144 tmpl = env.from_string('''{{ "hello world"|length }}''')145 out = tmpl.render()146 assert out == '11'147 def test_lower(self, env):148 tmpl = env.from_string('''{{ "FOO"|lower }}''')149 out = tmpl.render()150 assert out == 'foo'151 def test_pprint(self, env):152 from pprint import pformat153 tmpl = env.from_string('''{{ data|pprint }}''')154 data = list(range(1000))155 assert tmpl.render(data=data) == pformat(data)156 def test_random(self, env):157 tmpl = env.from_string('''{{ seq|random }}''')158 seq = list(range(100))159 for _ in range(10):160 assert int(tmpl.render(seq=seq)) in seq161 def test_reverse(self, env):162 tmpl = env.from_string('{{ "foobar"|reverse|join }}|'163 '{{ [1, 2, 3]|reverse|list }}')164 assert tmpl.render() == 'raboof|[3, 2, 1]'165 def test_string(self, 
env):166 x = [1, 2, 3, 4, 5]167 tmpl = env.from_string('''{{ obj|string }}''')168 assert tmpl.render(obj=x) == text_type(x)169 def test_title(self, env):170 tmpl = env.from_string('''{{ "foo bar"|title }}''')171 assert tmpl.render() == "Foo Bar"172 tmpl = env.from_string('''{{ "foo's bar"|title }}''')173 assert tmpl.render() == "Foo's Bar"174 tmpl = env.from_string('''{{ "foo bar"|title }}''')175 assert tmpl.render() == "Foo Bar"176 tmpl = env.from_string('''{{ "f bar f"|title }}''')177 assert tmpl.render() == "F Bar F"178 tmpl = env.from_string('''{{ "foo-bar"|title }}''')179 assert tmpl.render() == "Foo-Bar"180 tmpl = env.from_string('''{{ "foo\tbar"|title }}''')181 assert tmpl.render() == "Foo\tBar"182 tmpl = env.from_string('''{{ "FOO\tBAR"|title }}''')183 assert tmpl.render() == "Foo\tBar"184 tmpl = env.from_string('''{{ "foo (bar)"|title }}''')185 assert tmpl.render() == "Foo (Bar)"186 tmpl = env.from_string('''{{ "foo {bar}"|title }}''')187 assert tmpl.render() == "Foo {Bar}"188 tmpl = env.from_string('''{{ "foo [bar]"|title }}''')189 assert tmpl.render() == "Foo [Bar]"190 tmpl = env.from_string('''{{ "foo <bar>"|title }}''')191 assert tmpl.render() == "Foo <Bar>"192 class Foo:193 def __str__(self):194 return 'foo-bar'195 tmpl = env.from_string('''{{ data|title }}''')196 out = tmpl.render(data=Foo())197 assert out == 'Foo-Bar'198 def test_truncate(self, env):199 tmpl = env.from_string(200 '{{ data|truncate(15, true, ">>>") }}|'201 '{{ data|truncate(15, false, ">>>") }}|'202 '{{ smalldata|truncate(15) }}'203 )204 out = tmpl.render(data='foobar baz bar' * 1000,205 smalldata='foobar baz bar')206 msg = 'Current output: %s' % out207 assert out == 'foobar baz b>>>|foobar baz>>>|foobar baz bar', msg208 def test_truncate_very_short(self, env):209 tmpl = env.from_string(210 '{{ "foo bar baz"|truncate(9) }}|'211 '{{ "foo bar baz"|truncate(9, true) }}'212 )213 out = tmpl.render()214 assert out == 'foo bar baz|foo bar baz', out215 def test_truncate_end_length(self, 
env):216 tmpl = env.from_string('{{ "Joel is a slug"|truncate(7, true) }}')217 out = tmpl.render()218 assert out == 'Joel...', 'Current output: %s' % out219 def test_upper(self, env):220 tmpl = env.from_string('{{ "foo"|upper }}')221 assert tmpl.render() == 'FOO'222 def test_urlize(self, env):223 tmpl = env.from_string(224 '{{ "foo http://www.example.com/ bar"|urlize }}')225 assert tmpl.render() == (226 'foo <a href="http://www.example.com/" rel="noopener">'227 'http://www.example.com/</a> bar'228 )229 def test_urlize_rel_policy(self):230 env = Environment()231 env.policies['urlize.rel'] = None232 tmpl = env.from_string(233 '{{ "foo http://www.example.com/ bar"|urlize }}')234 assert tmpl.render() == (235 'foo <a href="http://www.example.com/">'236 'http://www.example.com/</a> bar'237 )238 def test_urlize_target_parameter(self, env):239 tmpl = env.from_string(240 '{{ "foo http://www.example.com/ bar"|urlize(target="_blank") }}'241 )242 assert tmpl.render() \243 == 'foo <a href="http://www.example.com/" rel="noopener" target="_blank">'\244 'http://www.example.com/</a> bar'245 def test_wordcount(self, env):246 tmpl = env.from_string('{{ "foo bar baz"|wordcount }}')247 assert tmpl.render() == '3'248 def test_block(self, env):249 tmpl = env.from_string(250 '{% filter lower|escape %}<HEHE>{% endfilter %}'251 )252 assert tmpl.render() == '&lt;hehe&gt;'253 def test_chaining(self, env):254 tmpl = env.from_string(255 '''{{ ['<foo>', '<bar>']|first|upper|escape }}'''256 )257 assert tmpl.render() == '&lt;FOO&gt;'258 def test_sum(self, env):259 tmpl = env.from_string('''{{ [1, 2, 3, 4, 5, 6]|sum }}''')260 assert tmpl.render() == '21'261 def test_sum_attributes(self, env):262 tmpl = env.from_string('''{{ values|sum('value') }}''')263 assert tmpl.render(values=[264 {'value': 23},265 {'value': 1},266 {'value': 18},267 ]) == '42'268 def test_sum_attributes_nested(self, env):269 tmpl = env.from_string('''{{ values|sum('real.value') }}''')270 assert tmpl.render(values=[271 {'real': 
{'value': 23}},272 {'real': {'value': 1}},273 {'real': {'value': 18}},274 ]) == '42'275 def test_sum_attributes_tuple(self, env):276 tmpl = env.from_string('''{{ values.items()|sum('1') }}''')277 assert tmpl.render(values={278 'foo': 23,279 'bar': 1,280 'baz': 18,281 }) == '42'282 def test_abs(self, env):283 tmpl = env.from_string('''{{ -1|abs }}|{{ 1|abs }}''')284 assert tmpl.render() == '1|1', tmpl.render()285 def test_round_positive(self, env):286 tmpl = env.from_string('{{ 2.7|round }}|{{ 2.1|round }}|'287 "{{ 2.1234|round(3, 'floor') }}|"288 "{{ 2.1|round(0, 'ceil') }}")289 assert tmpl.render() == '3.0|2.0|2.123|3.0', tmpl.render()290 def test_round_negative(self, env):291 tmpl = env.from_string('{{ 21.3|round(-1)}}|'292 "{{ 21.3|round(-1, 'ceil')}}|"293 "{{ 21.3|round(-1, 'floor')}}")294 assert tmpl.render() == '20.0|30.0|20.0', tmpl.render()295 def test_xmlattr(self, env):296 tmpl = env.from_string(297 "{{ {'foo': 42, 'bar': 23, 'fish': none, "298 "'spam': missing, 'blub:blub': '<?>'}|xmlattr }}")299 out = tmpl.render().split()300 assert len(out) == 3301 assert 'foo="42"' in out302 assert 'bar="23"' in out303 assert 'blub:blub="&lt;?&gt;"' in out304 def test_sort1(self, env):305 tmpl = env.from_string(306 '{{ [2, 3, 1]|sort }}|{{ [2, 3, 1]|sort(true) }}')307 assert tmpl.render() == '[1, 2, 3]|[3, 2, 1]'308 def test_sort2(self, env):309 tmpl = env.from_string('{{ "".join(["c", "A", "b", "D"]|sort) }}')310 assert tmpl.render() == 'AbcD'311 def test_sort3(self, env):312 tmpl = env.from_string('''{{ ['foo', 'Bar', 'blah']|sort }}''')313 assert tmpl.render() == "['Bar', 'blah', 'foo']"314 def test_sort4(self, env):315 @implements_to_string316 class Magic(object):317 def __init__(self, value):318 self.value = value319 def __str__(self):320 return text_type(self.value)321 tmpl = env.from_string('''{{ items|sort(attribute='value')|join }}''')322 assert tmpl.render(items=map(Magic, [3, 2, 4, 1])) == '1234'323 def test_groupby(self, env):324 tmpl = 
env.from_string('''325 {%- for grouper, list in [{'foo': 1, 'bar': 2},326 {'foo': 2, 'bar': 3},327 {'foo': 1, 'bar': 1},328 {'foo': 3, 'bar': 4}]|groupby('foo') -%}329 {{ grouper }}{% for x in list %}: {{ x.foo }}, {{ x.bar }}{% endfor %}|330 {%- endfor %}''')331 assert tmpl.render().split('|') == [332 "1: 1, 2: 1, 1",333 "2: 2, 3",334 "3: 3, 4",335 ""336 ]337 def test_groupby_tuple_index(self, env):338 tmpl = env.from_string('''339 {%- for grouper, list in [('a', 1), ('a', 2), ('b', 1)]|groupby(0) -%}340 {{ grouper }}{% for x in list %}:{{ x.1 }}{% endfor %}|341 {%- endfor %}''')342 assert tmpl.render() == 'a:1:2|b:1|'343 def test_groupby_multidot(self, env):344 class Date(object):345 def __init__(self, day, month, year):346 self.day = day347 self.month = month348 self.year = year349 class Article(object):350 def __init__(self, title, *date):351 self.date = Date(*date)352 self.title = title353 articles = [354 Article('aha', 1, 1, 1970),355 Article('interesting', 2, 1, 1970),356 Article('really?', 3, 1, 1970),357 Article('totally not', 1, 1, 1971)358 ]359 tmpl = env.from_string('''360 {%- for year, list in articles|groupby('date.year') -%}361 {{ year }}{% for x in list %}[{{ x.title }}]{% endfor %}|362 {%- endfor %}''')363 assert tmpl.render(articles=articles).split('|') == [364 '1970[aha][interesting][really?]',365 '1971[totally not]',366 ''367 ]368 def test_filtertag(self, env):369 tmpl = env.from_string("{% filter upper|replace('FOO', 'foo') %}"370 "foobar{% endfilter %}")371 assert tmpl.render() == 'fooBAR'372 def test_replace(self, env):373 env = Environment()374 tmpl = env.from_string('{{ string|replace("o", 42) }}')375 assert tmpl.render(string='<foo>') == '<f4242>'376 env = Environment(autoescape=True)377 tmpl = env.from_string('{{ string|replace("o", 42) }}')378 assert tmpl.render(string='<foo>') == '&lt;f4242&gt;'379 tmpl = env.from_string('{{ string|replace("<", 42) }}')380 assert tmpl.render(string='<foo>') == '42foo&gt;'381 tmpl = env.from_string('{{ 
string|replace("o", ">x<") }}')382 assert tmpl.render(string=Markup('foo')) == 'f&gt;x&lt;&gt;x&lt;'383 def test_forceescape(self, env):384 tmpl = env.from_string('{{ x|forceescape }}')385 assert tmpl.render(x=Markup('<div />')) == u'&lt;div /&gt;'386 def test_safe(self, env):387 env = Environment(autoescape=True)388 tmpl = env.from_string('{{ "<div>foo</div>"|safe }}')389 assert tmpl.render() == '<div>foo</div>'390 tmpl = env.from_string('{{ "<div>foo</div>" }}')391 assert tmpl.render() == '&lt;div&gt;foo&lt;/div&gt;'392 def test_urlencode(self, env):393 env = Environment(autoescape=True)394 tmpl = env.from_string('{{ "Hello, world!"|urlencode }}')395 assert tmpl.render() == 'Hello%2C%20world%21'396 tmpl = env.from_string('{{ o|urlencode }}')397 assert tmpl.render(o=u"Hello, world\u203d") \398 == "Hello%2C%20world%E2%80%BD"399 assert tmpl.render(o=(("f", 1),)) == "f=1"400 assert tmpl.render(o=(('f', 1), ("z", 2))) == "f=1&amp;z=2"401 assert tmpl.render(o=((u"\u203d", 1),)) == "%E2%80%BD=1"402 assert tmpl.render(o={u"\u203d": 1}) == "%E2%80%BD=1"403 assert tmpl.render(o={0: 1}) == "0=1"404 def test_simple_map(self, env):405 env = Environment()406 tmpl = env.from_string('{{ ["1", "2", "3"]|map("int")|sum }}')407 assert tmpl.render() == '6'408 def test_attribute_map(self, env):409 class User(object):410 def __init__(self, name):411 self.name = name412 env = Environment()413 users = [414 User('john'),415 User('jane'),416 User('mike'),417 ]418 tmpl = env.from_string('{{ users|map(attribute="name")|join("|") }}')419 assert tmpl.render(users=users) == 'john|jane|mike'420 def test_empty_map(self, env):421 env = Environment()422 tmpl = env.from_string('{{ none|map("upper")|list }}')423 assert tmpl.render() == '[]'424 def test_simple_select(self, env):425 env = Environment()426 tmpl = env.from_string('{{ [1, 2, 3, 4, 5]|select("odd")|join("|") }}')427 assert tmpl.render() == '1|3|5'428 def test_bool_select(self, env):429 env = Environment()430 tmpl = env.from_string(431 '{{ 
[none, false, 0, 1, 2, 3, 4, 5]|select|join("|") }}'432 )433 assert tmpl.render() == '1|2|3|4|5'434 def test_simple_reject(self, env):435 env = Environment()436 tmpl = env.from_string('{{ [1, 2, 3, 4, 5]|reject("odd")|join("|") }}')437 assert tmpl.render() == '2|4'438 def test_bool_reject(self, env):439 env = Environment()440 tmpl = env.from_string(441 '{{ [none, false, 0, 1, 2, 3, 4, 5]|reject|join("|") }}'442 )443 assert tmpl.render() == 'None|False|0'444 def test_simple_select_attr(self, env):445 class User(object):446 def __init__(self, name, is_active):447 self.name = name448 self.is_active = is_active449 env = Environment()450 users = [451 User('john', True),452 User('jane', True),453 User('mike', False),454 ]455 tmpl = env.from_string(456 '{{ users|selectattr("is_active")|'457 'map(attribute="name")|join("|") }}'458 )459 assert tmpl.render(users=users) == 'john|jane'460 def test_simple_reject_attr(self, env):461 class User(object):462 def __init__(self, name, is_active):463 self.name = name464 self.is_active = is_active465 env = Environment()466 users = [467 User('john', True),468 User('jane', True),469 User('mike', False),470 ]471 tmpl = env.from_string('{{ users|rejectattr("is_active")|'472 'map(attribute="name")|join("|") }}')473 assert tmpl.render(users=users) == 'mike'474 def test_func_select_attr(self, env):475 class User(object):476 def __init__(self, id, name):477 self.id = id478 self.name = name479 env = Environment()480 users = [481 User(1, 'john'),482 User(2, 'jane'),483 User(3, 'mike'),484 ]485 tmpl = env.from_string('{{ users|selectattr("id", "odd")|'486 'map(attribute="name")|join("|") }}')487 assert tmpl.render(users=users) == 'john|mike'488 def test_func_reject_attr(self, env):489 class User(object):490 def __init__(self, id, name):491 self.id = id492 self.name = name493 env = Environment()494 users = [495 User(1, 'john'),496 User(2, 'jane'),497 User(3, 'mike'),498 ]499 tmpl = env.from_string('{{ users|rejectattr("id", "odd")|'500 
'map(attribute="name")|join("|") }}')501 assert tmpl.render(users=users) == 'jane'502 def test_json_dump(self):503 env = Environment(autoescape=True)504 t = env.from_string('{{ x|tojson }}')505 assert t.render(x={'foo': 'bar'}) == '{&#34;foo&#34;: &#34;bar&#34;}'506 assert t.render(x='"bar\'') == r'&#34;\&#34;bar\u0027&#34;'507 def my_dumps(value, **options):508 assert options == {'foo': 'bar'}509 return '42'510 env.policies['json.dumps_function'] = my_dumps511 env.policies['json.dumps_kwargs'] = {'foo': 'bar'}...

Full Screen

Full Screen

base_env.py

Source:base_env.py Github

copy

Full Screen

...68 }, ...69 }70 """71 @staticmethod72 def to_base_env(73 env: EnvType,74 make_env: Callable[[int], EnvType] = None,75 num_envs: int = 1,76 remote_envs: bool = False,77 remote_env_batch_wait_ms: int = 0,78 policy_config: PartialTrainerConfigDict = None,79 ) -> "BaseEnv":80 """Wraps any env type as needed to expose the async interface."""81 from ray.rllib.env.remote_vector_env import RemoteVectorEnv82 if remote_envs and num_envs == 1:83 raise ValueError(84 "Remote envs only make sense to use if num_envs > 1 "85 "(i.e. vectorization is enabled).")86 if not isinstance(env, BaseEnv):87 if isinstance(env, MultiAgentEnv):88 if remote_envs:89 env = RemoteVectorEnv(90 make_env,91 num_envs,92 multiagent=True,93 remote_env_batch_wait_ms=remote_env_batch_wait_ms)94 else:95 env = _MultiAgentEnvToBaseEnv(96 make_env=make_env,97 existing_envs=[env],98 num_envs=num_envs)99 elif isinstance(env, ExternalEnv):100 if num_envs != 1:101 raise ValueError(102 "External(MultiAgent)Env does not currently support "103 "num_envs > 1. 
One way of solving this would be to "104 "treat your Env as a MultiAgentEnv hosting only one "105 "type of agent but with several copies.")106 env = _ExternalEnvToBaseEnv(env)107 elif isinstance(env, VectorEnv):108 env = _VectorEnvToBaseEnv(env)109 else:110 if remote_envs:111 env = RemoteVectorEnv(112 make_env,113 num_envs,114 multiagent=False,115 remote_env_batch_wait_ms=remote_env_batch_wait_ms,116 existing_envs=[env],117 )118 else:119 env = VectorEnv.wrap(120 make_env=make_env,121 existing_envs=[env],122 num_envs=num_envs,123 action_space=env.action_space,124 observation_space=env.observation_space,125 policy_config=policy_config,126 )127 env = _VectorEnvToBaseEnv(env)128 assert isinstance(env, BaseEnv), env129 return env130 @PublicAPI131 def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,132 MultiEnvDict, MultiEnvDict]:133 """Returns observations from ready agents.134 The returns are two-level dicts mapping from env_id to a dict of135 agent_id to values. The number of agents and envs can vary over time.136 Returns137 -------138 obs (dict): New observations for each ready agent.139 rewards (dict): Reward values for each ready agent. If the140 episode is just started, the value will be None.141 dones (dict): Done values for each ready agent. The special key142 "__all__" is used to indicate env termination.143 infos (dict): Info values for each ready agent.144 off_policy_actions (dict): Agents may take off-policy actions. When145 that happens, there will be an entry in this dict that contains146 the taken action. 
There is no need to send_actions() for agents147 that have already chosen off-policy actions.148 """149 raise NotImplementedError150 @PublicAPI151 def send_actions(self, action_dict: MultiEnvDict) -> None:152 """Called to send actions back to running agents in this env.153 Actions should be sent for each ready agent that returned observations154 in the previous poll() call.155 Args:156 action_dict (dict): Actions values keyed by env_id and agent_id.157 """158 raise NotImplementedError159 @PublicAPI160 def try_reset(self,161 env_id: Optional[EnvID] = None) -> Optional[MultiAgentDict]:162 """Attempt to reset the sub-env with the given id or all sub-envs.163 If the environment does not support synchronous reset, None can be164 returned here.165 Args:166 env_id (Optional[int]): The sub-env ID if applicable. If None,167 reset the entire Env (i.e. all sub-envs).168 Returns:169 Optional[MultiAgentDict]: Resetted (multi-agent) observation dict170 or None if reset is not supported.171 """172 return None173 @PublicAPI174 def get_unwrapped(self) -> List[EnvType]:175 """Return a reference to the underlying gym envs, if any.176 Returns:177 envs (list): Underlying gym envs or [].178 """179 return []180 @PublicAPI181 def try_render(self, env_id: Optional[EnvID] = None) -> None:182 """Tries to render the environment.183 Args:184 env_id (Optional[int]): The sub-env ID if applicable. If None,185 renders the entire Env (i.e. 
all sub-envs).186 """187 # By default, do nothing.188 pass189 @PublicAPI190 def stop(self) -> None:191 """Releases all resources used."""192 for env in self.get_unwrapped():193 if hasattr(env, "close"):194 env.close()195# Fixed agent identifier when there is only the single agent in the env196_DUMMY_AGENT_ID = "agent0"197def _with_dummy_agent_id(env_id_to_values: Dict[EnvID, Any],198 dummy_id: "AgentID" = _DUMMY_AGENT_ID199 ) -> MultiEnvDict:200 return {k: {dummy_id: v} for (k, v) in env_id_to_values.items()}201class _ExternalEnvToBaseEnv(BaseEnv):202 """Internal adapter of ExternalEnv to BaseEnv."""203 def __init__(self,204 external_env: ExternalEnv,205 preprocessor: "Preprocessor" = None):206 self.external_env = external_env207 self.prep = preprocessor208 self.multiagent = issubclass(type(external_env), ExternalMultiAgentEnv)209 self.action_space = external_env.action_space210 if preprocessor:211 self.observation_space = preprocessor.observation_space212 else:213 self.observation_space = external_env.observation_space214 external_env.start()215 @override(BaseEnv)216 def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,217 MultiEnvDict, MultiEnvDict]:218 with self.external_env._results_avail_condition:219 results = self._poll()220 while len(results[0]) == 0:221 self.external_env._results_avail_condition.wait()222 results = self._poll()223 if not self.external_env.isAlive():224 raise Exception("Serving thread has stopped.")225 limit = self.external_env._max_concurrent_episodes226 assert len(results[0]) < limit, \227 ("Too many concurrent episodes, were some leaked? 
This "228 "ExternalEnv was created with max_concurrent={}".format(limit))229 return results230 @override(BaseEnv)231 def send_actions(self, action_dict: MultiEnvDict) -> None:232 if self.multiagent:233 for env_id, actions in action_dict.items():234 self.external_env._episodes[env_id].action_queue.put(actions)235 else:236 for env_id, action in action_dict.items():237 self.external_env._episodes[env_id].action_queue.put(238 action[_DUMMY_AGENT_ID])239 def _poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,240 MultiEnvDict, MultiEnvDict]:241 all_obs, all_rewards, all_dones, all_infos = {}, {}, {}, {}242 off_policy_actions = {}243 for eid, episode in self.external_env._episodes.copy().items():244 data = episode.get_data()245 cur_done = episode.cur_done_dict[246 "__all__"] if self.multiagent else episode.cur_done247 if cur_done:248 del self.external_env._episodes[eid]249 if data:250 if self.prep:251 all_obs[eid] = self.prep.transform(data["obs"])252 else:253 all_obs[eid] = data["obs"]254 all_rewards[eid] = data["reward"]255 all_dones[eid] = data["done"]256 all_infos[eid] = data["info"]257 if "off_policy_action" in data:258 off_policy_actions[eid] = data["off_policy_action"]259 if self.multiagent:260 # Ensure a consistent set of keys261 # rely on all_obs having all possible keys for now.262 for eid, eid_dict in all_obs.items():263 for agent_id in eid_dict.keys():264 def fix(d, zero_val):265 if agent_id not in d[eid]:266 d[eid][agent_id] = zero_val267 fix(all_rewards, 0.0)268 fix(all_dones, False)269 fix(all_infos, {})270 return (all_obs, all_rewards, all_dones, all_infos,271 off_policy_actions)272 else:273 return _with_dummy_agent_id(all_obs), \274 _with_dummy_agent_id(all_rewards), \275 _with_dummy_agent_id(all_dones, "__all__"), \276 _with_dummy_agent_id(all_infos), \277 _with_dummy_agent_id(off_policy_actions)278class _VectorEnvToBaseEnv(BaseEnv):279 """Internal adapter of VectorEnv to BaseEnv.280 We assume the caller will always send the full vector of 
actions in each281 call to send_actions(), and that they call reset_at() on all completed282 environments before calling send_actions().283 """284 def __init__(self, vector_env: VectorEnv):285 self.vector_env = vector_env286 self.action_space = vector_env.action_space287 self.observation_space = vector_env.observation_space288 self.num_envs = vector_env.num_envs289 self.new_obs = None # lazily initialized290 self.cur_rewards = [None for _ in range(self.num_envs)]291 self.cur_dones = [False for _ in range(self.num_envs)]292 self.cur_infos = [None for _ in range(self.num_envs)]293 @override(BaseEnv)294 def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,295 MultiEnvDict, MultiEnvDict]:296 if self.new_obs is None:297 self.new_obs = self.vector_env.vector_reset()298 new_obs = dict(enumerate(self.new_obs))299 rewards = dict(enumerate(self.cur_rewards))300 dones = dict(enumerate(self.cur_dones))301 infos = dict(enumerate(self.cur_infos))302 self.new_obs = []303 self.cur_rewards = []304 self.cur_dones = []305 self.cur_infos = []306 return _with_dummy_agent_id(new_obs), \307 _with_dummy_agent_id(rewards), \308 _with_dummy_agent_id(dones, "__all__"), \309 _with_dummy_agent_id(infos), {}310 @override(BaseEnv)311 def send_actions(self, action_dict: MultiEnvDict) -> None:312 action_vector = [None] * self.num_envs313 for i in range(self.num_envs):314 action_vector[i] = action_dict[i][_DUMMY_AGENT_ID]315 self.new_obs, self.cur_rewards, self.cur_dones, self.cur_infos = \316 self.vector_env.vector_step(action_vector)317 @override(BaseEnv)318 def try_reset(self, env_id: Optional[EnvID] = None) -> MultiAgentDict:319 assert env_id is None or isinstance(env_id, int)320 return {_DUMMY_AGENT_ID: self.vector_env.reset_at(env_id)}321 @override(BaseEnv)322 def get_unwrapped(self) -> List[EnvType]:323 return self.vector_env.get_unwrapped()324 @override(BaseEnv)325 def try_render(self, env_id: Optional[EnvID] = None) -> None:326 assert env_id is None or isinstance(env_id, 
int)327 return self.vector_env.try_render_at(env_id)328class _MultiAgentEnvToBaseEnv(BaseEnv):329 """Internal adapter of MultiAgentEnv to BaseEnv.330 This also supports vectorization if num_envs > 1.331 """332 def __init__(self, make_env: Callable[[int], EnvType],333 existing_envs: List[MultiAgentEnv], num_envs: int):334 """Wrap existing multi-agent envs.335 Args:336 make_env (func|None): Factory that produces a new multiagent env.337 Must be defined if the number of existing envs is less than338 num_envs.339 existing_envs (list): List of existing multiagent envs.340 num_envs (int): Desired num multiagent envs to keep total.341 """342 self.make_env = make_env343 self.envs = existing_envs344 self.num_envs = num_envs345 self.dones = set()346 while len(self.envs) < self.num_envs:347 self.envs.append(self.make_env(len(self.envs)))348 for env in self.envs:349 assert isinstance(env, MultiAgentEnv)350 self.env_states = [_MultiAgentEnvState(env) for env in self.envs]351 @override(BaseEnv)352 def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,353 MultiEnvDict, MultiEnvDict]:354 obs, rewards, dones, infos = {}, {}, {}, {}355 for i, env_state in enumerate(self.env_states):356 obs[i], rewards[i], dones[i], infos[i] = env_state.poll()357 return obs, rewards, dones, infos, {}358 @override(BaseEnv)359 def send_actions(self, action_dict: MultiEnvDict) -> None:360 for env_id, agent_dict in action_dict.items():361 if env_id in self.dones:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful