Best Python code snippet using robotframework
cli.py
Source:cli.py
...840 # final location in the project841 f_rel_in_proj = remap_filepath(f_rel)842 proj_base_b = os.path.dirname(f_rel_in_proj).encode("utf-8")843 from bam.blend import blendfile_pack_restore844 blendfile_pack_restore.blendfile_remap(845 f_abs.encode('utf-8'),846 dir_remap.encode('utf-8'),847 deps_remap_cb=remap_cb,848 deps_remap_cb_userdata=proj_base_b,849 )850 return f_abs_remap851 for f_rel, f_abs in list(paths_modified.items()):852 if f_abs.endswith(".blend"):853 f_abs_remap = remap_file(f_rel, f_abs)854 if os.path.exists(f_abs_remap):855 paths_modified[f_rel] = f_abs_remap856 for f_rel, f_abs in list(paths_add.items()):857 if f_abs.endswith(".blend"):858 f_abs_remap = remap_file(f_rel, f_abs)859 if os.path.exists(f_abs_remap):860 paths_add[f_rel] = f_abs_remap861 """862 deps = deps_remap.get(f_rel)863 if deps:864 # ----865 # remap!866 f_abs_remap = os.path.join(basedir_temp, f_rel)867 dir_remap = os.path.dirname(f_abs_remap)868 os.makedirs(dir_remap, exist_ok=True)869 import blendfile_pack_restore870 blendfile_pack_restore.blendfile_remap(871 f_abs.encode('utf-8'),872 dir_remap.encode('utf-8'),873 deps,874 )875 if os.path.exists(f_abs_remap):876 f_abs = f_abs_remap877 paths_modified[f_rel] = f_abs878 """879 # -------------------------880 print("Now make a zipfile")881 import zipfile882 temp_zip = os.path.join(session_rootdir, ".bam_tmp.zip")883 with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zip_handle:884 for paths_dict, op in ((paths_modified, 'M'), (paths_add, 'A')):885 for (f_rel, f_abs) in paths_dict.items():886 print(" packing (%s): %r" % (op, f_abs))887 zip_handle.write(f_abs, arcname=f_rel)888 # make a paths remap that only includes modified files889 # TODO(cam), from 'packer.py'890 paths_remap_subset = {891 f_rel: f_rel_in_proj892 for f_rel, f_rel_in_proj in paths_remap.items() if f_rel in paths_modified}893 paths_remap_subset.update({894 f_rel: remap_filepath(f_rel)895 for f_rel in paths_add})896 # paths_remap_subset.update(paths_remap_subset_add)897 write_json_to_zip(zip_handle, ".bam_paths_remap.json", paths_remap_subset)898 # build a list of path manipulation operations899 paths_ops = {}900 # paths_remove ...901 for f_rel, f_abs in paths_remove.items():902 # TODO903 f_abs_remote = paths_remap[f_rel]904 paths_ops[f_abs_remote] = 'D'905 write_json_to_zip(zip_handle, ".bam_paths_ops.json", paths_ops)906 log.debug(paths_ops)907 # --------------908 # Commit Request909 payload = {910 "command": "commit",911 "arguments": json.dumps({912 'message': message,913 }),914 }915 files = {916 "file": open(temp_zip, 'rb'),917 }918 with files["file"]:919 r = requests.put(920 bam_session.request_url("file"),921 params=payload,922 auth=(cfg["user"], cfg["password"]),923 files=files)924 os.remove(temp_zip)925 try:926 r_json = r.json()927 print(r_json.get("message", "<empty>"))928 except Exception:929 print(r.text)930 # TODO, handle error cases931 ok = True932 if ok:933 # ----------934 # paths_uuid935 paths_uuid.update(paths_uuid_update)936 write_json_to_file(os.path.join(session_rootdir, ".bam_paths_uuid.json"), paths_uuid_update)937 # -----------938 # paths_remap939 paths_remap.update(paths_remap_subset)940 for k in paths_remove:941 del paths_remap[k]942 write_json_to_file(os.path.join(session_rootdir, ".bam_paths_remap.json"), paths_remap)943 del write_json_to_file944 # ------------------945 # Update Local Cache946 #947 # We now have 'pristine' files in basedir_temp, the commit went fine.948 # So move these into local cache AND we have to remake the binary_edit data.949 # since files were modified, if we don't do this - we wont be able to revert or avoid950 # re-downloading the files later.951 binary_edits_all_update = {}952 binary_edits_all_remove = set()953 for paths_dict, op in ((paths_modified, 'M'), (paths_add, 'A')):954 for f_rel, f_abs in paths_dict.items():955 print(" caching (%s): %r" % (op, f_abs))956 f_dst_abs = os.path.join(cachedir, f_rel)957 os.makedirs(os.path.dirname(f_dst_abs), exist_ok=True)958 if f_abs.startswith(basedir_temp):959 os.rename(f_abs, f_dst_abs)960 else:961 import shutil962 shutil.copyfile(f_abs, f_dst_abs)963 del shutil964 binary_edits = binary_edits_all_update[f_rel.encode('utf-8')] = []965 # update binary_edits966 if f_rel.endswith(".blend"):967 bam_session.binary_edits_update_single(968 f_dst_abs,969 binary_edits,970 remap_filepath_cb=remap_filepath_bytes,971 )972 for f_rel, f_abs in paths_remove.items():973 binary_edits_all_remove.add(f_rel)974 paths_edit_abs = os.path.join(session_rootdir, ".bam_paths_edit.data")975 if binary_edits_all_update or binary_edits_all_remove:976 if os.path.exists(paths_edit_abs):977 with open(paths_edit_abs, 'rb') as fh:978 import pickle979 binary_edits_all = pickle.load(fh)980 del pickle981 else:982 binary_edits_all = {}983 if binary_edits_all_remove and binary_edits_all:984 for f_rel in binary_edits_all_remove:985 if f_rel in binary_edits_all:986 try:987 del binary_edits_all[f_rel]988 except KeyError:989 pass990 if binary_edits_all_update:991 binary_edits_all.update(binary_edits_all_update)992 import pickle993 with open(paths_edit_abs, 'wb') as fh:994 print()995 pickle.dump(binary_edits_all, fh, pickle.HIGHEST_PROTOCOL)996 del binary_edits_all997 del paths_edit_abs998 del pickle999 # ------------------------------1000 # Cleanup temp dir to finish off1001 if os.path.exists(basedir_temp):1002 import shutil1003 shutil.rmtree(basedir_temp)1004 del shutil1005 @staticmethod1006 def status(paths, use_json=False):1007 # TODO(cam) multiple paths1008 path = paths[0]1009 del paths1010 session_rootdir = bam_config.find_sessiondir(path, abort=True)1011 paths_add, paths_remove, paths_modified = bam_session.status(session_rootdir)1012 if not use_json:1013 for f in sorted(paths_add):1014 print(" A: %s" % f)1015 for f in sorted(paths_modified):1016 print(" M: %s" % f)1017 for f in sorted(paths_remove):1018 print(" D: %s" % f)1019 else:1020 ret = []1021 for f in sorted(paths_add):1022 ret.append(("A", f))1023 for f in sorted(paths_modified):1024 ret.append(("M", f))1025 for f in sorted(paths_remove):1026 ret.append(("D", f))1027 print(json.dumps(ret))1028 @staticmethod1029 def list_dir(paths, use_full=False, use_json=False):1030 import requests1031 # Load project configuration1032 cfg = bam_config.load(abort=True)1033 # TODO(cam) multiple paths1034 path = paths[0]1035 del paths1036 payload = {1037 "path": path,1038 }1039 r = requests.get(1040 bam_session.request_url("file_list"),1041 params=payload,1042 auth=(cfg['user'], cfg['password']),1043 stream=True,1044 )1045 r_json = r.json()1046 items = r_json.get("items_list")1047 if items is None:1048 fatal(r_json.get("message", "<empty>"))1049 items.sort()1050 if use_json:1051 ret = []1052 for (name_short, name_full, file_type) in items:1053 ret.append((name_short, file_type))1054 print(json.dumps(ret))1055 else:1056 def strip_dot_slash(f):1057 return f[2:] if f.startswith("./") else f1058 for (name_short, name_full, file_type) in items:1059 if file_type == "dir":1060 print(" %s/" % (strip_dot_slash(name_full) if use_full else name_short))1061 for (name_short, name_full, file_type) in items:1062 if file_type != "dir":1063 print(" %s" % (strip_dot_slash(name_full) if use_full else name_short))1064 @staticmethod1065 def deps(paths, recursive=False, use_json=False):1066 def deps_path_walker():1067 from bam.blend import blendfile_path_walker1068 for blendfile_src in paths:1069 blendfile_src = blendfile_src.encode('utf-8')1070 yield from blendfile_path_walker.FilePath.visit_from_blend(1071 blendfile_src,1072 readonly=True,1073 recursive=recursive,1074 )1075 def status_walker():1076 for fp, (rootdir, fp_blend_basename) in deps_path_walker():1077 f_rel = fp.filepath1078 f_abs = fp.filepath_absolute1079 yield (1080 # blendfile-src1081 os.path.join(fp.basedir, fp_blend_basename).decode('utf-8'),1082 # fillepath-dst1083 f_rel.decode('utf-8'),1084 f_abs.decode('utf-8'),1085 # filepath-status1086 "OK" if os.path.exists(f_abs) else "MISSING FILE",1087 )1088 if use_json:1089 is_first = True1090 # print in parts, so we don't block the output1091 print("[")1092 for f_src, f_dst, f_dst_abs, f_status in status_walker():1093 if is_first:1094 is_first = False1095 else:1096 print(",")1097 print(json.dumps((f_src, f_dst, f_dst_abs, f_status)), end="")1098 print("]")1099 else:1100 for f_src, f_dst, f_dst_abs, f_status in status_walker():1101 print(" %r -> (%r = %r) %s" % (f_src, f_dst, f_dst_abs, f_status))1102 @staticmethod1103 def pack(1104 paths,1105 output,1106 mode,1107 repository_base_path=None,1108 all_deps=False,1109 use_quiet=False,1110 warn_remap_externals=False,1111 compress_level=-1,1112 filename_filter=None,1113 ):1114 # Local packing (don't use any project/session stuff)1115 from .blend import blendfile_pack1116 # TODO(cam) multiple paths1117 path = paths[0]1118 del paths1119 if output is None:1120 fatal("Output path must be given when packing with: --mode=FILE")1121 if os.path.isdir(output):1122 if mode == "ZIP":1123 output = os.path.join(output, os.path.splitext(path)[0] + ".zip")1124 else: # FILE1125 output = os.path.join(output, os.path.basename(path))1126 if use_quiet:1127 report = lambda msg: None1128 else:1129 report = lambda msg: print(msg, end="")1130 if repository_base_path is not None:1131 repository_base_path = repository_base_path.encode('utf-8')1132 # replace var with a pattern matching callback1133 filename_filter_cb = blendfile_pack.exclusion_filter(filename_filter)1134 for msg in blendfile_pack.pack(1135 path.encode('utf-8'),1136 output.encode('utf-8'),1137 mode=mode,1138 all_deps=all_deps,1139 repository_base_path=repository_base_path,1140 compress_level=compress_level,1141 report=report,1142 warn_remap_externals=warn_remap_externals,1143 use_variations=True,1144 filename_filter=filename_filter_cb,1145 ):1146 pass1147 @staticmethod1148 def copy(1149 paths,1150 output,1151 base,1152 all_deps=False,1153 use_quiet=False,1154 filename_filter=None,1155 ):1156 # Local packing (don't use any project/session stuff)1157 from .blend import blendfile_copy1158 from bam.utils.system import is_subdir1159 paths = [os.path.abspath(path) for path in paths]1160 base = os.path.abspath(base)1161 output = os.path.abspath(output)1162 # check all blends are in the base path1163 for path in paths:1164 if not is_subdir(path, base):1165 fatal("Input blend file %r is not a sub directory of %r" % (path, base))1166 if use_quiet:1167 report = lambda msg: None1168 else:1169 report = lambda msg: print(msg, end="")1170 # replace var with a pattern matching callback1171 if filename_filter:1172 # convert string into regex callback1173 # "*.txt;*.png;*.rst" --> r".*\.txt$|.*\.png$|.*\.rst$"1174 import re1175 import fnmatch1176 compiled_pattern = re.compile(1177 b'|'.join(fnmatch.translate(f).encode('utf-8')1178 for f in filename_filter.split(";") if f),1179 re.IGNORECASE,1180 )1181 def filename_filter(f):1182 return (not filename_filter.compiled_pattern.match(f))1183 filename_filter.compiled_pattern = compiled_pattern1184 del compiled_pattern1185 del re, fnmatch1186 for msg in blendfile_copy.copy_paths(1187 [path.encode('utf-8') for path in paths],1188 output.encode('utf-8'),1189 base.encode('utf-8'),1190 all_deps=all_deps,1191 report=report,1192 filename_filter=filename_filter,1193 ):1194 pass1195 @staticmethod1196 def remap_start(1197 paths,1198 use_json=False,1199 ):1200 filepath_remap = "bam_remap.data"1201 for p in paths:1202 if not os.path.exists(p):1203 fatal("Path %r not found!" % p)1204 paths = [p.encode('utf-8') for p in paths]1205 if os.path.exists(filepath_remap):1206 fatal("Remap in progress, run with 'finish' or remove %r" % filepath_remap)1207 from bam.blend import blendfile_path_remap1208 remap_data = blendfile_path_remap.start(1209 paths,1210 use_json=use_json,1211 )1212 with open(filepath_remap, 'wb') as fh:1213 import pickle1214 pickle.dump(remap_data, fh, pickle.HIGHEST_PROTOCOL)1215 del pickle1216 @staticmethod1217 def remap_finish(1218 paths,1219 force_relative=False,1220 dry_run=False,1221 use_json=False,1222 ):1223 filepath_remap = "bam_remap.data"1224 for p in paths:1225 if not os.path.exists(p):1226 fatal("Path %r not found!" % p)1227 # bytes needed for blendfile_path_remap API1228 paths = [p.encode('utf-8') for p in paths]1229 if not os.path.exists(filepath_remap):1230 fatal("Remap not started, run with 'start', (%r not found)" % filepath_remap)1231 with open(filepath_remap, 'rb') as fh:1232 import pickle1233 remap_data = pickle.load(fh)1234 del pickle1235 from bam.blend import blendfile_path_remap1236 blendfile_path_remap.finish(1237 paths, remap_data,1238 force_relative=force_relative,1239 dry_run=dry_run,1240 use_json=use_json,1241 )1242 if not dry_run:1243 os.remove(filepath_remap)1244 @staticmethod1245 def remap_reset(1246 use_json=False,1247 ):1248 filepath_remap = "bam_remap.data"1249 if os.path.exists(filepath_remap):1250 os.remove(filepath_remap)1251 else:1252 fatal("remapping not started, nothing to do!")1253# -----------------------------------------------------------------------------1254# Argument Parser1255def init_argparse_common(1256 subparse,1257 use_json=False,1258 use_all_deps=False,1259 use_quiet=False,1260 use_compress_level=False,1261 use_exclude=False,1262 ):1263 import argparse1264 if use_json:1265 subparse.add_argument(1266 "-j", "--json", dest="json", action='store_true',1267 help="Generate JSON output",1268 )1269 if use_all_deps:1270 subparse.add_argument(1271 "-a", "--all-deps", dest="all_deps", action='store_true',1272 help="Follow all dependencies (unused indirect dependencies too)",1273 )1274 if use_quiet:1275 subparse.add_argument(1276 "-q", "--quiet", dest="use_quiet", action='store_true',1277 help="Suppress status output",1278 )1279 if use_compress_level:1280 class ChoiceToZlibLevel(argparse.Action):1281 def __call__(self, parser, namespace, value, option_string=None):1282 setattr(namespace, self.dest, {"default": -1, "fast": 1, "best": 9, "store": 0}[value[0]])1283 subparse.add_argument(1284 "-c", "--compress", dest="compress_level", nargs=1, default=-1, metavar='LEVEL',1285 action=ChoiceToZlibLevel,1286 choices=('default', 'fast', 'best', 'store'),1287 help="Compression level for resulting archive",1288 )1289 if use_exclude:1290 subparse.add_argument(1291 "-e", "--exclude", dest="exclude", metavar='PATTERN(S)', required=False,1292 default="",1293 help="""1294 Optionally exclude files from the pack.1295 Using Unix shell-style wildcards *(case insensitive)*.1296 ``--exclude="*.png"``1297 Multiple patterns can be passed using the ``;`` separator.1298 ``--exclude="*.txt;*.avi;*.wav"``1299 """1300 )1301def create_argparse_init(subparsers):1302 subparse = subparsers.add_parser("init",1303 help="Initialize a new project directory")1304 subparse.add_argument(1305 dest="url",1306 help="Project repository url",1307 )1308 subparse.add_argument(1309 dest="directory_name", nargs="?",1310 help="Directory name",1311 )1312 subparse.set_defaults(1313 func=lambda args:1314 bam_commands.init(args.url, args.directory_name),1315 )1316def create_argparse_create(subparsers):1317 subparse = subparsers.add_parser(1318 "create", aliases=("cr",),1319 help="Create a new empty session directory",1320 )1321 subparse.add_argument(1322 dest="session_name", nargs=1,1323 help="Name of session directory",1324 )1325 subparse.set_defaults(1326 func=lambda args:1327 bam_commands.create(args.session_name[0]),1328 )1329def create_argparse_checkout(subparsers):1330 subparse = subparsers.add_parser(1331 "checkout", aliases=("co",),1332 help="Checkout a remote path in an existing project",1333 )1334 subparse.add_argument(1335 dest="path", type=str, metavar='REMOTE_PATH',1336 help="Path to checkout on the server",1337 )1338 subparse.add_argument(1339 "-o", "--output", dest="output", type=str, metavar='DIRNAME',1340 help="Local name to checkout the session into (optional, falls back to path name)",1341 )1342 init_argparse_common(subparse, use_all_deps=True)1343 subparse.set_defaults(1344 func=lambda args:1345 bam_commands.checkout(args.path, args.output, args.all_deps),1346 )1347def create_argparse_update(subparsers):1348 subparse = subparsers.add_parser(1349 "update", aliases=("up",),1350 help="Update a local session with changes from the remote project",1351 )1352 subparse.add_argument(1353 dest="paths", nargs="*",1354 help="Path(s) to operate on",1355 )1356 subparse.set_defaults(1357 func=lambda args:1358 bam_commands.update(args.paths or ["."]),1359 )1360def create_argparse_revert(subparsers):1361 subparse = subparsers.add_parser(1362 "revert", aliases=("rv",),1363 help="Reset local changes back to the state at time of checkout",1364 )1365 subparse.add_argument(1366 dest="paths", nargs="+",1367 help="Path(s) to operate on",1368 )1369 subparse.set_defaults(1370 func=lambda args:1371 bam_commands.revert(args.paths or ["."]),1372 )1373def create_argparse_commit(subparsers):1374 subparse = subparsers.add_parser(1375 "commit", aliases=("ci",),1376 help="Commit changes from a session to the remote project",1377 )1378 subparse.add_argument(1379 "-m", "--message", dest="message", metavar='MESSAGE',1380 required=True,1381 help="Commit message",1382 )1383 subparse.add_argument(1384 dest="paths", nargs="*",1385 help="paths to commit",1386 )1387 subparse.set_defaults(1388 func=lambda args:1389 bam_commands.commit(args.paths or ["."], args.message),1390 )1391def create_argparse_status(subparsers):1392 subparse = subparsers.add_parser(1393 "status", aliases=("st",),1394 help="Show any edits made in the local session",1395 )1396 subparse.add_argument(1397 dest="paths", nargs="*",1398 help="Path(s) to operate on",1399 )1400 init_argparse_common(subparse, use_json=True)1401 subparse.set_defaults(1402 func=lambda args:1403 bam_commands.status(args.paths or ["."], use_json=args.json),1404 )1405def create_argparse_list(subparsers):1406 subparse = subparsers.add_parser(1407 "list", aliases=("ls",),1408 help="List the contents of a remote directory",1409 )1410 subparse.add_argument(1411 dest="paths", nargs="*",1412 help="Path(s) to operate on",1413 )1414 subparse.add_argument(1415 "-f", "--full", dest="full", action='store_true',1416 help="Show the full paths",1417 )1418 init_argparse_common(subparse, use_json=True)1419 subparse.set_defaults(1420 func=lambda args:1421 bam_commands.list_dir(1422 args.paths or ["."],1423 use_full=args.full,1424 use_json=args.json,1425 ),1426 )1427def create_argparse_deps(subparsers):1428 subparse = subparsers.add_parser(1429 "deps", aliases=("dp",),1430 help="List dependencies for file(s)",1431 )1432 subparse.add_argument(1433 dest="paths", nargs="+",1434 help="Path(s) to operate on",1435 )1436 subparse.add_argument(1437 "-r", "--recursive", dest="recursive", action='store_true',1438 help="Scan dependencies recursively",1439 )1440 init_argparse_common(subparse, use_json=True)1441 subparse.set_defaults(1442 func=lambda args:1443 bam_commands.deps(1444 args.paths, args.recursive,1445 use_json=args.json),1446 )1447def create_argparse_pack(subparsers):1448 import argparse1449 subparse = subparsers.add_parser(1450 "pack", aliases=("pk",),1451 help="Pack a blend file and its dependencies into an archive",1452 description=1453 """1454 You can simply pack a blend file like this to create a zip-file of the same name.1455 .. code-block:: sh1456 bam pack /path/to/scene.blend1457 You may also want to give an explicit output directory.1458 This command is used for packing a ``.blend`` file into a ``.zip`` file for redistribution.1459 .. code-block:: sh1460 # pack a blend with maximum compression for online downloads1461 bam pack /path/to/scene.blend --output my_scene.zip --compress=best1462 You may also pack a .blend while keeping your whole repository hierarchy by passing1463 the path to the top directory of the repository, and ask to be warned about dependencies paths1464 outside of that base path:1465 .. code-block:: sh1466 bam pack --repo="/path/to/repo" --warn-external /path/to/repo/path/to/scene.blend1467 """,1468 formatter_class=argparse.RawDescriptionHelpFormatter,1469 )1470 subparse.add_argument(1471 dest="paths", nargs="+",1472 help="Path(s) to operate on",1473 )1474 subparse.add_argument(1475 "-o", "--output", dest="output", metavar='FILE', required=False,1476 help="Output file or a directory when multiple inputs are passed",1477 )1478 subparse.add_argument(1479 "-m", "--mode", dest="mode", metavar='MODE', required=False,1480 default='ZIP',1481 choices=('ZIP', 'FILE'),1482 help="Output file or a directory when multiple inputs are passed",1483 )1484 subparse.add_argument(1485 "--repo", dest="repository_base_path", metavar='DIR', required=False,1486 help="Base directory from which you want to keep existing hierarchy (usually to repository directory),"1487 "will default to packed blend file's directory if not specified",1488 )1489 subparse.add_argument(1490 "--warn-external", dest="warn_remap_externals", action='store_true',1491 help="Warn for every dependency outside of given repository base path",1492 )1493 init_argparse_common(subparse, use_all_deps=True, use_quiet=True, use_compress_level=True, use_exclude=True)1494 subparse.set_defaults(1495 func=lambda args:1496 bam_commands.pack(1497 args.paths,1498 args.output or1499 ((os.path.splitext(args.paths[0])[0] + ".zip")1500 if args.mode == 'ZIP' else None),1501 args.mode,1502 repository_base_path=args.repository_base_path or None,1503 all_deps=args.all_deps,1504 use_quiet=args.use_quiet,1505 warn_remap_externals=args.warn_remap_externals,1506 compress_level=args.compress_level,1507 filename_filter=args.exclude,1508 ),1509 )1510def create_argparse_copy(subparsers):1511 import argparse1512 subparse = subparsers.add_parser(1513 "copy", aliases=("cp",),1514 help="Copy blend file(s) and their dependencies to a new location (maintaining the directory structure).",1515 description=1516 """1517 The line below will copy ``scene.blend`` to ``/destination/to/scene.blend``.1518 .. code-block:: sh1519 bam copy /path/to/scene.blend --base=/path --output=/destination1520 .. code-block:: sh1521 # you can also copy multiple files1522 bam copy /path/to/scene.blend /path/other/file.blend --base=/path --output /other/destination1523 """,1524 formatter_class=argparse.RawDescriptionHelpFormatter,1525 )1526 subparse.add_argument(1527 dest="paths", nargs="+",1528 help="Path(s) to blend files to operate on",1529 )1530 subparse.add_argument(1531 "-o", "--output", dest="output", metavar='DIR', required=True,1532 help="Output directory where where files will be copied to",1533 )1534 subparse.add_argument(1535 "-b", "--base", dest="base", metavar='DIR', required=True,1536 help="Base directory for input paths (files outside this path will be omitted)",1537 )1538 init_argparse_common(subparse, use_all_deps=True, use_quiet=True, use_exclude=True)1539 subparse.set_defaults(1540 func=lambda args:1541 bam_commands.copy(1542 args.paths,1543 args.output,1544 args.base,1545 all_deps=args.all_deps,1546 use_quiet=args.use_quiet,1547 filename_filter=args.exclude,1548 ),1549 )1550def create_argparse_remap(subparsers):1551 import argparse1552 subparse = subparsers.add_parser(1553 "remap",1554 help="Remap blend file paths",1555 description=1556 """1557 This command is a 3 step process:1558 - first run ``bam remap start .`` which stores the current state of your project (recursively).1559 - then re-arrange the files on the filesystem (rename, relocate).1560 - finally run ``bam remap finish`` to apply the changes, updating the ``.blend`` files internal paths.1561 .. code-block:: sh1562 cd /my/project1563 bam remap start .1564 mv photos textures1565 mv house_v14_library.blend house_libraray.blend1566 bam remap finish1567 .. note::1568 Remapping creates a file called ``bam_remap.data`` in the current directory.1569 You can relocate the entire project to a new location but on executing ``finish``,1570 this file must be accessible from the current directory.1571 .. note::1572 This command depends on files unique contents,1573 take care not to modify the files once remap is started.1574 """,1575 formatter_class=argparse.RawDescriptionHelpFormatter,1576 )1577 subparse_remap_commands = subparse.add_subparsers(1578 title="Remap commands",1579 description='valid subcommands',1580 help='additional help',1581 )1582 sub_subparse = subparse_remap_commands.add_parser(1583 "start",1584 help="Start remapping the blend files",1585 )1586 sub_subparse.add_argument(1587 dest="paths", nargs="*",1588 help="Path(s) to operate on",1589 )1590 init_argparse_common(sub_subparse, use_json=True)1591 sub_subparse.set_defaults(1592 func=lambda args:1593 bam_commands.remap_start(1594 args.paths or ["."],1595 use_json=args.json,1596 ),1597 )1598 sub_subparse = subparse_remap_commands.add_parser(1599 "finish",1600 help="Finish remapping the blend files",1601 )1602 sub_subparse.add_argument(1603 dest="paths", nargs="*",1604 help="Path(s) to operate on",1605 )1606 sub_subparse.add_argument(1607 "-r", "--force-relative", dest="force_relative", action='store_true',1608 help="Make all remapped paths relative (even if they were originally absolute)",1609 )1610 sub_subparse.add_argument(1611 "-d", "--dry-run", dest="dry_run", action='store_true',1612 help="Just print output as if the paths are being run",1613 )1614 init_argparse_common(sub_subparse, use_json=True)1615 sub_subparse.set_defaults(1616 func=lambda args:1617 bam_commands.remap_finish(1618 args.paths or ["."],1619 force_relative=args.force_relative,1620 dry_run=args.dry_run,1621 use_json=args.json,1622 ),1623 )1624 sub_subparse = subparse_remap_commands.add_parser(1625 "reset",1626 help="Cancel path remapping",1627 )1628 init_argparse_common(sub_subparse, use_json=True)1629 sub_subparse.set_defaults(1630 func=lambda args:1631 bam_commands.remap_reset(1632 use_json=args.json,1633 ),1634 )1635def create_argparse():1636 import argparse1637 usage_text = (1638 "BAM!\n" +1639 __doc__1640 )1641 parser = argparse.ArgumentParser(1642 prog="bam",1643 description=usage_text,1644 )1645 subparsers = parser.add_subparsers(1646 title='subcommands',1647 description='valid subcommands',1648 help='additional help',1649 )1650 create_argparse_init(subparsers)1651 create_argparse_create(subparsers)1652 create_argparse_checkout(subparsers)1653 create_argparse_commit(subparsers)1654 create_argparse_update(subparsers)1655 create_argparse_revert(subparsers)1656 create_argparse_status(subparsers)1657 create_argparse_list(subparsers)1658 # non-bam project commands1659 create_argparse_deps(subparsers)1660 create_argparse_pack(subparsers)1661 create_argparse_copy(subparsers)1662 create_argparse_remap(subparsers)1663 return parser1664def main(argv=None):1665 if argv is None:1666 argv = sys.argv[1:]1667 logging.basicConfig(1668 level=logging.INFO,1669 format='%(asctime)-15s %(levelname)8s %(name)s %(message)s',1670 )1671 parser = create_argparse()1672 args = parser.parse_args(argv)1673 # call subparser callback1674 if not hasattr(args, "func"):1675 parser.print_help()1676 return...
upgradeworkflow.py
Source:upgradeworkflow.py
...256 if remap is None:257 # don't add the annotation back in258 continue259 elif type(remap) != type(""):260 ops.extend(remap(annotation))261 continue262 else:263 annotation_key = remap264 new_annotation = \265 Annotation(id=controller.id_scope.getNewId(Annotation.vtType),266 key=annotation_key,267 value=annotation.value)268 new_module.add_annotation(new_annotation)269 if not old_module.is_group() and not old_module.is_abstraction():270 for port_spec in old_module.port_spec_list:271 if port_spec.type == 'input':272 if port_spec.name not in dst_port_remap:273 spec_name = port_spec.name274 else:275 remap = dst_port_remap[port_spec.name]276 if remap is None:277 continue278 elif type(remap) != type(""):279 ops.extend(remap(port_spec))280 continue281 else:282 spec_name = remap283 elif port_spec.type == 'output':284 if port_spec.name not in src_port_remap:285 spec_name = port_spec.name286 else:287 remap = src_port_remap[port_spec.name]288 if remap is None:289 continue290 elif type(remap) != type(""):291 ops.extend(remap(port_spec))292 continue293 else:294 spec_name = remap 295 new_spec = port_spec.do_copy(True, controller.id_scope, {})296 new_spec.name = spec_name297 new_module.add_port_spec(new_spec)298 for function in old_module.functions:299 if function.name not in function_remap:300 function_name = function.name301 else:302 remap = function_remap[function.name]303 if remap is None:304 # don't add the function back in305 continue 306 elif type(remap) != type(""):307 ops.extend(remap(function, new_module))308 continue309 else:310 function_name = remap311 if len(function.parameters) > 0:312 new_param_vals, aliases = zip(*[(p.strValue, p.alias) 313 for p in function.parameters])314 else:315 new_param_vals = []316 aliases = []317 new_function = controller.create_function(new_module, 318 function_name,319 new_param_vals,320 aliases)321 new_module.add_function(new_function)322 # add the new module323 ops.append(('add', new_module))324 create_new_connection = UpgradeWorkflowHandler.create_new_connection325 for _, conn_id in pipeline.graph.edges_from(old_module.id):326 old_conn = pipeline.connections[conn_id]327 if old_conn.source.name not in src_port_remap:328 source_name = old_conn.source.name329 else:330 remap = src_port_remap[old_conn.source.name]331 if remap is None:332 # don't add this connection back in333 continue334 elif type(remap) != type(""):335 ops.extend(remap(old_conn, new_module))336 continue337 else:338 source_name = remap339 340 old_dst_module = pipeline.modules[old_conn.destination.moduleId]341 new_conn = create_new_connection(controller,342 new_module,343 source_name,344 old_dst_module,345 old_conn.destination)346 ops.append(('add', new_conn))347 348 for _, conn_id in pipeline.graph.edges_to(old_module.id):349 old_conn = pipeline.connections[conn_id]350 if old_conn.destination.name not in dst_port_remap:351 destination_name = old_conn.destination.name352 else:353 remap = dst_port_remap[old_conn.destination.name]354 if remap is None:355 # don't add this connection back in356 continue357 elif type(remap) != type(""):358 ops.extend(remap(old_conn, new_module))359 continue360 else:361 destination_name = remap362 363 old_src_module = pipeline.modules[old_conn.source.moduleId]364 new_conn = create_new_connection(controller,365 old_src_module,366 old_conn.source,367 new_module,368 destination_name)369 ops.append(('add', new_conn))370 371 return [core.db.action.create_action(ops)]372 @staticmethod373 def replace_group(controller, pipeline, module_id, new_subpipeline):374 old_group = pipeline.modules[module_id]375 new_group = controller.create_module('edu.utah.sci.vistrails.basic', 376 'Group', '', 377 old_group.location.x, 378 old_group.location.y)379 new_group.pipeline = new_subpipeline380 return UpgradeWorkflowHandler.replace_generic(controller, pipeline, 381 old_group, new_group)382 @staticmethod383 def replace_module(controller, pipeline, module_id, new_descriptor,384 function_remap={}, src_port_remap={}, dst_port_remap={},385 annotation_remap={}):386 old_module = pipeline.modules[module_id]387 internal_version = -1388 # try to determine whether new module is an abstraction389 if (hasattr(new_descriptor, 'module') and390 hasattr(new_descriptor.module, "vistrail") and 391 hasattr(new_descriptor.module, "internal_version")):392 internal_version = new_descriptor.version393 new_module = \394 controller.create_module_from_descriptor(new_descriptor,395 old_module.location.x,396 old_module.location.y,397 internal_version)398 return UpgradeWorkflowHandler.replace_generic(controller, pipeline, 399 old_module, new_module,400 function_remap, 401 src_port_remap, 402 dst_port_remap,403 annotation_remap)404 @staticmethod405 def remap_module(controller, module_id, pipeline, module_remap):406 """remap_module offers a method to shortcut the407 specification of upgrades. It is useful when just changing408 the names of ports or modules, but can also be used to add409 intermediate modules or change the format of parameters. It410 is usually called from handle_module_upgrade_request, and the411 first three arguments are passed from the arguments to that412 method.413 module_remap specifies all of the changes and is of the format414 {<old_module_name>: [(<start_version>, <end_version>, 415 <new_module_klass> | <new_module_id> | None, 416 <remap_dictionary>)]}417 where new_module_klass is the class and new_module_id418 is a string of the format 419 <package_name>:[<namespace> | ]<module_name>420 passing None keeps the original name,421 and remap_dictionary is {<remap_type>:422 <name_changes>} and <name_changes> is a map from <old_name> to423 <new_name> or <remap_function>424 The remap functions are passed the old object and the new425 module and should return a list of operations with elements of426 the form ('add', <obj>).427 For example:428 def outputName_remap(old_conn, new_module):429 ops = []430 ...431 return ops432 module_remap = {'FileSink': [(None, '1.5.1', FileSink,433 {'dst_port_remap':434 {'overrideFile': 'overwrite',435 'outputName': outputName_remap},436 'function_remap':437 {'overrideFile': 'overwrite',438 'outputName': 'outputPath'}}),439 }440 """441 reg = get_module_registry()442 old_module = pipeline.modules[module_id]...
stdglue.py
Source:stdglue.py
1# stdglue - canned prolog and epilog functions for the remappable builtin codes (T,M6,M61,S,F)2#3# we dont use argspec to avoid the generic error message of the argspec prolog and give more4# concise ones here5# cycle_prolog,cycle_epilog: generic code-independent support glue for oword sub cycles6#7# these are provided as starting point - for more concise error message you would better8# write a prolog specific for the code9#10# Usage:11#REMAP=G84.3 modalgroup=1 argspec=xyzqp prolog=cycle_prolog ngc=g843 epilog=cycle_epilog12import emccanon 13from interpreter import *14throw_exceptions = 115# REMAP=S prolog=setspeed_prolog ngc=setspeed epilog=setspeed_epilog16# exposed parameter: #<speed>17def setspeed_prolog(self,**words):18 try:19 c = self.blocks[self.remap_level]20 if not c.s_flag:21 self.set_errormsg("S requires a value") 22 return INTERP_ERROR23 self.params["speed"] = c.s_number24 except Exception,e:25 self.set_errormsg("S/setspeed_prolog: %s)" % (e))26 return INTERP_ERROR27 return INTERP_OK28def setspeed_epilog(self,**words):29 try:30 if not self.value_returned:31 r = self.blocks[self.remap_level].executing_remap32 self.set_errormsg("the %s remap procedure %s did not return a value"33 % (r.name,r.remap_ngc if r.remap_ngc else r.remap_py))34 return INTERP_ERROR35 if self.return_value < -TOLERANCE_EQUAL: # 'less than 0 within interp's precision'36 self.set_errormsg("S: remap procedure returned %f" % (self.return_value)) 37 return INTERP_ERROR38 if self.blocks[self.remap_level].builtin_used:39 pass40 #print "---------- S builtin recursion, nothing to do"41 else:42 self.speed = self.params["speed"]43 emccanon.enqueue_SET_SPINDLE_SPEED(self.speed)44 return INTERP_OK45 except Exception,e:46 self.set_errormsg("S/setspeed_epilog: %s)" % (e))47 return INTERP_ERROR48 return INTERP_OK 49# REMAP=F prolog=setfeed_prolog ngc=setfeed epilog=setfeed_epilog50# exposed parameter: #<feed>51def setfeed_prolog(self,**words):52 try:53 c = self.blocks[self.remap_level]54 if not c.f_flag:55 self.set_errormsg("F requires a value") 56 return INTERP_ERROR57 self.params["feed"] = c.f_number58 except Exception,e:59 self.set_errormsg("F/setfeed_prolog: %s)" % (e))60 return INTERP_ERROR61 return INTERP_OK 62def setfeed_epilog(self,**words):63 try:64 if not self.value_returned:65 r = self.blocks[self.remap_level].executing_remap66 self.set_errormsg("the %s remap procedure %s did not return a value"67 % (r.name,r.remap_ngc if r.remap_ngc else r.remap_py))68 return INTERP_ERROR69 if self.blocks[self.remap_level].builtin_used:70 pass71 #print "---------- F builtin recursion, nothing to do"72 else:73 self.feed_rate = self.params["feed"]74 emccanon.enqueue_SET_FEED_RATE(self.feed_rate)75 return INTERP_OK76 except Exception,e:77 self.set_errormsg("F/setfeed_epilog: %s)" % (e))78 return INTERP_ERROR79 return INTERP_OK 80# REMAP=T prolog=prepare_prolog ngc=prepare epilog=prepare_epilog81# exposed parameters: #<tool> #<pocket>82def prepare_prolog(self,**words):83 try:84 cblock = self.blocks[self.remap_level]85 if not cblock.t_flag:86 self.set_errormsg("T requires a tool number")87 return INTERP_ERROR88 tool = cblock.t_number89 if tool:90 (status, pocket) = self.find_tool_pocket(tool)91 if status != INTERP_OK:92 self.set_errormsg("T%d: pocket not found" % (tool))93 return status94 else:95 pocket = -1 # this is a T0 - tool unload96 self.params["tool"] = tool97 self.params["pocket"] = pocket98 return INTERP_OK99 except Exception, e:100 self.set_errormsg("T%d/prepare_prolog: %s" % (int(words['t']), e))101 return INTERP_ERROR102def prepare_epilog(self, **words):103 try:104 if not self.value_returned:105 r = self.blocks[self.remap_level].executing_remap106 self.set_errormsg("the %s remap procedure %s did not return a value"107 % (r.name,r.remap_ngc if r.remap_ngc else r.remap_py))108 return INTERP_ERROR109 if self.blocks[self.remap_level].builtin_used:110 #print "---------- T builtin recursion, nothing to do"111 return INTERP_OK112 else:113 if self.return_value > 0:114 self.selected_tool = int(self.params["tool"])115 self.selected_pocket = int(self.params["pocket"])116 emccanon.SELECT_POCKET(self.selected_pocket, self.selected_tool)117 return INTERP_OK118 else:119 self.set_errormsg("T%d: aborted (return code %.1f)" % (int(self.params["tool"]),self.return_value))120 return INTERP_ERROR121 except Exception, e:122 self.set_errormsg("T%d/prepare_epilog: %s" % (tool,e))123 return INTERP_ERROR 124# REMAP=M6 modalgroup=6 prolog=change_prolog ngc=change epilog=change_epilog125# exposed parameters:126# #<tool_in_spindle>127# #<selected_tool>128# #<current_pocket>129# #<selected_pocket>130def change_prolog(self, **words):131 try:132 # this is relevant only when using iocontrol-v2.133 if self.params[5600] > 0.0:134 if self.params[5601] < 0.0:135 self.set_errormsg("Toolchanger hard fault %d" % (int(self.params[5601])))136 return INTERP_ERROR137 print "change_prolog: Toolchanger soft fault %d" % int(self.params[5601])138 139 if self.selected_pocket < 0:140 self.set_errormsg("M6: no tool prepared")141 return INTERP_ERROR142 if self.cutter_comp_side:143 self.set_errormsg("Cannot change tools with cutter radius compensation on")144 return INTERP_ERROR145 self.params["tool_in_spindle"] = self.current_tool146 self.params["selected_tool"] = self.selected_tool147 self.params["current_pocket"] = self.current_pocket # this is probably nonsense148 self.params["selected_pocket"] = self.selected_pocket149 return INTERP_OK150 except Exception, e:151 self.set_errormsg("M6/change_prolog: %s" % (e))152 return INTERP_ERROR153 154def change_epilog(self, **words):155 try:156 if not self.value_returned:157 r = self.blocks[self.remap_level].executing_remap158 self.set_errormsg("the %s remap procedure %s did not return a value"159 % (r.name,r.remap_ngc if r.remap_ngc else r.remap_py))160 return INTERP_ERROR161 # this is relevant only when using iocontrol-v2.162 if self.params[5600] > 0.0:163 if self.params[5601] < 0.0:164 self.set_errormsg("Toolchanger hard fault %d" % (int(self.params[5601])))165 return INTERP_ERROR166 print "change_epilog: Toolchanger soft fault %d" % int(self.params[5601])167 if self.blocks[self.remap_level].builtin_used:168 #print "---------- M6 builtin recursion, nothing to do"169 return INTERP_OK170 else:171 if self.return_value > 0.0:172 # commit change173 self.selected_pocket = int(self.params["selected_pocket"])174 emccanon.CHANGE_TOOL(self.selected_pocket)175 self.current_pocket = self.selected_pocket176 self.selected_pocket = -1177 self.selected_tool = -1178 # cause a sync()179 self.set_tool_parameters()180 self.toolchange_flag = True181 return INTERP_EXECUTE_FINISH182 else:183 self.set_errormsg("M6 aborted (return code %.1f)" % (self.return_value))184 return INTERP_ERROR185 except Exception, e:186 self.set_errormsg("M6/change_epilog: %s" % (e))187 return INTERP_ERROR188# REMAP=M61 modalgroup=6 prolog=settool_prolog ngc=settool epilog=settool_epilog189# exposed parameters: #<tool> #<pocket>190def settool_prolog(self,**words):191 try:192 c = self.blocks[self.remap_level]193 if not c.q_flag:194 self.set_errormsg("M61 requires a Q parameter") 195 return INTERP_ERROR196 tool = int(c.q_number)197 if tool < -TOLERANCE_EQUAL: # 'less than 0 within interp's precision'198 self.set_errormsg("M61: Q value < 0") 199 return INTERP_ERROR200 (status,pocket) = self.find_tool_pocket(tool)201 if status != INTERP_OK:202 self.set_errormsg("M61 failed: requested tool %d not in table" % (tool))203 return status204 self.params["tool"] = tool205 self.params["pocket"] = pocket206 return INTERP_OK207 except Exception,e:208 self.set_errormsg("M61/settool_prolog: %s)" % (e))209 return INTERP_ERROR210def settool_epilog(self,**words):211 try:212 if not self.value_returned:213 r = self.blocks[self.remap_level].executing_remap214 self.set_errormsg("the %s remap procedure %s did not return a value"215 % (r.name,r.remap_ngc if r.remap_ngc else r.remap_py))216 return INTERP_ERROR217 if self.blocks[self.remap_level].builtin_used:218 #print "---------- M61 builtin recursion, nothing to do"219 return INTERP_OK220 else:221 if self.return_value > 0.0:222 self.current_tool = int(self.params["tool"])223 self.current_pocket = int(self.params["pocket"])224 emccanon.CHANGE_TOOL_NUMBER(self.current_pocket)225 # cause a sync()226 self.tool_change_flag = True227 self.set_tool_parameters()228 else:229 self.set_errormsg("M61 aborted (return code %.1f)" % (self.return_value))230 return INTERP_ERROR231 except Exception,e:232 self.set_errormsg("M61/settool_epilog: %s)" % (e))233 return INTERP_ERROR234# educational alternative: M61 remapped to an all-Python handler235# demo - this really does the same thing as the builtin (non-remapped) M61236#237# REMAP=M61 modalgroup=6 python=set_tool_number238def set_tool_number(self, **words):239 try:240 c = self.blocks[self.remap_level]241 if c.q_flag:242 toolno = int(c.q_number)243 else:244 self.set_errormsg("M61 requires a Q parameter")245 return status 246 (status,pocket) = self.find_tool_pocket(toolno)247 if status != INTERP_OK:248 self.set_errormsg("M61 failed: requested tool %d not in table" % (toolno))249 return status250 if words['q'] > -TOLERANCE_EQUAL: # 'greater equal 0 within interp's precision'251 self.current_pocket = pocket252 self.current_tool = toolno253 emccanon.CHANGE_TOOL_NUMBER(pocket)254 # cause a sync()255 self.tool_change_flag = True256 self.set_tool_parameters()257 return INTERP_OK258 else:259 self.set_errormsg("M61 failed: Q=%4" % (toolno))260 return INTERP_ERROR261 except Exception, e:262 self.set_errormsg("M61/set_tool_number: %s" % (e))263 return INTERP_ERROR264_uvw = ("u","v","w","a","b","c")265_xyz = ("x","y","z","a","b","c")266# given a plane, return sticky words, incompatible axis words and plane name267# sticky[0] is also the movement axis268_compat = {269 emccanon.CANON_PLANE_XY : (("z","r"),_uvw,"XY"),270 emccanon.CANON_PLANE_YZ : (("x","r"),_uvw,"YZ"),271 emccanon.CANON_PLANE_XZ : (("y","r"),_uvw,"XZ"),272 emccanon.CANON_PLANE_UV : (("w","r"),_xyz,"UV"),273 emccanon.CANON_PLANE_VW : (("u","r"),_xyz,"VW"),274 emccanon.CANON_PLANE_UW : (("v","r"),_xyz,"UW")} 275# extract and pass parameters from current block, merged with extra paramters on a continuation line276# keep tjose parameters across invocations277# export the parameters into the oword procedure278def cycle_prolog(self,**words):279 # self.sticky_params is assumed to have been initialized by the280 # init_stgdlue() method below281 global _compat282 try: 283 # determine whether this is the first or a subsequent call284 c = self.blocks[self.remap_level]285 r = c.executing_remap286 if c.g_modes[1] == r.motion_code:287 # first call - clear the sticky dict288 self.sticky_params[r.name] = dict()289 self.params["motion_code"] = c.g_modes[1]290 291 (sw,incompat,plane_name) =_compat[self.plane]292 for (word,value) in words.items():293 # inject current parameters294 self.params[word] = value295 # record sticky words296 if word in sw:297 if self.debugmask & 0x00080000: print "%s: record sticky %s = %.4f" % (r.name,word,value)298 self.sticky_params[r.name][word] = value299 if word in incompat:300 return "%s: Cannot put a %s in a canned cycle in the %s plane" % (r.name, word.upper(), plane_name)301 # inject sticky parameters which were not in words:302 for (key,value) in self.sticky_params[r.name].items():303 if not key in words:304 if self.debugmask & 0x00080000: print "%s: inject sticky %s = %.4f" % (r.name,key,value)305 self.params[key] = value306 if not "r" in self.sticky_params[r.name]:307 return "%s: cycle requires R word" % (r.name)308 else:309 if self.sticky_params[r.name] <= 0.0:310 return "%s: R word must be > 0 if used (%.4f)" % (r.name, words["r"])311 if "l" in words:312 # checked in interpreter during block parsing313 # if l <= 0 or l not near an int314 self.params["l"] = words["l"]315 316 if "p" in words:317 p = words["p"]318 if p < 0.0:319 return "%s: P word must be >= 0 if used (%.4f)" % (r.name, p)320 self.params["p"] = p321 if self.feed_rate == 0.0:322 return "%s: feed rate must be > 0" % (r.name)323 if self.feed_mode == INVERSE_TIME:324 return "%s: Cannot use inverse time feed with canned cycles" % (r.name)325 if self.cutter_comp_side:326 return "%s: Cannot use canned cycles with cutter compensation on" % (r.name)327 return INTERP_OK328 329 except Exception, e:330 raise331 return "cycle_prolog failed: %s" % (e)332# make sure the next line has the same motion code, unless overriden by a333# new G code334def cycle_epilog(self,**words):335 try:336 c = self.blocks[self.remap_level]337 self.motion_mode = c.executing_remap.motion_code # retain the current motion mode338 return INTERP_OK339 except Exception, e:340 return "cycle_epilog failed: %s" % (e)341# this should be called from TOPLEVEL __init__()342def init_stdglue(self):...
blendfile_path_remap.py
Source:blendfile_path_remap.py
1#!/usr/bin/env python32# ***** BEGIN GPL LICENSE BLOCK *****3#4# This program is free software; you can redistribute it and/or5# modify it under the terms of the GNU General Public License6# as published by the Free Software Foundation; either version 27# of the License, or (at your option) any later version.8#9# This program is distributed in the hope that it will be useful,10# but WITHOUT ANY WARRANTY; without even the implied warranty of11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the12# GNU General Public License for more details.13#14# You should have received a copy of the GNU General Public License15# along with this program; if not, write to the Free Software Foundation,16# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.17#18# ***** END GPL LICENCE BLOCK *****19"""20Module for remapping paths from one directory to another.21"""22import os23# ----------------------------------------------------------------------------24# private utility functions25def _is_blend(f):26 return f.lower().endswith(b'.blend')27def _warn__ascii(msg):28 print(" warning: %s" % msg)29def _info__ascii(msg):30 print(msg)31def _warn__json(msg):32 import json33 print(json.dumps(("warning", msg)), end=",\n")34def _info__json(msg):35 import json36 print(json.dumps(("info", msg)), end=",\n")37def _uuid_from_file(fn, block_size=1 << 20):38 with open(fn, 'rb') as f:39 # first get the size40 f.seek(0, os.SEEK_END)41 size = f.tell()42 f.seek(0, os.SEEK_SET)43 # done!44 import hashlib45 sha1 = hashlib.new('sha512')46 while True:47 data = f.read(block_size)48 if not data:49 break50 sha1.update(data)51 return (hex(size)[2:] + sha1.hexdigest()).encode()52def _iter_files(paths, check_ext=None):53 # note, sorting isn't needed54 # just gives predictable output55 for p in paths:56 p = os.path.abspath(p)57 for dirpath, dirnames, filenames in sorted(os.walk(p)):58 # skip '.svn'59 if dirpath.startswith(b'.') and dirpath != b'.':60 continue61 for filename in sorted(filenames):62 if check_ext is None or check_ext(filename):63 filepath = os.path.join(dirpath, filename)64 yield filepath65# ----------------------------------------------------------------------------66# Public Functions67def start(68 paths,69 is_quiet=False,70 dry_run=False,71 use_json=False,72 ):73 if use_json:74 warn = _warn__json75 info = _info__json76 else:77 warn = _warn__ascii78 info = _info__ascii79 if use_json:80 print("[")81 # {(sha1, length): "filepath"}82 remap_uuid = {}83 # relative paths which don't exist,84 # don't complain when they're missing on remap.85 # {f_src: [relative path deps, ...]}86 remap_lost = {}87 # all files we need to map88 # absolute paths89 files_to_map = set()90 # TODO, validate paths aren't nested! ["/foo", "/foo/bar"]91 # it will cause problems touching files twice!92 # ------------------------------------------------------------------------93 # First walk over all blends94 from bam.blend import blendfile_path_walker95 for blendfile_src in _iter_files(paths, check_ext=_is_blend):96 if not is_quiet:97 info("blend read: %r" % blendfile_src)98 remap_lost[blendfile_src] = remap_lost_blendfile_src = set()99 for fp, (rootdir, fp_blend_basename) in blendfile_path_walker.FilePath.visit_from_blend(100 blendfile_src,101 readonly=True,102 recursive=False,103 ):104 # TODO. warn when referencing files outside 'paths'105 # so we can update the reference106 f_abs = fp.filepath_absolute107 f_abs = os.path.normpath(f_abs)108 if os.path.exists(f_abs):109 files_to_map.add(f_abs)110 else:111 if not is_quiet:112 warn("file %r not found!" % f_abs)113 # don't complain about this file being missing on remap114 remap_lost_blendfile_src.add(fp.filepath)115 # so we can know where its moved to116 files_to_map.add(blendfile_src)117 del blendfile_path_walker118 # ------------------------------------------------------------------------119 # Store UUID120 #121 # note, sorting is only to give predictable warnings/behavior122 for f in sorted(files_to_map):123 f_uuid = _uuid_from_file(f)124 f_match = remap_uuid.get(f_uuid)125 if f_match is not None:126 if not is_quiet:127 warn("duplicate file found! (%r, %r)" % (f_match, f))128 remap_uuid[f_uuid] = f129 # now find all deps130 remap_data_args = (131 remap_uuid,132 remap_lost,133 )134 if use_json:135 if not remap_uuid:136 print("\"nothing to remap!\"")137 else:138 print("\"complete\"")139 print("]")140 else:141 if not remap_uuid:142 print("Nothing to remap!")143 return remap_data_args144def finish(145 paths, remap_data_args,146 is_quiet=False,147 force_relative=False,148 dry_run=False,149 use_json=False,150 ):151 if use_json:152 warn = _warn__json153 info = _info__json154 else:155 warn = _warn__ascii156 info = _info__ascii157 if use_json:158 print("[")159 (remap_uuid,160 remap_lost,161 ) = remap_data_args162 remap_src_to_dst = {}163 remap_dst_to_src = {}164 for f_dst in _iter_files(paths):165 f_uuid = _uuid_from_file(f_dst)166 f_src = remap_uuid.get(f_uuid)167 if f_src is not None:168 remap_src_to_dst[f_src] = f_dst169 remap_dst_to_src[f_dst] = f_src170 # now the fun begins, remap _all_ paths171 from bam.blend import blendfile_path_walker172 for blendfile_dst in _iter_files(paths, check_ext=_is_blend):173 blendfile_src = remap_dst_to_src.get(blendfile_dst)174 if blendfile_src is None:175 if not is_quiet:176 warn("new blendfile added since beginning 'remap': %r" % blendfile_dst)177 continue178 # not essential, just so we can give more meaningful errors179 remap_lost_blendfile_src = remap_lost[blendfile_src]180 if not is_quiet:181 info("blend write: %r -> %r" % (blendfile_src, blendfile_dst))182 blendfile_src_basedir = os.path.dirname(blendfile_src)183 blendfile_dst_basedir = os.path.dirname(blendfile_dst)184 for fp, (rootdir, fp_blend_basename) in blendfile_path_walker.FilePath.visit_from_blend(185 blendfile_dst,186 readonly=False,187 recursive=False,188 ):189 # TODO. warn when referencing files outside 'paths'190 # so we can update the reference191 f_src_orig = fp.filepath192 if f_src_orig in remap_lost_blendfile_src:193 # this file never existed, so we can't remap it194 continue195 is_relative = f_src_orig.startswith(b'//')196 if is_relative:197 f_src_abs = fp.filepath_absolute_resolve(basedir=blendfile_src_basedir)198 else:199 f_src_abs = f_src_orig200 f_src_abs = os.path.normpath(f_src_abs)201 f_dst_abs = remap_src_to_dst.get(f_src_abs)202 if f_dst_abs is None:203 if not is_quiet:204 warn("file %r not found in map!" % f_src_abs)205 continue206 # now remap!207 if is_relative or force_relative:208 f_dst_final = b'//' + os.path.relpath(f_dst_abs, blendfile_dst_basedir)209 else:210 f_dst_final = f_dst_abs211 if f_dst_final != f_src_orig:212 if not dry_run:213 fp.filepath = f_dst_final214 if not is_quiet:215 info("remap %r -> %r" % (f_src_abs, f_dst_abs))216 del blendfile_path_walker217 if use_json:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!