Best Python code snippet using playwright-python
all_reduce.py
Source:all_reduce.py
1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ==============================================================================15"""Utilities to construct a TF subgraph implementing distributed All-Reduce."""16from __future__ import absolute_import17from __future__ import division18from __future__ import print_function19import collections20import math21from tensorflow.contrib import nccl22from tensorflow.python.framework import device as device_lib23from tensorflow.python.framework import ops24from tensorflow.python.ops import array_ops25from tensorflow.python.ops import math_ops26def _flatten_tensors(tensors):27 """Check tensors for isomorphism and flatten.28 Args:29 tensors: list of T @{tf.Tensor} which must all have the same shape.30 Returns:31 tensors: a list of T @{tf.Tensor} which are flattened (1D) views of tensors32 shape: the original shape of each element of input tensors33 Raises:34 ValueError: tensors are empty or non-isomorphic or have unknown shape.35 """36 if not tensors:37 raise ValueError("tensors cannot be empty")38 shape = tensors[0].shape39 for tensor in tensors:40 shape = shape.merge_with(tensor.shape)41 if not shape.is_fully_defined():42 raise ValueError("Tensors must have statically known shape.")43 if len(shape) != 1:44 reshaped = []45 for t in tensors:46 with ops.colocate_with(t):47 reshaped.append(array_ops.reshape(t, [-1]))48 tensors = reshaped49 return tensors, shape50def _reshape_tensors(tensors, shape):51 """Reshape tensors flattened by _flatten_tensors.52 Args:53 tensors: list of T @{tf.Tensor} of identical length 1D tensors.54 shape: list of integers describing the desired shape. Product of55 the elements must equal the length of each tensor.56 Returns:57 list of T @{tf.Tensor} which are the reshaped inputs.58 """59 reshaped = []60 for t in tensors:61 with ops.colocate_with(t):62 reshaped.append(array_ops.reshape(t, shape))63 return reshaped64def _padded_split(tensor, pieces):65 """Like split for 1D tensors but pads-out case where len % pieces != 0.66 Args:67 tensor: T @{tf.Tensor} that must be 1D.68 pieces: a positive integer specifying the number of pieces into which69 tensor should be split.70 Returns:71 list of T @{tf.Tensor} of length pieces, which hold the values of72 thin input tensor, in order. The final tensor may73 be zero-padded on the end to make its size equal to those of all74 of the other tensors.75 Raises:76 ValueError: The input tensor is not 1D.77 """78 shape = tensor.shape79 if 1 != len(shape):80 raise ValueError("input tensor must be 1D")81 tensor_len = shape[0].value82 with ops.colocate_with(tensor):83 if tensor_len % pieces != 0:84 # pad to an even length85 chunk_size = 1 + tensor_len // pieces86 if pieces > tensor_len:87 # This is an edge case that should not come up in practice,88 # i.e. a different reduction algorithm would be better,89 # but we'll make it work just for completeness.90 pad_len = pieces - tensor_len91 extended_whole = array_ops.concat(92 [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)93 parts = array_ops.split(extended_whole, pieces)94 return parts, pad_len95 elif (pieces - 1) * chunk_size >= tensor_len:96 # Another edge case of limited real interest.97 pad_len = (pieces * chunk_size) % tensor_len98 extended_whole = array_ops.concat(99 [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)100 parts = array_ops.split(extended_whole, pieces)101 return parts, pad_len102 else:103 last_chunk_size = tensor_len - (pieces - 1) * chunk_size104 pad_len = chunk_size - last_chunk_size105 piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]106 parts = array_ops.split(tensor, piece_lens)107 parts[-1] = array_ops.concat(108 [parts[-1], array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)109 return parts, pad_len110 else:111 return array_ops.split(tensor, pieces), 0112def _strip_padding(tensors, pad_len):113 """Strip the suffix padding added by _padded_split.114 Args:115 tensors: list of T @{tf.Tensor} of identical length 1D tensors.116 pad_len: number of elements to be stripped from the end of each tensor.117 Returns:118 list of T @{tf.Tensor} which are the stripped inputs.119 Raises:120 ValueError: tensors must be a non-empty list of 1D tensors, and121 each must be longer than pad_len.122 """123 if not tensors:124 raise ValueError("tensors cannot be empty")125 shape = tensors[0].shape126 if len(shape) > 1:127 raise ValueError("tensors must be 1D")128 prefix_len = int(shape[0] - pad_len)129 if prefix_len < 0:130 raise ValueError("pad_len longer than tensor")131 stripped = []132 for t in tensors:133 with ops.colocate_with(t):134 stripped.append(array_ops.slice(t, [0], [prefix_len]))135 return stripped136def _ragged_split(tensor, pieces):137 """Like split for 1D tensors but allows case where len % pieces != 0.138 Args:139 tensor: T @{tf.Tensor} that must be 1D.140 pieces: a positive integer specifying the number of pieces into which141 tensor should be split.142 Returns:143 list of T @{tf.Tensor} of length pieces, which hold the values of144 the input tensor, in order. The final tensor may be shorter145 than the others, which will all be of equal length.146 Raises:147 ValueError: input tensor must be 1D.148 """149 shape = tensor.shape150 if 1 != len(shape):151 raise ValueError("input tensor must be 1D")152 tensor_len = shape[0].value153 chunk_size = tensor_len // pieces154 with ops.colocate_with(tensor):155 if tensor_len != (pieces * chunk_size):156 # last piece will be short157 assert pieces > 1158 last_chunk_size = tensor_len - ((pieces - 1) * chunk_size)159 assert last_chunk_size > 0160 piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]161 return array_ops.split(tensor, piece_lens)162 else:163 return array_ops.split(tensor, pieces)164def _ring_permutations(num_workers, num_subchunks, gpu_perm):165 """"Generate an array of device index arrays, one for each subchunk.166 In the basic ring reduction algorithm there are size(T)/num_devices167 data chunks and each device process one chunk per tick, i.e. sending168 one chunk and receiving one chunk. The idea of subchunking is that169 each device processes num_subchunks smaller data regions per tick,170 and the ring rank permutation is different for each subchunk index171 so that a device is potentially sending to and receiving from172 num_subchunks different other devices at each tick. Where multiple173 independent data channels exist between devices, this strategy174 supplies a method of using them in parallel.175 Args:176 num_workers: number of worker tasks177 num_subchunks: number of subchunks into which to divide each per-GPU chunk.178 gpu_perm: an array of integers in [0, num_gpus-1] giving the default179 ring order of GPUs at each worker. Other permutations will be generated180 by rotating this array and splicing together per-worker instances.181 Raises:182 ValueError: the number of subchunks may not exceed the number of GPUs.183 Returns:184 pred_by_s_d: list of lists that maps (by index) from (subchunk, dev) to185 preceding device in the permutation for that subchunk. The186 device index of GPU i at worker j is i + (j * num_gpus).187 rank_by_s_d: list of lists that maps (by index) from (subchunk, dev) to188 local rank of device d in the permutation for that subchunk.189 """190 num_gpus = len(gpu_perm)191 devices = num_workers * num_gpus192 if devices == 0:193 return [], []194 if num_subchunks > num_gpus:195 raise ValueError(196 "num_subchunks %d must be <= num_gpus %d" % (num_subchunks, num_gpus))197 rotation_interval = max(1, int(num_gpus / num_subchunks))198 perms_by_s = []199 for s in range(0, num_subchunks):200 full_order = []201 offset = s * rotation_interval202 for w in range(0, num_workers):203 default_order = [(w * num_gpus) + i for i in gpu_perm]204 dev_order = default_order[offset:] + default_order[:offset]205 full_order += dev_order206 perms_by_s.append(full_order)207 pred_by_s_d = [[-1 for d in range(0, devices)]208 for s in range(0, num_subchunks)]209 rank_by_s_d = [[-1 for d in range(0, devices)]210 for s in range(0, num_subchunks)]211 for s in range(0, num_subchunks):212 for d in range(0, devices):213 for t in range(0, devices):214 if d == perms_by_s[s][t]:215 rank_by_s_d[s][d] = t216 pred_by_s_d[s][d] = perms_by_s[s][(t + devices - 1) % devices]217 break218 return (pred_by_s_d, rank_by_s_d)219def build_ring_all_reduce(input_tensors, num_workers, num_subchunks,220 gpu_perm, red_op, un_op=None):221 """Construct a subgraph performing a ring-style all-reduce of input_tensors.222 Args:223 input_tensors: a list of T @{tf.Tensor} objects, which must all224 have the same shape and type.225 num_workers: number of worker tasks spanned by input_tensors.226 num_subchunks: number of subchunks each device should process in one tick.227 gpu_perm: a list of ints giving a ring-wise rank ordering of GPUs at228 each worker. All workers must have the same number of229 GPUs with the same rank ordering. If NVLINK is available, this should230 be a ring order supported by NVLINK edges.231 red_op: a binary operator for elementwise reduction.232 un_op: an optional unary operator to apply to fully reduced values.233 Raises:234 ValueError: empty input_tensors or they don't all have same235 size.236 Returns:237 a list of T @{tf.Tensor} identical sum-reductions of input_tensors.238 """239 if len(input_tensors) < 2:240 raise ValueError("input_tensors must be length 2 or longer")241 input_tensors, shape = _flatten_tensors(input_tensors)242 devices = [t.device for t in input_tensors]243 (pred_by_s_d, rank_by_s_d) = _ring_permutations(244 num_workers, num_subchunks, gpu_perm)245 chunks_by_dev, pad_len = _build_ring_gather(246 input_tensors, devices,247 num_subchunks, pred_by_s_d, rank_by_s_d, red_op)248 if un_op:249 chunks_by_dev = _apply_unary_to_chunks(un_op, chunks_by_dev)250 output_tensors = _build_ring_scatter(pred_by_s_d, rank_by_s_d,251 chunks_by_dev)252 if pad_len > 0:253 output_tensors = _strip_padding(output_tensors, pad_len)254 if len(shape) != 1:255 output_tensors = _reshape_tensors(output_tensors, shape)256 return output_tensors257def _build_ring_gather(input_tensors, devices, num_subchunks,258 pred_by_s_d, rank_by_s_d, red_op):259 """Construct a subgraph for the first (reduction) pass of ring all-reduce.260 Args:261 input_tensors: a list of T @{tf.Tensor} 1D input tensors of same262 shape and type.263 devices: array of device name strings264 num_subchunks: number of subchunks each device should process in one tick.265 pred_by_s_d: as produced by _ring_permutations266 rank_by_s_d: as produced by _ring_permutations267 red_op: a binary operator for elementwise reduction268 Raises:269 ValueError: tensors must all be one dimensional.270 Returns:271 list of list of T @{tf.Tensor} of (partially) reduced values where272 exactly num_subchunks chunks at each device are fully reduced.273 """274 num_devices = len(input_tensors)275 if num_devices == 0:276 return []277 if num_devices == 1:278 return input_tensors279 shape = input_tensors[0].shape280 if 1 != len(shape):281 raise ValueError("input tensors must be 1D")282 num_chunks = num_devices * num_subchunks283 num_ticks = num_devices - 1284 # Initialize chunks_by_dev with splits of the input tensors.285 chunks_by_dev = []286 split_pad_len = 0287 for d in range(0, num_devices):288 with ops.device(devices[d]):289 splits, split_pad_len = _padded_split(input_tensors[d], num_chunks)290 chunks_by_dev.append(splits)291 # Reduction phase292 for tick in range(0, num_ticks):293 # One new partial reduction for every chunk294 new_partial_reductions = [None for _ in range(0, num_chunks)]295 # Compute reductions with respect to last tick's values296 for d in range(0, num_devices):297 with ops.device(devices[d]):298 for s in range(0, num_subchunks):299 rank = rank_by_s_d[s][d]300 seg_index = (rank + num_devices - (2 + tick)) % num_devices301 pred_dev = pred_by_s_d[s][d]302 chunk_index = (seg_index * num_subchunks) + s303 new_partial_reductions[chunk_index] = red_op(304 chunks_by_dev[pred_dev][chunk_index],305 chunks_by_dev[d][chunk_index])306 # Update chunks_by_dev with the new values at the end of the tick.307 for d in range(0, num_devices):308 for s in range(0, num_subchunks):309 rank = rank_by_s_d[s][d]310 seg_index = (rank + num_devices - (2 + tick)) % num_devices311 chunk_index = (seg_index * num_subchunks) + s312 chunks_by_dev[d][chunk_index] = new_partial_reductions[chunk_index]313 return chunks_by_dev, split_pad_len314def _apply_unary_to_chunks(f, chunks_by_dev):315 """Apply a unary op to each tensor in chunks_by_dev, on same device.316 Args:317 f: a unary function over T @{tf.Tensor}.318 chunks_by_dev: list of lists of T @{tf.Tensor}.319 Returns:320 new list of lists of T @{tf.Tensor} with the same structure as321 chunks_by_dev containing the derived tensors.322 """323 output = []324 for x in chunks_by_dev:325 with ops.colocate_with(x[0]):326 output.append([f(t) for t in x])327 return output328def _build_ring_scatter(pred_by_s_d, rank_by_s_d,329 chunks_by_dev):330 """Construct subgraph for second (scatter) pass of ring all-reduce.331 Args:332 pred_by_s_d: as produced by _ring_permutations333 rank_by_s_d: as produced by _ring_permutations334 chunks_by_dev: list of list of T @{tf.Tensor} indexed by ints335 (device, chunk)336 Raises:337 ValueError: chunks_by_dev is not well-formed338 Returns:339 list of T @{tf.Tensor} which are the fully reduced tensors, one340 at each device corresponding to the outer dimension of chunks_by_dev.341 """342 num_devices = len(chunks_by_dev)343 num_chunks = len(chunks_by_dev[0])344 if 0 != num_chunks % num_devices:345 raise ValueError(346 "Expect number of chunks per device to be divisible by num_devices")347 num_subchunks = int(num_chunks / num_devices)348 num_ticks = num_devices - 1349 for tick in range(0, num_ticks):350 passed_values = [None for _ in range(0, num_chunks)]351 for d in range(0, num_devices):352 with ops.colocate_with(chunks_by_dev[d][0]):353 for s in range(0, num_subchunks):354 rank = rank_by_s_d[s][d]355 seg_index = (rank + num_devices - (1 + tick)) % num_devices356 pred_dev = pred_by_s_d[s][d]357 chunk_index = (seg_index * num_subchunks) + s358 passed_values[chunk_index] = array_ops.identity(359 chunks_by_dev[pred_dev][chunk_index])360 for d in range(0, num_devices):361 for s in range(0, num_subchunks):362 rank = rank_by_s_d[s][d]363 seg_index = (rank + num_devices - (1 + tick)) % num_devices364 chunk_index = (seg_index * num_subchunks) + s365 chunks_by_dev[d][chunk_index] = passed_values[chunk_index]366 # Join chunks at each device.367 output = []368 for x in chunks_by_dev:369 with ops.colocate_with(x[0]):370 output.append(array_ops.concat(x, 0))371 return output372def build_recursive_hd_all_reduce(input_tensors, red_op, un_op=None):373 """Construct a subgraph for recursive halving-doubling all-reduce.374 The recursive halving-doubling algorithm is described in375 http://www.mcs.anl.gov/~thakur/papers/ijhpca-coll.pdf376 The concept is to arrange the participating n devices in377 a linear sequence where devices exchange data pairwise378 with one other device in each round. During the gather379 phase there are lg(n) rounds where devices exchange380 increasingly smaller sub-tensors with another device381 at increasingly greater distances, until at the top382 each device has 1/n of the fully reduced values. During the383 scatter phase each device exchanges its fully reduced384 sub-tensor (which doubles in length at each round)385 with one other device at increasingly smaller distances386 until each device has all of the fully reduced values.387 Note: this preliminary version requires that len(input_tensors) be a388 power of 2. TODO(tucker): relax this restriction. Also, the389 number of elements in each tensor must be divisible by 2^h where h390 is the number of hops in each phase. This will also be relaxed in391 the future with edge-case specific logic.392 Args:393 input_tensors: list of T @{tf.Tensor} to be elementwise reduced.394 red_op: a binary elementwise reduction Op.395 un_op: an optional unary elementwise Op to apply to reduced values.396 Returns:397 list of T @{tf.Tensor} which are the fully reduced tensors, one398 at each device of input_tensors.399 Raises:400 ValueError: num_devices not a power of 2, or tensor len not divisible401 by 2 the proper number of times.402 """403 devices = [t.device for t in input_tensors]404 input_tensors, shape = _flatten_tensors(input_tensors)405 reduced_shards = _build_recursive_hd_gather(input_tensors, devices, red_op)406 if un_op:407 reduced_shards = [un_op(t) for t in reduced_shards]408 output_tensors = _build_recursive_hd_scatter(reduced_shards, devices)409 if len(shape) != 1:410 output_tensors = _reshape_tensors(output_tensors, shape)411 return output_tensors412def _build_recursive_hd_gather(input_tensors, devices, red_op):413 """Construct the gather phase of recursive halving-doubling all-reduce.414 Args:415 input_tensors: list of T @{tf.Tensor} to be elementwise reduced.416 devices: a list of strings naming the devices hosting input_tensors,417 which will also be used to host the (partial) reduction values.418 red_op: a binary elementwise reduction Op.419 Returns:420 list of T @{tf.Tensor} which are the fully reduced tensor shards.421 Raises:422 ValueError: num_devices not a power of 2, or tensor len not divisible423 by 2 the proper number of times.424 """425 num_devices = len(devices)426 num_hops = int(math.log(num_devices, 2))427 if num_devices != (2 ** num_hops):428 raise ValueError("num_devices must be a power of 2")429 chunks = input_tensors430 for h in range(0, num_hops):431 span = 2 ** h432 group_size = span * 2433 new_chunks = [[] for _ in devices]434 for d in range(0, num_devices):435 if (d % group_size) >= (group_size / 2):436 # skip right half of a pair437 continue438 left_dev = devices[d]439 right_dev = devices[d + span]440 left_split = array_ops.split(chunks[d], 2)441 right_split = array_ops.split(chunks[d+span], 2)442 with ops.device(left_dev):443 new_chunks[d] = red_op(left_split[0], right_split[0])444 with ops.device(right_dev):445 new_chunks[d + span] = red_op(left_split[1], right_split[1])446 chunks = new_chunks447 return chunks448def _build_recursive_hd_scatter(input_tensors, devices):449 """Construct the scatter phase of recursive halving-doublng all-reduce.450 Args:451 input_tensors: list of T @{tf.Tensor} that are fully-reduced shards.452 devices: a list of strings naming the devices on which the reconstituted453 full tensors should be placed.454 Returns:455 list of T @{tf.Tensor} which are the fully reduced tensors.456 """457 num_devices = len(devices)458 num_hops = int(math.log(num_devices, 2))459 assert num_devices == (2 ** num_hops), "num_devices must be a power of 2"460 chunks = input_tensors461 for h in reversed(range(0, num_hops)):462 span = 2 ** h463 group_size = span * 2464 new_chunks = [[] for _ in devices]465 for d in range(0, num_devices):466 if (d % group_size) >= (group_size / 2):467 # skip right half of a pair468 continue469 left_idx = d470 right_idx = d + span471 left_dev = devices[left_idx]472 right_dev = devices[right_idx]473 with ops.device(left_dev):474 new_chunks[left_idx] = array_ops.concat([chunks[left_idx],475 chunks[right_idx]], 0)476 with ops.device(right_dev):477 new_chunks[right_idx] = array_ops.concat([chunks[left_idx],478 chunks[right_idx]], 0)479 chunks = new_chunks480 return chunks481def build_shuffle_all_reduce(input_tensors, gather_devices, red_op, un_op=None):482 """Construct a subgraph for shuffle all-reduce.483 Shuffle reduce is essentially the algorithm implemented when using484 parameter servers. Suppose tensor length is n, there are d devices485 and g gather shards. Each device sends a n/g length sub-tensor to486 each gather shard. The gather shards perform a reduction across d487 fragments, then broadcast the result back to each device. The488 devices then join the g fully reduced fragments they receive from489 the shards. The gather shards could perform d-1 pairwise490 reductions, or one d-way reduction. The first is better where491 reduction Op time is low compared to transmission time, the second492 better in the other case.493 Args:494 input_tensors: list of T @(tf.Tensor} values to be reduced.495 gather_devices: list of names of devices on which reduction shards496 should be placed.497 red_op: an n-array elementwise reduction Op498 un_op: optional elementwise unary Op to be applied to fully-reduced values.499 Returns:500 list of T @{tf.Tensor} which are the fully reduced tensors.501 """502 input_tensors, shape = _flatten_tensors(input_tensors)503 dst_devices = [t.device for t in input_tensors]504 reduced_shards = _build_shuffle_gather(input_tensors, gather_devices,505 red_op, un_op)506 output_tensors = _build_shuffle_scatter(reduced_shards, dst_devices)507 if len(shape) != 1:508 output_tensors = _reshape_tensors(output_tensors, shape)509 return output_tensors510def _build_shuffle_gather(input_tensors, gather_devices, red_op, un_op=None):511 """Construct the gather (concentrate and reduce) phase of shuffle all-reduce.512 Args:513 input_tensors: list of T @(tf.Tensor} values to be reduced.514 gather_devices: list of names of devices on which reduction shards515 should be placed.516 red_op: the binary reduction Op517 un_op: optional elementwise unary Op to be applied to fully-reduced values.518 Returns:519 list of T @{tf.Tensor} which are the fully reduced shards.520 Raises:521 ValueError: inputs not well-formed.522 """523 num_source_devices = len(input_tensors)524 num_gather_devices = len(gather_devices)525 shape = input_tensors[0].shape526 if len(shape) != 1:527 raise ValueError("input_tensors must be 1D")528 shards_by_source = []529 for d in range(0, num_source_devices):530 with ops.colocate_with(input_tensors[d]):531 shards_by_source.append(532 _ragged_split(input_tensors[d], num_gather_devices))533 reduced_shards = []534 for d in range(0, num_gather_devices):535 with ops.device(gather_devices[d]):536 values = [s[d] for s in shards_by_source]537 red_shard = red_op(values)538 if un_op:539 red_shard = un_op(red_shard)540 reduced_shards.append(red_shard)541 return reduced_shards542def _build_shuffle_scatter(reduced_shards, dst_devices):543 """Build the scatter phase of shuffle all-reduce.544 Args:545 reduced_shards: list of T @(tf.Tensor} fully reduced shards546 dst_devices: list of names of devices at which the fully-reduced value547 should be reconstituted.548 Returns:549 list of T @{tf.Tensor} scattered tensors.550 """551 num_devices = len(dst_devices)552 out_tensors = []553 for d in range(0, num_devices):554 with ops.device(dst_devices[d]):555 out_tensors.append(array_ops.concat(reduced_shards, 0))556 return out_tensors557def _split_by_task(devices, values):558 """Partition devices and values by common task.559 Args:560 devices: list of device name strings561 values: list of T @{tf.tensor} of same length as devices.562 Returns:563 (per_task_devices, per_task_values) where both values are564 lists of lists with isomorphic structure: the outer list is565 indexed by task, and the inner list has length of the number566 of values belonging to that task. per_task_devices contains567 the specific devices to which the values are local, and568 per_task_values contains the corresponding values.569 Raises:570 ValueError: devices must be same length as values.571 """572 num_devices = len(devices)573 if num_devices != len(values):574 raise ValueError("len(devices) must equal len(values)")575 per_task_devices = collections.OrderedDict()576 per_task_values = collections.OrderedDict()577 for d in range(num_devices):578 d_spec = device_lib.DeviceSpec.from_string(devices[d])579 if not hasattr(d_spec, "task") or d_spec.task is None:580 assert False, "failed to parse device %s" % devices[d]581 index = (d_spec.job or "localhost", d_spec.replica or 0, d_spec.task)582 if index not in per_task_devices:583 per_task_devices[index] = []584 per_task_values[index] = []585 per_task_devices[index].append(devices[d])586 per_task_values[index].append(values[d])587 return (list(per_task_devices.values()), list(per_task_values.values()))588def build_nccl_all_reduce(input_tensors, red_op, un_op=None):589 """Build a subgraph that does one full all-reduce, using NCCL.590 Args:591 input_tensors: list of T @{tf.Tensor} of same-shape and type values to592 be reduced.593 red_op: binary elementwise reduction operator. Must be one of594 {tf.add}595 un_op: optional unary elementwise Op to apply to fully-reduce values.596 Returns:597 list of T @{tf.Tensor} of reduced values.598 Raises:599 ValueError: red_op not supported.600 """601 if red_op == math_ops.add:602 output_tensors = nccl.all_sum(input_tensors)603 else:604 raise ValueError("red_op not supported by NCCL all-reduce: ", red_op)605 if un_op:606 un_op_wrapped = []607 for t in output_tensors:608 with ops.colocate_with(t):609 un_op_wrapped.append(un_op(t))610 output_tensors = un_op_wrapped611 return output_tensors612def _build_nccl_hybrid(input_tensors, red_op, upper_level_f):613 """Construct a subgraph for NCCL hybrid all-reduce.614 Args:615 input_tensors: list of T @{tf.Tensor} of same-shape and type values to616 be reduced.617 red_op: binary elementwise reduction operator.618 upper_level_f: function for reducing one value per worker, across619 workers.620 Returns:621 list of T @{tf.Tensor} of reduced values.622 Raises:623 ValueError: inputs not well-formed.624 """625 input_tensors, shape = _flatten_tensors(input_tensors)626 devices = [t.device for t in input_tensors]627 per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)628 num_workers = len(per_worker_devices)629 up_values = [None for w in range(0, num_workers)]630 up_devices = up_values[:]631 down_values = up_values[:]632 # First stage: reduce within each worker using NCCL633 for w in range(0, num_workers):634 worker_values = build_nccl_all_reduce(per_worker_values[w], red_op)635 # NOTE: these reductions will not run to completion unless636 # every output value is used. Since we only need one, we637 # need to put control dependencies on the rest.638 with ops.control_dependencies(worker_values):639 with ops.device(worker_values[0].device):640 up_values[w] = array_ops.identity(worker_values[0])641 up_devices[w] = per_worker_devices[w][0]642 # Second stage: Apply upper_level_f to reduce across first device at643 # each worker644 level_2_output = upper_level_f(up_values)645 # Third stage: propagate within each worker using NCCL Broadcast646 for w in range(0, num_workers):647 dst_tensors = []648 with ops.device(per_worker_devices[w][0]):649 broadcast_src = nccl.broadcast(array_ops.identity(level_2_output[w]))650 for d in per_worker_devices[w]:651 with ops.device(d):652 dst_tensors.append(array_ops.identity(broadcast_src))653 down_values[w] = dst_tensors654 output_tensors = [v for sublist in down_values for v in sublist]655 if len(shape) != 1:656 output_tensors = _reshape_tensors(output_tensors, shape)657 return output_tensors658def _reduce_non_singleton(input_tensors, red_f, un_op):659 """If input_tensors has more than one element apply red_f, else apply un_op."""660 if len(input_tensors) > 1:661 return red_f(input_tensors)662 else:663 if not un_op:664 return input_tensors665 output_tensors = []666 for t in input_tensors:667 with ops.colocate_with(t):668 output_tensors.append(un_op(t))669 return output_tensors670def build_nccl_then_ring(input_tensors, subdiv, red_op, un_op=None):671 """Construct hybrid of NCCL within workers, Ring across workers."""672 def upper_builder(y):673 return build_ring_all_reduce(y, len(y), subdiv, [0], red_op, un_op)674 def upper_level_f(x):675 return _reduce_non_singleton(x, upper_builder, un_op)676 return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)677def build_nccl_then_recursive_hd(input_tensors, red_op, un_op=None):678 """Construct hybrid of NCCL within workers, Recursive-HD across workers."""679 upper_level_f = lambda x: build_recursive_hd_all_reduce(x, red_op, un_op)680 return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)681def build_nccl_then_shuffle(input_tensors, gather_devices, nccl_red_op,682 shuffle_red_op, un_op=None):683 """Construct hybrid of NCCL within workers, Shuffle across workers."""684 upper_level_f = lambda x: build_shuffle_all_reduce(x, gather_devices,685 shuffle_red_op, un_op)686 return _build_nccl_hybrid(input_tensors, nccl_red_op, upper_level_f)687def _build_shuffle_hybrid(input_tensors, gather_devices, red_op, upper_level_f):688 """Construct a subgraph for Shuffle hybrid all-reduce.689 Args:690 input_tensors: list of T @{tf.Tensor} of same-shape and type values to691 be reduced.692 gather_devices: list of device names on which to host gather shards.693 red_op: binary elementwise reduction operator.694 upper_level_f: function for reducing one value per worker, across695 workers.696 Returns:697 list of T @{tf.Tensor} of reduced values.698 Raises:699 ValueError: inputs not well-formed.700 """701 input_tensors, shape = _flatten_tensors(input_tensors)702 # First stage, reduce across each worker using gather_devices.703 devices = [t.device for t in input_tensors]704 per_worker_devices, per_worker_values = _split_by_task(devices, input_tensors)705 num_workers = len(per_worker_devices)706 up_values = []707 if len(gather_devices) != num_workers:708 raise ValueError("For shuffle hybrid, gather_devices must contain one "709 "device per worker. ")710 for w in range(0, num_workers):711 reduced_shards = _build_shuffle_gather(712 per_worker_values[w], [gather_devices[w]], red_op)713 up_values.append(reduced_shards[0])714 # Second stage, apply upper_level_f.715 level_2_output = upper_level_f(up_values)716 # Third stage, apply shuffle scatter at each worker.717 output_tensors = []718 for w in range(0, num_workers):719 output_tensors += _build_shuffle_scatter(720 [level_2_output[w]], per_worker_devices[w])721 if len(shape) != 1:722 output_tensors = _reshape_tensors(output_tensors, shape)723 return output_tensors724def build_shuffle_then_ring(input_tensors, gather_devices, subdiv,725 red_n_op, red_op, un_op=None):726 """Construct hybrid of Shuffle within workers, Ring across workers."""727 def upper_builder(tensors):728 return build_ring_all_reduce(tensors, len(tensors), subdiv, [0],729 red_op, un_op)730 def upper_level_f(tensors):731 return _reduce_non_singleton(tensors, upper_builder, un_op)732 return _build_shuffle_hybrid(733 input_tensors, gather_devices, red_n_op, upper_level_f)734def build_shuffle_then_shuffle(input_tensors, first_gather_devices,735 second_gather_devices, red_op, un_op=None):736 """Construct hybrid of Shuffle within workers, Shuffle across workers."""737 def upper_builder(tensors):738 return build_shuffle_all_reduce(tensors, second_gather_devices,739 red_op, un_op)740 def upper_level_f(tensors):741 return _reduce_non_singleton(tensors, upper_builder, un_op)742 return _build_shuffle_hybrid(...
app.py
Source:app.py
1import paho.mqtt.client as mqtt2from flask import Flask,g,render_template,request,Response3from flask_cors import CORS4from flask_socketio import SocketIO, emit5from flask_apscheduler import APScheduler6from libDB import logDB,clearOldDB,queryDB7import numpy as np8import json9from libPickle import *10from pyimagesearch.motion_detection import SingleMotionDetector11from imutils.video import VideoStream12import threading13import argparse14import datetime15import imutils16import schedule17import time18import cv219#edit by chrome20"""21Date storage22- hot : pickle devices data23- cold :24- frozen : sqlite325""" 26class Config(object):27 SCHEDULER_API_ENABLED = True28 SECRET_KEY = 'secret!'29 threaded=True30 31scheduler = APScheduler()32# initialize the output frame and a lock used to ensure thread-safe33# exchanges of the output frames (useful for multiple browsers/tabs34# are viewing tthe stream)35outputFrame = None36lock = threading.Lock()37app = Flask(__name__)38app.config.from_object(Config())39CORS(app)40socketio = SocketIO(app)41# initialize the video stream and allow the camera sensor to42# warmup43#vs = VideoStream(usePiCamera=1).start()44vs = VideoStream(src=0).start()45# Create a dictionary called devices to store the device number, name, and device state:46devices = {47 'one' : {'floor' : 'fl1','position' : 'garage','object' : 'door', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/garage/door/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},48 'two' : {'floor' : 'fl1','position' : 'garage','object' : 'door', 'cmd' : '02', 'state' : 'False', 'type' : 'latch', 'topic' : 'fl1/garage/door/02', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},49 'three' : {'floor' : 'fl1','position' : 'com','object' : 'light', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/com/light/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},50 'four' : {'floor' : 'fl1','position' : 'com','object' : 'light', 'cmd' : '02', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl1/com/light/02', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},51 'five' : {'floor' : 'fl2','position' : 'liv','object' : 'fan', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl2/liv/fan/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},52 'six' : {'floor' : 'fl2','position' : 'cen','object' : 'light', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl2/cen/light/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'},53 'seven' : {'floor' : 'fl3','position' : 'bed','object' : 'fan', 'cmd' : '01', 'state' : 'False', 'type' : 'on/off', 'topic' : 'fl3/bed/fan/01', 'temp' : '0', 'humid' : '0', 'th_enable' : 'False', 'time_enable' : 'False'}54 }55try:56 devices = load_pickle_obj('devices')57 # print(devices)58 print("devices exist")59except:60 save_pickle_obj(devices,'devices')61sensors = {62 'one' : {'name' : 'sensorName1'}63 }64jobCron = { 'startHour':'14',65 'startMinute':'00',66 'stopHour':'14',67 'stopMinute':'30'68 }69try:70 jobCron = load_pickle_obj('jobCron')71 # print(jobCron)72 print("jobCron exist")73except:74 save_pickle_obj(jobCron,'jobCron')75queryDatas = {'temp':{},76 'timeStamp':{},77 'humid':{}}78# Put the device dictionary into the template data dictionary:79templateData = {80 'devices' : devices,81 'sensors' : sensors,82 'jobCron': jobCron,83 'queryDatas':queryDatas84 }85@scheduler.task('cron', id='do_job_on', hour=jobCron['startHour'], minute=jobCron['startMinute'])86def jobOn():87 print('Job on executed')88 for device in devices:89 if devices[device]['type'] == 'on/off':90 if devices[device]['time_enable'] == 'True':91 devices[device]['state'] = 'True'92 mqtt_client.publish(devices[device]['topic'],"on")93 print(device+' '+devices[device]['time_enable'])94 try:95 save_pickle_obj(devices,'devices')96 except:97 print("error")98@scheduler.task('cron', id='do_job_off', hour=jobCron['stopHour'], minute=jobCron['stopMinute'])99def jobOff():100 print('Job off executed')101 for device in devices:102 if devices[device]['type'] == 'on/off':103 if devices[device]['time_enable'] == 'True':104 devices[device]['state'] = 'False'105 mqtt_client.publish(devices[device]['topic'],"off")106 print(device+' '+devices[device]['time_enable'])107 try:108 save_pickle_obj(devices,'devices')109 except:110 print("error")111@scheduler.task('cron', id='do_job_DB', minute='*')112def jobDB():113 print('Job DB executed')114 for device in devices:115 if devices[device]['th_enable'] == 'True':116 logDB(devices[device]['topic'],devices[device]['temp'],devices[device]['humid'],"DHT","true")117 try:118 save_pickle_obj(devices,'devices')119 except:120 print("error")121 122@scheduler.task('cron', id='do_job_ClrDB', day='*')123def jobClrDB():124 print('Job ClrDB executed')125 clearOldDB()126scheduler.init_app(app)127scheduler.start()128# The callback for when the client receives a CONNACK response from the server.129def on_connect(client, userdata, flags, rc):130 print("Connected with result code "+str(rc))131 # Subscribing in on_connect() means that if we lose the connection and132 # reconnect then subscriptions will be renewed.133 client.subscribe("fl1/#")# + single-level wild card, # multi-level wild card, 134 client.subscribe("fl2/#")135 client.subscribe("fl3/#")136 client.subscribe("#")137 138def whx(ownval,owndicts):139 for x in owndicts:140 if (ownval) in owndicts[x].values():141 return x142 return 'none'143# The callback for when a PUBLISH message is received from the ESP8266.144def on_message(client, userdata, message):145 # socketio.emit('my variable')146 print("Received message '" + str(message.payload.decode('utf8')) + "' on topic '"+ message.topic + "' with QoS " + str(message.qos))147 if message.topic.startswith('fl'):148 topicMsgSplit = message.topic.split("/")149 slash_count = message.topic.count("/")150 if slash_count>=3:151 topicMsg = topicMsgSplit[0]+"/"+topicMsgSplit[1]+"/"+topicMsgSplit[2]+"/"+topicMsgSplit[3]152 indexName = whx(topicMsg,devices)153 devices_list = list(devices)154 n_index = devices_list.index(indexName)155 if "/temperature" in message.topic:156 devices[indexName]['th_enable'] = 'True'157 devices[indexName]['temp'] = str(message.payload.decode('utf8'))158 socketio.emit('temp' , {'data': devices[indexName]['temp'],'pos':n_index})159 #print("temperature update")160 if "/humidity" in message.topic:161 devices[indexName]['th_enable'] = 'True'162 devices[indexName]['humid'] = str(message.payload.decode('utf8'))163 socketio.emit('humid' , {'data': devices[indexName]['humid'],'pos':n_index})164 # print("humidity update") 165 if "/feedback" in message.topic:166 if "on" in str(message.payload.decode('utf8')):167 if devices[indexName]['state'] == 'False':168 devices[indexName]['state'] = 'True'169 socketio.emit('refresh' , {})170 # print("refresh")171 172 else:173 if devices[indexName]['state'] == 'True':174 devices[indexName]['state'] = 'False'175 socketio.emit('refresh' , {})176 # print("refresh")177 # print("feedback update")178 else:179 print("mqtt message.topic error")180 elif message.topic.startswith('tele'):181 #print("tasmota")182 topicMsgSplit = message.topic.split("/")#[0]:tele,[1]:name,[2]:classify183 #topicMsg = topicMsgSplit[0]+"/"+topicMsgSplit[1]+"/"+topicMsgSplit[2]184 if topicMsgSplit[2] == "SENSOR":185 #print(topicMsgSplit[1])186 topic_json=json.dumps(str(message.payload.decode('utf8')))187 print(topic_json["ENERGY"])188 189 else:190 print("unknown topic: "+message.topic)191 192 try:193 save_pickle_obj(devices,'devices')194 except:195 print("error")196 197mqtt_client = mqtt.Client()198mqtt_client.on_connect = on_connect199mqtt_client.on_message = on_message200mqtt_client.connect("127.0.0.1",1883,60)201# mqtt_client.connect("192.168.2.46",1883,60)202mqtt_client.loop_start()203@app.route("/",methods=['GET', 'POST'])204def main():205 if request.method == 'GET':206 queryDatas['temp'] = queryDB("temp")207 queryDatas['humid'] = queryDB("humid")208 queryDatas['timeStamp'] = queryDB("timeStamp")209 # print(queryDatas['timeStamp'])210 templateData = {211 'devices' : devices,212 'sensors' : sensors,213 'jobCron': jobCron,214 'queryDatas':queryDatas215 }216 try:217 save_pickle_obj(devices,'devices')218 except:219 print("error")220 # Pass the template data into the template main.html and return it to the user221 return render_template('main.html', async_mode=socketio.async_mode, **templateData)222 elif request.method == 'POST':223 return 'post method do nothing'224 else:225 return 'method error'226@app.route("/debug")227def debug():228 templateData = {229 'devices' : devices,230 'sensors' : sensors,231 'jobCron': jobCron,232 'queryDatas':queryDatas233 }234 # Pass the template data into the template main.html and return it to the user235 return render_template('debug.html', async_mode=socketio.async_mode, **templateData)236# The function below is executed when someone requests a URL with the device number and action in it:237@app.route("/<device>/<floor>/<position>/<object>/<cmd>/<ctrl>",methods = ['POST', 'GET'])238def action(device, floor, position, object, cmd, ctrl):239 # Convert the device from the URL into an integer:240 #function = function#int(function)241 # Get the device name for the device being changed:242 # deviceName = devices[function]['name']243 # If the action part of the URL is "1" execute the code indented below:244 if ctrl == "on":245 mqtt_client.publish(devices[device]['topic'],"on")246 #print('mp '+devices[device]['topic'])247 devices[device]['state'] = 'True'248 # if ctrl == "0" and object == 'door':249 if ctrl == "off":250 mqtt_client.publish(devices[device]['topic'],"off")251 #print('mp '+devices[device]['topic'])252 devices[device]['state'] = 'False'253 if ctrl == "toggle":254 if devices[device]['state'] == 'True':255 mqtt_client.publish(devices[device]['topic'],"off")256 #print('mp '+devices[device]['topic'])257 devices[device]['state'] = 'False'258 else: 259 mqtt_client.publish(devices[device]['topic'],"on")260 #print('mp '+devices[device]['topic'])261 devices[device]['state'] = 'True'262 if ctrl == "click":263 mqtt_client.publish(devices[device]['topic'],"click")264 print('click '+devices[device]['topic'])265 devices[device]['state'] = 'False'266 267 # Along with the device dictionary, put the message into the template data dictionary:268 templateData = {269 'devices' : devices,270 'sensors' : sensors,271 'jobCron': jobCron,272 'queryDatas':queryDatas273 }274 try:275 save_pickle_obj(devices,'devices')276 except:277 print("error")278 return render_template('main.html', **templateData)279@app.route("/addSched/<device>")280def addSched(device):281 print('time_enable '+devices[device]['time_enable'])282 devices[device]['time_enable'] = 'True'283 templateData = {284 'devices' : devices,285 'sensors' : sensors,286 'jobCron': jobCron,287 'queryDatas':queryDatas288 }289 try:290 save_pickle_obj(devices,'devices')291 except:292 print("error")293 return render_template('main.html', **templateData)294 295 296@app.route("/rmSched/<device>")297def rmSched(device):298 print('time_enable '+devices[device]['time_enable'])299 devices[device]['time_enable'] = 'False'300 templateData = {301 'devices' : devices,302 'sensors' : sensors,303 'jobCron': jobCron,304 'queryDatas':queryDatas305 }306 try:307 save_pickle_obj(devices,'devices')308 except:309 print("error")310 return render_template('main.html', **templateData)311@app.route('/startTime',methods = ['POST', 'GET'])312def startTime():313 if request.method == 'POST':314 print(request.form)315 result1 = str(request.form['startTime1'])316 result2 = str(request.form['startTime2'])317 for job in scheduler.get_jobs():318 print(job.id)319 320 try:321 scheduler.scheduler.reschedule_job('do_job_on', trigger='cron', hour=result1, minute=result2)322 jobCron['startHour']=result1323 jobCron['startMinute']=result2324 except:325 pass326 templateData = {327 'devices' : devices,328 'sensors' : sensors,329 'jobCron': jobCron,330 'queryDatas':queryDatas331 }332 try:333 save_pickle_obj(devices,'devices')334 save_pickle_obj(jobCron,'jobCron')335 except:336 print("error")337 return render_template('main.html', **templateData)338 339@app.route('/stopTime',methods = ['POST', 'GET'])340def stopTime():341 if request.method == 'POST':342 result1 = str(request.form['stopTime1'])343 result2 = str(request.form['stopTime2'])344 for job in scheduler.get_jobs():345 print(job.id)346 347 try:348 scheduler.scheduler.reschedule_job('do_job_off', trigger='cron', hour=result1, minute=result2)349 jobCron['stopHour']=result1350 jobCron['stopMinute']=result2351 except:352 pass353 354 templateData = {355 'devices' : devices,356 'sensors' : sensors,357 'jobCron': jobCron,358 'queryDatas':queryDatas359 }360 try:361 save_pickle_obj(devices,'devices')362 save_pickle_obj(jobCron,'jobCron')363 except:364 print("error")365 return render_template('main.html', **templateData)366@app.route("/videoStream")367def videoStream():368 # return the rendered template369 return render_template("videoStream.html")370def detect_motion(frameCount):371 # grab global references to the video stream, output frame, and372 # lock variables373 global vs, outputFrame, lock374 # initialize the motion detector and the total number of frames375 # read thus far376 md = SingleMotionDetector(accumWeight=0.1)377 total = 0378 # loop over frames from the video stream379 while True:380 # read the next frame from the video stream, resize it,381 # convert the frame to grayscale, and blur it382 frame = vs.read()383 frame = imutils.resize(frame, width=400)384 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)385 gray = cv2.GaussianBlur(gray, (7, 7), 0)386 # grab the current timestamp and draw it on the frame387 timestamp = datetime.datetime.now()388 cv2.putText(frame, timestamp.strftime("%A %d %B %Y %I:%M:%S%p"), 389 (10, frame.shape[0] - 10),cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)390 # if the total number of frames has reached a sufficient391 # number to construct a reasonable background model, then392 # continue to process the frame393 if total > frameCount:394 # detect motion in the image395 motion = md.detect(gray)396 # cehck to see if motion was found in the frame397 if motion is not None:398 # unpack the tuple and draw the box surrounding the399 # "motion area" on the output frame400 (thresh, (minX, minY, maxX, maxY)) = motion401 cv2.rectangle(frame, (minX, minY), (maxX, maxY),(0, 0, 255), 2)402 403 # update the background model and increment the total number404 # of frames read thus far405 md.update(gray)406 total += 1407 # acquire the lock, set the output frame, and release the408 # lock409 with lock:410 outputFrame = frame.copy()411 # outputFrame = gray.copy()412 413 414def generate():415 # grab global references to the output frame and lock variables416 global outputFrame, lock417 # loop over frames from the output stream418 while True:419 # wait until the lock is acquired420 with lock:421 # check if the output frame is available, otherwise skip422 # the iteration of the loop423 if outputFrame is None:424 continue425 # encode the frame in JPEG format426 (flag, encodedImage) = cv2.imencode(".jpg", outputFrame)427 # ensure the frame was successfully encoded428 if not flag:429 continue430 # yield the output frame in the byte format431 yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encodedImage) + b'\r\n')432@app.route("/video_feed")433def video_feed():434 # return the response generated along with the specific media435 # type (mime type)436 return Response(generate(),mimetype = "multipart/x-mixed-replace; boundary=frame")437 438@socketio.on('my event')439def handle_my_custom_event(json):440 #print('received json data here: ' + str(json))441 pass442# function for responses443def results():444 # build a request object445 req = request.get_json(silent=True, force=True)446 print("Request:")447 print(json.dumps(req, indent=4))448 # print(req,flush=True)449 # fetch fulfillmentText from json450 fulfillmentText = req.get('queryResult').get('fulfillmentText')451 # print(req,flush=True)452 # print(fulfillmentText, flush=True)453 mqtt_client.publish("fl1/garage/door/02","slide")454 # return a fulfillment response455 #return {'fulfillmentText': 'This is a response from webhook.'}456 return 'results'457@app.route('/webhook',methods=['GET','POST'])458def webhook():459 if request.method == 'POST':460 return results()461 else:462 return 'Hello'463if __name__ == "__main__":464 t = threading.Thread(target=detect_motion, args=(64,))465 t.daemon = True466 t.start()467 socketio.run(app, host='0.0.0.0', port=80, debug=True, use_reloader=False)468 469# release the video stream pointer...
path.py
Source:path.py
...158 List of devices ordered by coordinates159 """160 return sorted(self.devices, key=lambda dev: dev.md.z)161 @property162 def blocking_devices(self):163 """164 A list of devices that are currently inserted or are in unknown165 positions. This includes devices downstream of the first166 :attr:`.impediment`167 """168 # Cache important prior devices169 prior = None170 last_branches = list()171 block = list()172 for device in self.path:173 # If we have switched beamlines174 if prior and device.md.beamline != prior.md.beamline:175 # Find improperly configured optics176 for optic in last_branches:177 # If this optic is responsible for delivering beam178 # to this hutch and it is not configured to do so.179 # Mark it as blocking180 if device.md.beamline in optic.branches:181 if device.md.beamline not in optic.destination:182 block.append(optic)183 # Otherwise ensure it is removed from the beamline184 elif optic.md.beamline not in optic.destination:185 block.append(optic)186 # Clear optics that have been evaluated187 last_branches.clear()188 # If our last device was an optic, make sure it wasn't required189 # to continue along this beampath190 elif (prior in last_branches191 and device.md.beamline in prior.branches192 and device.md.beamline not in prior.destination):193 block.append(last_branches.pop(-1))194 # Find branching devices and store195 # They will be marked as blocking by downstream devices196 dev_state = find_device_state(device)197 if device in self.branches:198 last_branches.append(device)199 # Find inserted devices200 elif dev_state == DeviceState.Inserted:201 # Ignore devices with low enough transmssion202 trans = getattr(device, 'transmission', 1)203 if trans < self.minimum_transmission:204 block.append(device)205 # Find unknown and faulted devices206 elif dev_state != DeviceState.Removed:207 block.append(device)208 # Stache our prior device209 prior = device210 return block211 @property212 def incident_devices(self):213 """214 A list of devices the beam is currently incident on. This includes the215 current :attr:`.impediment` and any upstream devices that may be216 inserted but have more transmission than :attr:`.minimum_transmission`217 """218 # Find device information219 inserted = [d for d in self.path220 if find_device_state(d) == DeviceState.Inserted]221 impediment = self.impediment222 # No blocking devices, all inserted devices incident223 if not impediment:224 return inserted225 # Otherwise only return upstream of the impediment226 return [d for d in inserted if d.md.z <= impediment.md.z]227 def show_devices(self, file=None):228 """229 Print a table of the devices along the beamline230 Parameters231 ----------232 file : file-like object233 File to writable234 """235 # Initialize Table236 pt = PrettyTable(['Name', 'Prefix', 'Position', 'Beamline', 'State'])237 # Adjust Table settings238 pt.align = 'r'239 pt.align['Name'] = 'l'240 pt.align['Prefix'] = 'l'241 pt.float_format = '8.5'...
vscsi_util.py
Source:vscsi_util.py
...136 devname = sg137 scsi_id = _vscsi_get_scsiid(sg)138 devices.append([hctl, devname, sg, scsi_id])139 return devices140def vscsi_get_scsidevices(mask="*"):141 """ get all scsi devices information """142 devices = _vscsi_get_scsidevices_by_lsscsi("[%s]" % mask)143 if devices or (len(mask) and mask[0] != "*"):144 # devices found or partial device scan145 return devices146 return _vscsi_get_scsidevices_by_sysfs()147def vscsi_get_hctl_and_devname_by(target, scsi_devices = None):148 if target.startswith('/dev/'):149 target = os.path.realpath(target)150 if scsi_devices is None:151 if len(target.split(':')) == 4:152 scsi_devices = _vscsi_get_scsidevices_by_lsscsi(target)153 elif target.startswith('/dev/'): 154 scsi_devices = _vscsi_get_scsidevices_by_lsscsi("| grep %s" % target)155 else:156 scsi_devices = _vscsi_get_scsidevices_by_lsscsi("")157 if not scsi_devices:158 scsi_devices = _vscsi_get_scsidevices_by_sysfs()159 if len(target.split(':')) == 4:160 return _vscsi_get_devname_by(target, scsi_devices)161 else:162 return _vscsi_get_hctl_by(target, scsi_devices)163def get_scsi_vendor(pHCTL):164 try:165 sysfs_mnt = utils.find_sysfs_mount() 166 sysfs_scsi_dev_path = \167 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)168 scsi_vendor = \169 os.popen('cat ' + sysfs_scsi_dev_path + \170 SYSFS_SCSI_DEV_VENDOR_PATH).read()171 return scsi_vendor.splitlines()[0]172 except:173 return None174def get_scsi_model(pHCTL):175 try:176 sysfs_mnt = utils.find_sysfs_mount() 177 sysfs_scsi_dev_path = \178 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)179 scsi_model = \180 os.popen('cat ' + sysfs_scsi_dev_path + \181 SYSFS_SCSI_DEV_MODEL_PATH).read()182 return scsi_model.splitlines()[0]183 except:184 return None185def get_scsi_typeid(pHCTL):186 try:187 sysfs_mnt = utils.find_sysfs_mount() 188 sysfs_scsi_dev_path = \189 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)190 scsi_typeid = \191 os.popen('cat ' + sysfs_scsi_dev_path + \192 SYSFS_SCSI_DEV_TYPEID_PATH).read()193 return int(scsi_typeid.splitlines()[0])194 except:195 return None196def get_scsi_revision(pHCTL):197 try:198 sysfs_mnt = utils.find_sysfs_mount() 199 sysfs_scsi_dev_path = \200 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)201 scsi_revision = \202 os.popen('cat ' + sysfs_scsi_dev_path + \203 SYSFS_SCSI_DEV_REVISION_PATH).read()204 return scsi_revision.splitlines()[0]205 except:206 return None207def get_scsi_scsilevel(pHCTL):208 try:209 sysfs_mnt = utils.find_sysfs_mount() 210 sysfs_scsi_dev_path = \211 os.path.join(sysfs_mnt + SYSFS_SCSI_PATH, pHCTL)212 scsi_scsilevel = \213 os.popen('cat ' + sysfs_scsi_dev_path + \214 SYSFS_SCSI_DEV_SCSILEVEL_PATH).read()215 return int(scsi_scsilevel.splitlines()[0])216 except:217 return None218def _make_scsi_record(scsi_info):219 scsi_rec = {220 'physical_HCTL': scsi_info[0],221 'dev_name': None,222 'sg_name': scsi_info[2],223 'scsi_id': None224 }225 if scsi_info[1] is not None:226 scsi_rec['dev_name'] = scsi_info[1] 227 if scsi_info[3] is not None:228 scsi_rec['scsi_id'] = scsi_info[3] 229 scsi_rec['vendor_name'] = \230 get_scsi_vendor(scsi_rec['physical_HCTL'])231 scsi_rec['model'] = \232 get_scsi_model(scsi_rec['physical_HCTL'])233 scsi_rec['type_id'] = \234 get_scsi_typeid(scsi_rec['physical_HCTL'])235 scsi_rec['revision'] = \236 get_scsi_revision(scsi_rec['physical_HCTL'])237 scsi_rec['scsi_level'] = \238 get_scsi_scsilevel(scsi_rec['physical_HCTL'])239 try:240 lsscsi_info = os.popen('lsscsi %s 2>/dev/null' % scsi_rec['physical_HCTL']).read().split()241 scsi_rec['type'] = lsscsi_info[1]242 except:243 scsi_rec['type'] = None244 return scsi_rec245def get_scsi_device(pHCTL):246 scsis_info = _vscsi_get_scsidevices_by_lsscsi(pHCTL)247 if not scsis_info:248 scsis_info = _vscsi_get_scsidevices_by_sysfs()249 for scsi_info in scsis_info:250 if scsi_info[0] == pHCTL:251 return _make_scsi_record(scsi_info)252 return None253def get_all_scsi_devices(mask="*"):254 scsi_records = []255 for scsi_info in vscsi_get_scsidevices(mask):256 scsi_record = _make_scsi_record(scsi_info)257 scsi_records.append(scsi_record)...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!