Best Python code snippet using slash
pdgcmd.py
Source:pdgcmd.py
#
# Copyright (c) <2020> Side Effects Software Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# NAME:         pdgcmd.py ( Python )
#
# COMMENTS:     Utility methods for jobs that need to report back to PDG.
#               Not dependent on Houdini install.
#
from __future__ import print_function, absolute_import, unicode_literals
from datetime import datetime
import errno
import importlib
import json
import locale
import os
import platform
from random import uniform
import shlex
import socket
import subprocess
import sys
import traceback
import time

# Maximum number of retries for callbacks that get ECONNREFUSED
max_retries = int(os.environ.get('PDG_RPC_RETRIES', 4))
# seconds of timeout on making socket connections for RPC
rpc_timeout = int(os.environ.get('PDG_RPC_TIMEOUT', 4))
# maximum seconds to wait between retries after a timeout
rpc_backoff = int(os.environ.get('PDG_RPC_MAX_BACKOFF', 4))
# Rpc back end can be overriden with environment
rpc_delegate_from_env = os.environ.get('PDG_RPC_DELEGATE', None)
# Seconds delay between batch item checks for readiness to cook
batch_poll_delay = float(os.environ.get('PDG_BATCH_POLL_DELAY', 1.0))
# Should we call release_job_slots and acquire_job_slots when batch polling?
# If the scheduler does not support this, we should avoid the RPC overhead
release_job_slots_on_poll = int(os.environ.get('PDG_RELEASE_SLOT_ON_POLL', 1))
# how much logging
verbose = int(os.environ.get('PDG_VERBOSE', 0))
LogDefault, LogVerbose = 0, 1

# errno values that mean "connection refused": the POSIX value plus the
# Winsock value (10061) that the socket module reports on Windows.
_CONN_REFUSED_ERRNOS = (errno.ECONNREFUSED, 10061)


# Utility for redirecting logs to a string buffer
class RedirectBuffer(object):
    """
    Context manager that installs itself as sys.stdout and accumulates
    everything written to it in a string.
    source: the stream restored on exit, and written through to when tee is set
    tee: if True, text is also forwarded to source
    """
    def __init__(self, source, tee=True):
        self._buffer = ""
        self._source = source
        self._tee = tee

    def __enter__(self):
        sys.stdout = self
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # exceptions are not suppressed (implicit None return)
        sys.stdout = self._source

    def write(self, text):
        self._buffer += str(text)
        if self._tee:
            self._source.write(text)

    def flush(self):
        if self._tee:
            self._source.flush()

    def buffer(self):
        """
        Return the text captured so far.
        """
        return self._buffer


# timestamps in output can be disabled
disable_timestamps = int(os.environ.get('PDG_DISABLE_TIMESTAMPS', 0))
# Disable RPCs if set with env, or if there is no PDG_RESULT_SERVER
if 'PDG_RESULT_SERVER' not in os.environ:
    disable_rpc = True
else:
    disable_rpc = int(os.environ.get('PDG_DISABLE_RPC', 0)) > 0
result_server_addr = None
rpc_delegate = None
# $PDG_JOBID_VAR names the env var that holds the scheduler job id
try:
    theJobid = os.environ[os.environ['PDG_JOBID_VAR']]
except KeyError:
    theJobid = ''
if not disable_rpc:
    try:
        import xmlrpclib # python 2.x
    except ImportError:
        import xmlrpc.client as xmlrpclib
# list of (src_path, dest_path) for the local pathmap, created on demand
thePathMaps = None
# Passing $PDG_DELOCALIZE=0 turns off delocalization of paths - this is the
# substitution of __PDG_DIR__.
theDelocalizePaths = os.environ.get('PDG_DELOCALIZE', '1') != '0'


def decode_str(s):
    """
    On Python 2 decode s using the filesystem encoding; on Python 3 return
    s unchanged.
    """
    if sys.version_info.major >= 3:
        return s
    return s.decode(sys.getfilesystemencoding())


def is_str(s):
    """
    Return True if s is a text string, bytes or bytearray.
    """
    if sys.version_info.major >= 3:
        str_type = str
    else:
        str_type = unicode
    return isinstance(s, (bytes, bytearray, str_type))


def setVerbosity(level):
    """
    Sets the global verbosity level
    """
    global verbose
    verbose = level


def printlog(msg, msg_verbosity=LogDefault, timestamp=True, prefix=''):
    """
    Print a log message, msg_verbosity > 0 means only print when PDG_VERBOSE
    is set to a higher value
    timestamp: prepend [HH:MM:SS.mmm] unless timestamps are globally disabled
    prefix: optional text emitted before the message, followed by ': '
    """
    if msg_verbosity > verbose:
        return
    # choose the encoding expected by stdout
    try:
        stdoutenc = sys.stdout.encoding
    except AttributeError:
        stdoutenc = None
    if not stdoutenc:
        try:
            stdoutenc = locale.getpreferredencoding()
            if not stdoutenc:
                stdoutenc = 'ascii'
        except Exception:
            stdoutenc = 'ascii'
    msg_bytes = b''
    if (not disable_timestamps) and timestamp:
        now = datetime.now()
        # trim microseconds down to milliseconds
        now_s = now.strftime('%H:%M:%S.%f')[:-3]
        msg_bytes = ('[' + now_s + '] ').encode(stdoutenc)
    if prefix:
        msg_bytes += prefix.encode(stdoutenc, 'replace')
        msg_bytes += ': '.encode(stdoutenc)
    msg_bytes += msg.encode(stdoutenc, 'replace')
    msg_bytes += '\n'.encode(stdoutenc)
    try:
        # write bytes to underlying buffer if it exists
        sys.stdout.buffer.write(msg_bytes)
    except (TypeError, AttributeError):
        sys.stdout.write(msg_bytes.decode(stdoutenc, 'replace'))
    sys.stdout.flush()


def setResultServerAddr(force_server_addr):
    """
    Set server address for result callback.
    """
    global result_server_addr, disable_rpc, xmlrpclib
    result_server_addr = force_server_addr
    if result_server_addr:
        # Re-enable RPC calls unless disabled by env flag
        disable_rpc = int(os.environ.get('PDG_DISABLE_RPC', 0)) > 0
        if not disable_rpc:
            try:
                import xmlrpclib # python 2.x
            except ImportError:
                import xmlrpc.client as xmlrpclib


def getResultServerAddr():
    """
    Return the result server address (or proxy object).
    """
    global result_server_addr
    if result_server_addr:
        return result_server_addr
    elif 'PDG_RESULT_SERVER' in os.environ:
        return os.environ['PDG_RESULT_SERVER']
    else:
        return None


def getSplitServerHostPort(server_addr):
    """
    Returns 'url', 'port' from given 'url:port' format.
    Raises RuntimeError if the address contains no ':' separator.
    """
    vals = server_addr.split(':')
    if len(vals) >= 2:
        return vals[0], int(vals[1])
    else:
        raise RuntimeError("Invalid result (MQ) server address {0}".format(server_addr))


result_client_id = None


def setResultClientId(client_id):
    """
    Set the client ID for sending RPCs.
    """
    global result_client_id
    result_client_id = client_id


def getResultClientId():
    """
    Get the client ID for sending RPCs.
    """
    if result_client_id:
        return result_client_id
    elif 'PDG_RESULT_CLIENT_ID' in os.environ:
        return os.environ['PDG_RESULT_CLIENT_ID']
    return ''


def setRpcDelegate(delegate_obj):
    """
    Set the RPc Delegate object, which is a factory for RpcProxy objects
    Passing None will re-query the environment, Passing a string will
    try to import a module of the given name
    """
    global rpc_delegate
    if is_str(delegate_obj):
        # Import the given name
        rpc_delegate = importlib.import_module(delegate_obj)
    elif delegate_obj is not None:
        rpc_delegate = delegate_obj
    else:
        # PDGNet is a special case
        use_pdgutilset = os.environ.get('PDG_JOBUSE_PDGNET', '0') == '1'
        if use_pdgutilset:
            # import from houdini package and fall back on copied script
            try:
                rpc_delegate = importlib.import_module("pdgjob.pdgnetrpc")
            except ImportError:
                rpc_delegate = importlib.import_module("pdgnetrpc")
        else:
            # default is XMLRPC
            rpc_delegate = XMLRpcDelegate()
    #printlog('RPC Delegate: {}'.format(rpc_delegate))


class RpcError(Exception):
    """
    Exception thrown when an RPC communication error occurs
    """
    pass


class XMLRpcProxy():
    """
    Implementation of an RPC Proxy Object that uses XMLRPC.
    Implementations must define the following methods:
    call(self, fn_name, *args, **kwargs) -> None
    commitMulti(self, name): -> object
    """
    def __init__(self, server_proxy, raise_exc):
        self._s = server_proxy
        self._raise_exc = raise_exc

    def call(self, fn_name, *args, **kwargs):
        # perform the RPC. In the case of multi-call it will just register the
        # call which gets later sent by commitMulti, by invoking __call__ on the
        # xmlrpclib.MultiCall
        fn_attr = getattr(self._s, fn_name)
        return self._invokeXMLRpcFn(fn_attr, fn_name, *args)

    def commitMulti(self, name):
        self._invokeXMLRpcFn(self._s, name)

    def _invokeXMLRpcFn(self, fn, fn_name, *args):
        # call the given function and retry if the connection is refused.
        # Returns the result of fn on success. After a timeout or once the
        # retries are used up, the failure is logged and RpcError is raised
        # if this proxy was created with raise_exc.
        if disable_rpc:
            return
        try_count = max_retries + 1
        # float so that the back off bounds are not truncated on Python 2
        max_sleep = float(rpc_backoff)
        # remember the process-wide default so it can be restored afterwards
        prev_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(rpc_timeout)
            while try_count > 0:
                try:
                    return fn(*args)
                except socket.timeout:
                    # We can't safely retry in this case because we risk duplicating
                    # the action. FIXME: If we had a transaction ID we could handle
                    # this on the server side.
                    printlog('Timed out waiting for server to complete action.'
                        ' You can increase the timeout by setting $PDG_RPC_TIMEOUT')
                    break
                except socket.error as e:
                    try_number = max_retries + 2 - try_count
                    if e.errno in _CONN_REFUSED_ERRNOS:
                        if try_count > 1:
                            backoff = uniform(max_sleep/(try_count-1), max_sleep/try_count)
                            printlog(
                                'Connection refused. Retry with back off: {:.1f}s {}/{}'.format(
                                    backoff, try_number, max_retries))
                            time.sleep(backoff)
                            continue
                    if try_count > 1:
                        printlog('Socket Error: {}. Retry {}/{}'.format(
                            e, try_number, max_retries))
                    else:
                        printlog('Socket Error: {}'.format(e))
                except xmlrpclib.Fault as err:
                    try_number = max_retries + 2 - try_count
                    printlog('Failed RPC {} with Fault: {}. Retry {}/{}'.format(
                        fn_name, err, try_number, max_retries))
                finally:
                    # runs on 'continue' as well, so every attempt is counted
                    try_count -= 1
        except Exception:
            traceback.print_exc()
        finally:
            socket.setdefaulttimeout(prev_timeout)
        msg = 'Failed RPC to {}: {} {}'.format(getResultServerAddr(), fn_name, args)
        printlog(msg)
        if self._raise_exc:
            raise RpcError(msg)


class XMLRpcDelegate:
    """
    RPC Delegate for XMLRPC-based callbacks
    """
    def createRpcProxy(self, host, port, client_id, get_reply, multi, raise_exc):
        s = xmlrpclib.ServerProxy('http://{}:{}'.format(host, port))
        if multi:
            s = xmlrpclib.MultiCall(s)
        return XMLRpcProxy(s, raise_exc)


def _invokeRpc(get_reply, fn_name, *args, **kwargs):
    # Invoke a single RPC function call and return the result
    # optional keyword argument server_addr to override the default
    # optional keyword argument raise_exc (default True) is passed to the proxy
    if verbose >= LogVerbose:
        start = time.time()
    server = kwargs.get('server_addr', None)
    raise_exc = kwargs.get('raise_exc', True)
    s = _getRPCProxy(get_reply, False, server, raise_exc)
    result = s.call(fn_name, *args)
    if verbose >= LogVerbose:
        elapsed = time.time() - start
        printlog('RPC {} took {:.2f} ms'.format(fn_name, elapsed * 1000.0))
    return result


def _invokeMultiRpc(proxy, name):
    # Commit the calls queued on a multi-call proxy, logging the elapsed time
    # when verbose
    if verbose >= LogVerbose:
        start = time.time()
    proxy.commitMulti(name)
    if verbose >= LogVerbose:
        elapsed = time.time() - start
        printlog('RPC {} took {:.2f} ms'.format(name, elapsed * 1000.0))


def _getRPCProxy(get_reply, multi=False, server_addr=None, raise_exc=True):
    """
    Returns an RPC proxy object
    """
    global rpc_delegate
    if not server_addr:
        server_addr = getResultServerAddr()
    # Check for runtime overriding of the ServerProxy, which is done with
    # setResultServerAddr()
    if not is_str(server_addr):
        return server_addr
    host, port = getSplitServerHostPort(server_addr)
    client_id = getResultClientId()
    try:
        return rpc_delegate.createRpcProxy(
            host, port, client_id, get_reply, multi, raise_exc)
    except TypeError:
        # the delegate's createRpcProxy does not accept raise_exc
        return rpc_delegate.createRpcProxy(
            host, port, client_id, get_reply, multi)


# set Rpc Deletage from environment when module loads
setRpcDelegate(rpc_delegate_from_env)

#
# Path Utilities


def delocalizePath(local_path):
    """
    Delocalize the given path to be rooted at __PDG_DIR__
    Requires PDG_DIR env var to be present
    Non-string values are returned unchanged. Every occurrence of the
    PDG_DIR value that is delimited like a path prefix is replaced, and
    backslashes are normalized to forward slashes.
    """
    # de-localize the result_data path if possible
    # we do this by replacing the file prefix if it matches our expected env var

    # don't delocalize non-strings
    if sys.version_info.major >= 3:
        if not isinstance(local_path, str):
            return local_path
    else:
        if not isinstance(local_path, (unicode, str)):
            return local_path
    if not theDelocalizePaths:
        return local_path
    pdg_dir = os.environ.get('PDG_DIR', None)
    if not pdg_dir:
        return local_path

    pdg_dir_local = decode_str(pdg_dir)
    # our env var value might be in terms of another env var - so expand again
    pdg_dir_local = os.path.expandvars(pdg_dir_local)
    # normalize path to forward slashes
    pdg_dir_local = pdg_dir_local.replace('\\', '/')
    deloc_path = local_path.replace('\\', '/')
    # ensure pdg_dir_local does not have a trailing slash
    pdg_dir_local = pdg_dir_local.rstrip('/')
    if not pdg_dir_local:
        # PDG_DIR was only slashes (e.g. '/'); searching for an empty string
        # would never advance, so there is nothing to substitute
        return deloc_path

    token = '__PDG_DIR__'
    l_target = len(pdg_dir_local)
    ix = 0
    while True:
        ix = deloc_path.find(pdg_dir_local, ix)
        if ix < 0:
            break
        # if the match isn't at the start, check that it has preceeding space
        # and check that the character after the match is a space or seperator
        end_ix = ix + l_target
        starts_ok = ix == 0 or (deloc_path[ix-1].isspace() or
            (deloc_path[ix-1] in (';', ':')))
        ends_ok = end_ix >= len(deloc_path) or (
            deloc_path[end_ix].isspace() or
            (deloc_path[end_ix] in ('/', ';', ':')))
        if starts_ok and ends_ok:
            deloc_path = deloc_path[:ix] + token + deloc_path[end_ix:]
            # resume right after the inserted token; the string length
            # changed, so the old end offset is no longer valid
            ix += len(token)
        else:
            ix = end_ix
    return deloc_path


# Makes a directory if it does not exist, and is made to be safe against
# directory creation happening concurrent while we're attemtping to
make it396def makeDirSafe(local_path):397 if not local_path:398 return399 try:400 os.makedirs(local_path)401 except OSError:402 if not os.path.isdir(local_path):403 raise404def _substitute_scheduler_vars(data):405 for var in ('PDG_DIR', 'PDG_ITEM_NAME', 'PDG_TEMP', 'PDG_RESULT_SERVER',406 'PDG_INDEX', 'PDG_SCRIPTDIR', 'PDG_ITEM_ID'):407 varsym = '__' + var + '__'408 if varsym in data:409 try:410 val = decode_str(os.environ[var])411 data = data.replace(varsym, val)412 except KeyError:413 pass414 return data415def _applyPathMapForZone(loc_path, path_map):416 # Apply path mapping rules to the given path. Supports417 # chaining of rules by applying mapping rules repeatedly until418 # there are no more matches419 changed = True420 while changed:421 changed = False422 for zonepath in path_map:423 if zonepath[2]:424 if zonepath[1] in loc_path:425 continue426 loc_path_new = loc_path.replace(zonepath[0], zonepath[1])427 changed |= loc_path != loc_path_new428 loc_path = loc_path_new429 return loc_path430def _applyPathMapping(loc_path):431 # Apply '*' zone maps432 loc_path = _applyPathMapForZone(loc_path, thePathMaps[0])433 # Apply local zone maps434 loc_path = _applyPathMapForZone(loc_path, thePathMaps[1])435 return loc_path436def localizePath(deloc_path):437 """438 Localize the given path. This means replace any __PDG* tokens and439 expand env vars with the values in the current environment. 
Also440 applies path mapping if PDG_PATHMAP is present.441 """442 global thePathMaps443 loc_path = _substitute_scheduler_vars(deloc_path)444 loc_path = os.path.expandvars(loc_path)445 # support env vars defined as other env vars446 loc_path = os.path.expandvars(loc_path)447 loc_path = loc_path.replace("\\", "/")448 if thePathMaps is None:449 thePathMaps = _buildPathMap()450 loc_path = _applyPathMapping(loc_path)451 # Expand variables one more time for variables introduced by path maps.452 loc_path = os.path.expandvars(loc_path)453 return loc_path454def _buildPathMap():455 # Returns a pair of path mappings: [['*' zone mappings], [localzone mappings]]456 zonepaths = [[], []]457 pathmap = os.environ.get('PDG_PATHMAP', '')458 if not pathmap:459 return zonepaths460 try:461 pathmap = json.loads(pathmap)462 except ValueError as e:463 raise type(e)(str(e) + ': While parsing $PDG_PATHMAP')464 myzone = os.environ.get('PDG_PATHMAP_ZONE', '')465 if not myzone:466 if sys.platform.lower() == 'win32':467 myzone = 'WIN'468 elif sys.platform.lower() == 'darwin':469 myzone = 'MAC'470 elif sys.platform.lower().startswith('linux'):471 myzone = 'LINUX'472 else:473 printlog('Warning: Unsupported platform {} for Path Map'.format(474 sys.platform))475 return zonepaths476 def load_pathmap_for_zone(zone):477 paths = pathmap['paths']478 for e in paths:479 for from_path, v in e.items():480 e_zone = v['zone']481 if e_zone == zone or e_zone == '*':482 to_path = v['path']483 from_path = from_path.replace('\\', '/')484 to_path = to_path.replace('\\', '/')485 if from_path.endswith('/') and not to_path.endswith('/'):486 to_path += '/'487 is_subpath = to_path.startswith(from_path)488 if e_zone == '*':489 zonepaths[0].append((from_path, to_path, is_subpath))490 else:491 zonepaths[1].append((from_path, to_path, is_subpath))492 nmaps = len(zonepaths[0]) + len(zonepaths[1])493 if nmaps:494 printlog('PDG: Pathmap Zone {} with {} mappings for this zone.'.format(495 myzone, len(zonepaths[0]) + 
len(zonepaths[1])))496 return nmaps497 if not load_pathmap_for_zone(myzone):498 if myzone != 'WIN':499 # Backwards compatibility - POSIX has become MAC+LINUX500 nmaps = load_pathmap_for_zone('POSIX')501 if nmaps:502 printlog('Warning: "POSIX" Path Map Zone is deprecated'503 ', please use "{}"'.format(myzone))504 return zonepaths505# Callback Helper Functions.506# These functions are used in task code to report status and results507# to the PDG callback server508#509def _getClientHost():510 try:511 net_hostname = socket.getfqdn()512 ip_addr = socket.gethostbyname(net_hostname)513 except:514 try:515 net_hostname = platform.node()516 ip_addr = socket.gethostbyname(net_hostname)517 except:518 net_hostname = socket.gethostname()519 ip_addr = socket.gethostbyname(net_hostname)520 return (net_hostname, ip_addr)521def verboseAnnounce():522 # Print verbose job information, called by job wrapper script523 if verbose < LogVerbose:524 return525 myhost, myip = _getClientHost()526 client_id = getResultClientId()527 printlog('PDG Client is {} [{}] [{}]'.format(myhost, myip, client_id))528 server_addr = getResultServerAddr()529 if not server_addr:530 return531 host, port = getSplitServerHostPort(server_addr)532 try:533 ip = socket.gethostbyname(host)534 printlog('PDG Result Server is {}:{} [{}]'.format(host, port, ip))535 except Exception as e:536 printlog('PDG Result Server is {}:{} but FAILED to resolve:\n{}'.format(537 host, port, str(e))) 538def _checkItemIdArg(workitem_id):539 # API functions previously took item_name to identify the work item, we now540 # take work item id.541 import numbers542 if (workitem_id is None) or isinstance(workitem_id, numbers.Integral):543 return544 printlog('Passing PDG_ITEM_NAME as workitem_id for RPC functions is '545 'deprecated, please pass PDG_ITEM_ID int')546def getItemIdFromEnv():547 """548 Retrieve work_item.id from the job environment or None.549 """550 try:551 item_id = int(os.environ['PDG_ITEM_ID'])552 except KeyError:553 # Deprecated 
- use work_item.name554 item_id = os.environ.get('PDG_ITEM_NAME', None)555 return item_id556def waitUntilReady(workitem_id, subindex, server_addr=None, raise_exc=True):557 """558 Blocks until a batch sub item can begin cooking.559 subindex: the index of the batch item within it's batch.560 server_addr: the result server address, defaulting to561 the value of $PDG_RESULT_SERVER562 raise_exc: whether or not RPC exceptions should be re-raised after logging563 """564 if disable_rpc:565 return566 _checkItemIdArg(workitem_id)567 # we release our slots until we are ready to continue568 if release_job_slots_on_poll:569 _invokeRpc(False, 'release_job_slots', workitem_id, theJobid,570 server_addr=server_addr, raise_exc=raise_exc)571 while True:572 r = _invokeRpc(True, 'check_ready_batch', workitem_id, subindex,573 server_addr=server_addr, raise_exc=raise_exc)574 if r:575 enum_val = int(r)576 if enum_val == 1:577 break578 elif enum_val == 2:579 raise RuntimeError('Failed Dependency!')580 time.sleep(batch_poll_delay)581 if release_job_slots_on_poll:582 _invokeRpc(False, 'acquire_job_slots', workitem_id, theJobid,583 server_addr=server_addr, raise_exc=raise_exc)584def execBatchPoll(item_name, subindex, server_addr=None):585 printlog('execBatchPoll is deprecated, please use waitUntilReady instead.')586 return waitUntilReady(item_name, subindex, server_addr)587def getWorkItemJSON(workitem_id, subindex, server_addr=None, raise_exc=True):588 """589 Returns a string containing the serialized json for the given590 work item.591 subindex: the index of the batch item within it's batch.592 server_addr: the result server address, defaulting to593 the value of $PDG_RESULT_SERVER594 raise_exc: whether or not RPC exceptions should be re-raised after logging595 """596 if disable_rpc:597 return ''598 _checkItemIdArg(workitem_id)599 return _invokeRpc(True, 'get_workitem_json', workitem_id, subindex,600 server_addr=server_addr, raise_exc=raise_exc)601def workItemSuccess(workitem_id, 
subindex=-1, server_addr=None, to_stdout=True, raise_exc=True):602 """603 Reports that the given item has succeeded.604 subindex: the index of the batch item within it's batch.605 server_addr: the result server address, defaulting to606 the value of $PDG_RESULT_SERVER607 to_stdout: also emit a status message to stdout608 raise_exc: whether or not RPC exceptions should be re-raised after logging609 """610 if to_stdout:611 printlog("PDG_SUCCESS: {};{};{}".format(workitem_id, subindex, 0))612 if disable_rpc:613 return614 _checkItemIdArg(workitem_id)615 if subindex < 0:616 _invokeRpc(False, "success", workitem_id, 0, theJobid,617 server_addr=server_addr, raise_exc=raise_exc)618 else:619 _invokeRpc(False, "success_batch", workitem_id, subindex, 0, theJobid,620 server_addr=server_addr, raise_exc=raise_exc)621def execBatchSuccess(item_name, subindex, server_addr=None, to_stdout=True):622 printlog('execBatchSuccess is deprecated, please use workItemSuccess instead.')623 return workItemSuccess(item_name, subindex, server_addr, to_stdout)624def workItemFailed(workitem_id, server_addr=None, to_stdout=True, raise_exc=True):625 """626 Report when an item has failed.627 workitem_id: id of the associated work item628 server_addr: callback server in format 'IP:PORT', or emptry string to ignore629 to_stdout: also emit status messages to stdout630 raise_exc: whether or not RPC exceptions should be re-raised after logging631 632 Note: Batch subitems not supported. 
Failure of a batch subitem will 633 automatically result in the failure of the batch item.634 """635 if disable_rpc:636 return637 _checkItemIdArg(workitem_id)638 _invokeRpc(False, "failed", workitem_id, theJobid,639 server_addr=server_addr, raise_exc=raise_exc)640def execItemFailed(item_name, server_addr=None, to_stdout=True):641 printlog('execItemFailed is deprecated, please use workItemFailed instead.')642 return workItemFailed(item_name, server_addr, to_stdout)643def workItemStartCook(workitem_id=None, subindex=-1, server_addr=None, to_stdout=True, raise_exc=True):644 """645 Reports than a work item has started cooking.646 """647 if not workitem_id:648 workitem_id = getItemIdFromEnv()649 650 if to_stdout:651 printlog("PDG_START: {};{}".format(workitem_id, subindex))652 if disable_rpc:653 return654 _checkItemIdArg(workitem_id)655 if subindex >= 0:656 _invokeRpc(False, "start_cook_batch", workitem_id, subindex, theJobid,657 server_addr=server_addr, raise_exc=raise_exc)658 else:659 _invokeRpc(False, "start_cook", workitem_id, theJobid,660 server_addr=server_addr, raise_exc=raise_exc)661def execStartCook(item_name=None, subindex=-1, server_addr=None, to_stdout=True):662 printlog('execStartCook is deprecated, please use workItemStartCook instead.')663 return workItemStartCook(item_name, subindex, server_addr, to_stdout)664def workItemAppendLog(log_data, log_type=3, workitem_id=None, subindex=-1, server_addr=None, to_stdout=True, raise_exc=True):665 """666 Report log data back to PDG for the work item667 """668 if not workitem_id:669 workitem_id = getItemIdFromEnv()670 671 if to_stdout:672 printlog("PDG_APPEND_LOG: {};{};{}".format(workitem_id, subindex, len(log_data)))673 if disable_rpc:674 return675 _checkItemIdArg(workitem_id)676 _invokeRpc(False, "append_log", workitem_id, subindex, log_data, int(log_type), theJobid,677 server_addr=server_addr, raise_exc=raise_exc)678def workItemCancelled(workitem_id, server_addr=None, raise_exc=True):679 """680 Report when a work 
item has been explicitly cancelled.681 workitem_id: id of the associated workitem682 server_addr: callback server in format 'IP:PORT', or emptry string to ignore683 raise_exc: whether or not RPC exceptions should be re-raised after logging684 685 Note: Batch subitems can not be cancelled, cancel the batch itself instead.686 """687 if disable_rpc:688 return689 _checkItemIdArg(workitem_id)690 691 _invokeRpc(False, "cancelled", workitem_id, theJobid,692 server_addr=server_addr, raise_exc=raise_exc)693def _decodeValForPrint(val):694 if not is_str(val):695 return val696 if type(val) is bytearray:697 return '(bytearray length {})'.format(len(val))698 if sys.version_info.major >= 3:699 str_type = str700 if type(val) is str and len(val) > 260:701 return val[0:90] + '...({} bytes)'.format(len(val))702 else:703 str_type = unicode704 try:705 if len(val) > 260:706 decodedval = str_type(val[0:90], 'utf8', 'replace') +\707 '...(' + str_type(len(val)) + ' bytes)'708 else:709 decodedval = str_type(val, 'utf8', 'replace')710 except TypeError:711 return val712 return decodedval713def reportResultData(result_data, workitem_id=None, server_addr=None,714 result_data_tag="", subindex=-1, and_success=False, to_stdout=True,715 raise_exc=True, duration=0.0, hash_code=0, batch_size=50):716 """717 Reports a result to PDG via the callback server.718 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)719 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)720 if there is no env var it will default to stdout reporting only.721 result_data: result data - treated as bytes if result_data_tag is passed722 result_data_tag: result tag to categorize result. 
Eg: 'file/geo'723 Default is empty which means attempt to categorize using file extension.724 subindex: The batch subindex if this is a batch item.725 and_success: If True, report success in addition to result_data726 to_stdout: also emit status messages to stdout727 raise_exc: whether or not RPC exceptions should be re-raised after logging728 duration: [Unused] cook time of the item in seconds, only report with and_success729 hash_code: int that can be used to check if this file has changed, usually this is the modify-time of the file.730 batch_size: Maximum number of results to send per multicall invoke731 """732 if not isinstance(result_data, (list, tuple)):733 all_result_data_list = [result_data]734 else:735 all_result_data_list = result_data736 n_results = len(all_result_data_list)737 if not all_result_data_list:738 raise TypeError("result_data is invalid")739 if not is_str(all_result_data_list[0]):740 raise TypeError("result_data must be string-like or a list of string-like")741 if not workitem_id:742 workitem_id = getItemIdFromEnv()743 if sys.version_info.major >= 3:744 str_type = str745 else:746 str_type = unicode747 748 def send_results(result_data_list):749 if not disable_rpc:750 proxy = _getRPCProxy(False, True, server_addr, raise_exc)751 for result_data_elem in result_data_list:752 # de-localize the result_data path if possible753 # we do this by replacing the file prefix if it matches our expected env var754 result_data_elem = delocalizePath(result_data_elem)755 if to_stdout:756 result_data_elem_print = _decodeValForPrint(result_data_elem)757 printlog('PDG_RESULT: {};{};{};{};{}'.format(workitem_id, subindex,758 result_data_elem_print, result_data_tag, hash_code))759 if and_success:760 printlog("PDG_SUCCESS: {};{};{}".format(workitem_id, subindex, duration))761 if not disable_rpc:762 if isinstance(result_data_elem, str_type):763 # convert unicode to raw bytes, to be encoded with base64764 result_data_elem = result_data_elem.encode('utf8')765 if 
and_success:766 if subindex >= 0:767 proxy.call('success_and_result_batch', workitem_id,768 xmlrpclib.Binary(result_data_elem),769 result_data_tag, subindex, hash_code, duration, theJobid)770 else:771 proxy.call('success_and_result', workitem_id,772 xmlrpclib.Binary(result_data_elem),773 result_data_tag, hash_code, duration, theJobid)774 else:775 if subindex >= 0:776 proxy.call('result_batch', workitem_id,777 xmlrpclib.Binary(result_data_elem),778 result_data_tag, subindex, hash_code, theJobid)779 else:780 proxy.call('result', workitem_id,781 xmlrpclib.Binary(result_data_elem),782 result_data_tag, hash_code, theJobid)783 if not disable_rpc:784 _invokeMultiRpc(proxy, 'reportResultData')785 # The multicall RPC may not support unlimited payload size,786 # so to be safe we chunk it up into batches787 if n_results <= batch_size:788 send_results(all_result_data_list)789 else:790 chunks = [all_result_data_list[i:i+batch_size] for i in \791 range(0, n_results, batch_size)]792 for chunk in chunks:793 send_results(chunk)794def writeAttribute(attr_name, attr_value, item_name=None, server_addr=None, raise_exc=True):795 """796 [Deprecated]797 Writes attribute data back into a work item in PDG via the callback server.798 item_name: name of the associated workitem (default $PDG_ITEM_NAME)799 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)800 if there is no env var it will default to stdout reporting only.801 raise_exc: whether or not RPC exceptions should be re-raised after logging802 attr_name: name of the attribute803 attr_value: single value or array of string/float/int data804 """805 printlog("writeAttribute is deprecated, please use the set*Attrib functions")806 if not isinstance(attr_value, (list, tuple)):807 attr_value_list = [attr_value]808 else:809 attr_value_list = attr_value810 if not attr_value_list:811 raise TypeError("attr_value is invalid")812 if not is_str(attr_value_list[0]) and not isinstance(attr_value_list[0], (int, float)):813 
raise TypeError("result_data must be string, int or float (array)")814 if not item_name:815 item_name = os.environ['PDG_ITEM_NAME']816 printlog("PDG_RESULT_ATTR: {};{};{}".format(item_name, attr_name, attr_value_list))817 if disable_rpc:818 return819 _invokeRpc(False, "write_attr", item_name, attr_name, attr_value_list, theJobid,820 server_addr=server_addr, raise_exc=raise_exc)821def _setAttrHelper(attr_name, attr_value, fname, workitem_id, subindex, server_addr, raise_exc):822 if not workitem_id:823 workitem_id = getItemIdFromEnv()824 if type(attr_value) is list:825 attr_value_print = [_decodeValForPrint(v) for v in attr_value]826 else:827 attr_value_print = _decodeValForPrint(attr_value)828 printlog("PDG_{}: {};{};{}".format(fname.upper().replace(' ', ''), workitem_id,829 attr_name, attr_value_print))830 if disable_rpc:831 return832 _checkItemIdArg(workitem_id)833 _invokeRpc(False, fname, workitem_id, subindex, attr_name, attr_value,834 theJobid, server_addr=server_addr, raise_exc=raise_exc)835def _setAttrIndexHelper(attr_name, attr_value, attr_index, fname, workitem_id, subindex, server_addr, raise_exc):836 if not workitem_id:837 workitem_id = getItemIdFromEnv()838 attr_value_print = _decodeValForPrint(attr_value)839 printlog("PDG_{}: {};{};{}".format(fname.upper().replace(' ', ''), workitem_id,840 attr_name, attr_value_print))841 if disable_rpc:842 return843 _checkItemIdArg(workitem_id)844 _invokeRpc(False, fname, workitem_id, subindex, attr_name, attr_value, attr_index,845 theJobid, server_addr=server_addr, raise_exc=raise_exc)846def setStringAttribArray(attr_name, attr_value, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):847 """848 Writes attribute data back into a work item in PDG via the callback server.849 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)850 subindex: batch subindex of item (-1 indicates a non-batch item)851 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)852 if there is no 
env var it will default to stdout reporting only.853 raise_exc: whether or not RPC exceptions should be re-raised after logging854 attr_name: name of the attribute855 attr_value: array of strings856 """857 _setAttrHelper(attr_name, attr_value, "set_string_attrib_array", workitem_id, subindex, server_addr, raise_exc)858def setIntAttribArray(attr_name, attr_value, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):859 """860 Writes attribute data back into a work item in PDG via the callback server.861 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)862 subindex: batch subindex of item (-1 indicates a non-batch item)863 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)864 if there is no env var it will default to stdout reporting only.865 raise_exc: whether or not RPC exceptions should be re-raised after logging866 attr_name: name of the attribute867 attr_value: array of integers868 """869 _setAttrHelper(attr_name, attr_value, "set_int_attrib_array", workitem_id, subindex, server_addr, raise_exc)870def setFloatAttribArray(attr_name, attr_value, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):871 """872 Writes attribute data back into a work item in PDG via the callback server.873 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)874 subindex: batch subindex of item (-1 indicates a non-batch item)875 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)876 if there is no env var it will default to stdout reporting only.877 raise_exc: whether or not RPC exceptions should be re-raised after logging878 attr_name: name of the attribute879 attr_value: array of floats880 """881 _setAttrHelper(attr_name, attr_value, "set_float_attrib_array", workitem_id, subindex, server_addr, raise_exc)882def setFileAttribArray(attr_name, attr_value, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):883 """884 Writes attribute data back into a work item in PDG 
via the callback server.885 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)886 subindex: batch subindex of item (-1 indicates a non-batch item)887 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)888 if there is no env var it will default to stdout reporting only.889 raise_exc: whether or not RPC exceptions should be re-raised after logging890 attr_name: name of the attribute891 attr_value: array of pdgjson.File objects892 """893 _setAttrHelper(attr_name, attr_value, "set_file_attrib_array", workitem_id, subindex, server_addr, raise_exc)894def setPyObjectAttrib(attr_name, attr_value, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):895 """896 Writes attribute data back into a work item in PDG via the callback server.897 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)898 subindex: batch subindex of item (-1 indicates a non-batch item)899 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)900 if there is no env var it will default to stdout reporting only.901 raise_exc: whether or not RPC exceptions should be re-raised after logging902 attr_name: name of the attribute903 attr_value: string that is a valid repr() of a python object904 """905 _setAttrHelper(attr_name, attr_value, "set_pyobject_attrib", workitem_id, subindex, server_addr, raise_exc)906def setStringAttrib(attr_name, attr_value, attr_index, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):907 """908 Writes attribute data back into a work item in PDG via the callback server.909 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)910 subindex: batch subindex of item (-1 indicates a non-batch item)911 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)912 if there is no env var it will default to stdout reporting only.913 raise_exc: whether or not RPC exceptions should be re-raised after logging914 attr_name: name of the attribute915 attr_value: 
string value916 """917 _setAttrIndexHelper(attr_name, attr_value, attr_index, "set_string_attrib", workitem_id, subindex, server_addr, raise_exc)918def setIntAttrib(attr_name, attr_value, attr_index, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):919 """920 Writes attribute data back into a work item in PDG via the callback server.921 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)922 subindex: batch subindex of item (-1 indicates a non-batch item)923 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)924 if there is no env var it will default to stdout reporting only.925 raise_exc: whether or not RPC exceptions should be re-raised after logging926 attr_name: name of the attribute927 attr_value: integer value928 """929 _setAttrIndexHelper(attr_name, attr_value, attr_index, "set_int_attrib", workitem_id, subindex, server_addr, raise_exc)930def setFloatAttrib(attr_name, attr_value, attr_index, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):931 """932 Writes attribute data back into a work item in PDG via the callback server.933 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)934 subindex: batch subindex of item (-1 indicates a non-batch item)935 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)936 if there is no env var it will default to stdout reporting only.937 raise_exc: whether or not RPC exceptions should be re-raised after logging938 attr_name: name of the attribute939 attr_value: float value940 """941 _setAttrIndexHelper(attr_name, attr_value, attr_index, "set_float_attrib", workitem_id, subindex, server_addr, raise_exc)942def setFileAttrib(attr_name, attr_value, attr_index, workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):943 """944 Writes attribute data back into a work item in PDG via the callback server.945 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)946 subindex: batch subindex of item (-1 
indicates a non-batch item)947 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)948 if there is no env var it will default to stdout reporting only.949 raise_exc: whether or not RPC exceptions should be re-raised after logging950 attr_name: name of the attribute951 attr_value: pdg.File value952 """953 _setAttrIndexHelper(attr_name, attr_value, attr_index, "set_file_attrib", workitem_id, subindex, server_addr, raise_exc)954def invalidateCache(workitem_id=None, subindex=-1, server_addr=None, raise_exc=True):955 """956 Requests that the cache of the work item be invalidated by PDG. This forces957 downstream tasks to cook. The same effect can be achieved by adding an958 output file to the work item, however this method can be used to invalidate959 caches without explicitly adding a file.960 workitem_id: id of the associated work item (default $PDG_ITEM_ID)961 subindex: batch subindex of item (-1 indicates a non-batch item)962 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)963 if there is no env var it will default to stdout reporting only.964 raise_exc: whether or not RPC exceptions should be re-raised after logging965 """966 if not workitem_id:967 workitem_id = getItemIdFromEnv()968 printlog("PDG_INVALIDATE_CACHE: {};{}".format(workitem_id, subindex))969 if disable_rpc:970 return971 _checkItemIdArg(workitem_id)972 _invokeRpc(False, "invalidate_cache", workitem_id, subindex, theJobid, server_addr=server_addr, raise_exc=raise_exc)973def reportServerStarted(servername, pid, host, port, proto_type, log_fname, workitem_id=None,974 server_addr=None, raise_exc=True):975 """976 Reports that a shared server has been started.977 workitem_id: id of the associated workitem (default $PDG_ITEM_ID)978 server_addr: callback server in format 'IP:PORT' (default $PDG_RESULT_SERVER)979 raise_exc: whether or not RPC exceptions should be re-raised after logging980 """981 if disable_rpc:982 return983 _checkItemIdArg(workitem_id)984 
sharedserver_message = {985 "name" : servername,986 "pid" : pid,987 "host" : host,988 "port" : port,989 "proto_type" : proto_type990 }991 if not workitem_id:992 workitem_id = int(os.environ['PDG_ITEM_ID'])993 multicall = _getRPCProxy(False, True, server_addr, raise_exc)994 if sys.version_info.major >= 3:995 host = bytes(host, 'utf8')996 port = bytes(str(port), 'utf8')997 log_fname = bytes(log_fname, 'utf8')998 else:999 host = str(host)1000 port = str(port)1001 log_fname = str(log_fname)1002 multicall.call('sharedserver_started', sharedserver_message, theJobid)1003 multicall.call('result', workitem_id, xmlrpclib.Binary(host), "socket/ip", 0, theJobid)1004 multicall.call('result', workitem_id, xmlrpclib.Binary(port), "socket/port", 0, theJobid)1005 multicall.call('result', workitem_id, xmlrpclib.Binary(log_fname), "file/text/log", 0, theJobid)1006 _invokeMultiRpc(multicall, 'reportServerStarted')1007def warning(message, workitem_id=None, server_addr=None, raise_exc=True):1008 if disable_rpc:1009 return1010 _checkItemIdArg(workitem_id)1011 if not workitem_id:1012 workitem_id = int(os.environ['PDG_ITEM_ID'])1013 _invokeRpc(False, "warning", workitem_id, message, theJobid,1014 server_addr=server_addr, raise_exc=raise_exc)1015def keepalive(workitem_id=None, server_addr=None, raise_exc=True):1016 """1017 Called by the job wrapper script when the scheduler requires heartbeat signals.1018 """1019 if not workitem_id:1020 workitem_id = int(os.environ['PDG_ITEM_ID'])1021 _checkItemIdArg(workitem_id)1022 _invokeRpc(False, 'keepalive', workitem_id, theJobid,1023 server_addr=server_addr, raise_exc=raise_exc)1024 1025def execCommand(command, toolName=None):1026 """1027 Executes a command1028 """1029 printlog("Executing command: {}".format(command))1030 try:1031 process = subprocess.Popen(shlex.split(command))1032 process.communicate()1033 if process.returncode != 0:1034 exit(1)1035 except subprocess.CalledProcessError as cmd_err:1036 printlog("ERROR: problem executing command 
{}".format(command))1037 printlog(cmd_err)1038 exit(1)1039 except OSError as os_err:1040 # OSError might be due to missing executable, if that's the1041 # case, inform the user about it.1042 # We could check this before trying to execute, but considering this is1043 # the exception, I'd rather not check this every time we run the command1044 try:1045 import distutils.spawn1046 executableName = shlex.split(command)[0]1047 if not distutils.spawn.find_executable(executableName):1048 printlog("ERROR: could not find executable {}".format(executableName))1049 printlog("Are you sure you have {} installed?".format(toolName or executableName))1050 else:1051 printlog("ERROR: problem executing command {}".format(command))1052 printlog(os_err)1053 except:1054 printlog("ERROR: problem executing command {}".format(command))1055 printlog(os_err)1056 exit(1)1057class PDGPingHelper():1058 """1059 Checks if PDG Result Server is reachable at a fixed period.1060 """1061 def __init__(self, host, port, min_check_period):1062 self.result_server = (host, port)1063 self.min_check_period = min_check_period1064 self.next_check_time = time.time() + self.min_check_period1065 def isReachable(self, raise_on_failure=False):1066 """1067 Returns True if PDG is reachable or if the minimum check period has not1068 elapsed.1069 """1070 now = time.time()1071 if self.next_check_time > now:1072 return True1073 self.next_check_time = now + self.min_check_period1074 # ping the PDG Result Server port and raise an exception if it can't be1075 # reached1076 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)1077 s.settimeout(5)1078 try:1079 s.connect(self.result_server)1080 s.close()1081 except Exception as e:1082 print("Exception caught while contacting PDG Result Server:", e)1083 sys.stdout.flush()1084 # Might get a timeout or connection-refused error1085 if raise_on_failure:1086 raise RpcError(1087 'Could not reach PDG Result Server at {}:{}'.format(1088 *self.result_server))1089 return False1090 return 
True1091 def ensureReachable(self):1092 return self.isReachable(True)1093def main():1094 import argparse1095 parser = argparse.ArgumentParser(1096 description=\1097"""1098Runs an RPC as via command line.1099The following env vars are expected:1100PDG_ITEM_ID1101PDG_RESULT_SERVER1102PDG_JOBUSE_PDGNET1103If PDG_JOBUSE_PDGNET=1:1104PDG_RESULT_CLIENT_ID1105PDG_HTTP_PORT1106""")1107 group = parser.add_mutually_exclusive_group()1108 group.add_argument('--cancel', action='store_true',1109 help='Work Item has been cancelled')1110 group.add_argument('--fail', action='store_true',1111 help='Work Item has failed')1112 group.add_argument('--success', action='store_true',1113 help='Work Item succeeded')1114 1115 args = parser.parse_args()1116 1117 workitem_id = getItemIdFromEnv()1118 if args.cancel:1119 workItemCancelled(workitem_id)1120 if args.fail:1121 workItemFailed(workitem_id)1122 if args.success:1123 workItemSuccess(workitem_id)1124if __name__ == "__main__":...
test_histfile.py
Source:test_histfile.py
import unittest
import sys
import os
from os.path import isfile, expanduser, abspath

from rl import history
from rl import readline
from rl.testing import JailSetup
from rl.testing import reset


# Fixture data shared by the stifling tests below.
_SEVEN_NAMES = ('fred', 'wilma', 'barney', 'betty', 'pebbles', 'bammbamm', 'dino')
_LAST_FIVE_NAMES = _SEVEN_NAMES[2:]


def _remember(*lines):
    """Append every item of *lines* to the global ``history``, in order."""
    for line in lines:
        history.append(line)


def _bytes_name():
    """Return the relative test file name encoded with the filesystem encoding.

    Only called from the tests that are defined when running on Python 3.
    """
    return bytes('my_history', sys.getfilesystemencoding())


class HistoryFileTests(JailSetup):
    """Exercise ``history.read_file``, ``write_file`` and ``append_file``.

    Covers relative, absolute, default, ``None``, empty, tilde and bytes
    file names, plus the interaction with ``history.max_entries`` and
    ``history.max_file``.
    """

    # XXX You will lose your ~/.history file when you run these tests

    def setUp(self):
        JailSetup.setUp(self)
        reset()
        self.histfile = expanduser('~/.history')
        self.remove_histfile()

    def tearDown(self):
        self.remove_histfile()
        JailSetup.tearDown(self)

    def remove_histfile(self):
        """Delete the default history file if it currently exists."""
        if not isfile(self.histfile):
            return
        os.remove(self.histfile)

    def _assert_entries(self, *expected):
        """Assert that ``history[i]`` equals ``expected[i]`` for each index."""
        for index, value in enumerate(expected):
            self.assertEqual(history[index], value)

    # -- basic error handling ------------------------------------------------

    def test_no_histfile(self):
        self.assertEqual(isfile(self.histfile), False)

    def test_read_file_raises_exception(self):
        self.assertRaises(
            IOError, history.read_file, 'my_history', raise_exc=True)

    def test_write_file_raises_exception(self):
        self.assertRaises(
            IOError, history.write_file, '~/~/.my_history', raise_exc=True)

    def test_append_file_raises_exception(self):
        self.assertRaises(
            IOError, history.append_file, 0, '~/~/.my_history', raise_exc=True)

    # -- relative file name ----------------------------------------------------

    def test_read_relative(self):
        _remember('fred', 'wilma')
        history.write_file('my_history', raise_exc=True)
        history.clear()
        history.read_file('my_history', raise_exc=True)
        self.assertEqual(len(history), 2)

    def test_write_relative(self):
        _remember('fred', 'wilma')
        history.write_file('my_history', raise_exc=True)
        self.assertTrue(isfile('my_history'))

    def test_append_relative(self):
        history.write_file('my_history', raise_exc=True)
        _remember('fred', 'wilma')
        history.append_file(2, 'my_history')
        history.clear()
        history.read_file('my_history', raise_exc=True)
        self.assertEqual(len(history), 2)

    # -- absolute file name ----------------------------------------------------

    def test_read_abspath(self):
        _remember('fred', 'wilma')
        history.write_file('my_history', raise_exc=True)
        history.clear()
        history.read_file(abspath('my_history'), raise_exc=True)
        self.assertEqual(len(history), 2)

    def test_write_abspath(self):
        _remember('fred', 'wilma')
        history.write_file(abspath('my_history'), raise_exc=True)
        self.assertTrue(isfile(abspath('my_history')))
        self.assertTrue(isfile('my_history'))

    def test_append_abspath(self):
        history.write_file('my_history', raise_exc=True)
        _remember('fred', 'wilma')
        history.append_file(2, abspath('my_history'))
        history.clear()
        history.read_file('my_history', raise_exc=True)
        self.assertEqual(len(history), 2)

    # -- default file name (argument omitted) ----------------------------------

    def test_read_default_name(self):
        _remember('fred', 'wilma')
        history.write_file(self.histfile, raise_exc=True)
        history.clear()
        history.read_file(raise_exc=True)
        self.assertEqual(len(history), 2)

    def test_write_default_name(self):
        _remember('fred', 'wilma')
        history.write_file(raise_exc=True)
        self.assertTrue(isfile(self.histfile))

    def test_append_default_name(self):
        history.write_file(self.histfile, raise_exc=True)
        _remember('fred', 'wilma')
        history.append_file(2)
        history.clear()
        history.read_file(self.histfile, raise_exc=True)
        self.assertEqual(len(history), 2)

    # -- explicit None file name -----------------------------------------------

    def test_read_None_name(self):
        _remember('fred', 'wilma')
        history.write_file(self.histfile, raise_exc=True)
        history.clear()
        history.read_file(None, raise_exc=True)
        self.assertEqual(len(history), 2)

    def test_write_None_name(self):
        _remember('fred', 'wilma')
        history.write_file(None, raise_exc=True)
        self.assertTrue(isfile(self.histfile))

    def test_append_None_name(self):
        history.write_file(self.histfile, raise_exc=True)
        _remember('fred', 'wilma')
        history.append_file(2, None)
        history.clear()
        history.read_file(self.histfile, raise_exc=True)
        self.assertEqual(len(history), 2)

    # -- empty string file name ------------------------------------------------

    def test_read_empty_string(self):
        _remember('fred', 'wilma')
        history.write_file(self.histfile, raise_exc=True)
        history.clear()
        self.assertRaises(IOError, history.read_file, '', raise_exc=True)

    def test_write_empty_string(self):
        _remember('fred', 'wilma')
        self.assertRaises(IOError, history.write_file, '', raise_exc=True)

    def test_append_empty_string(self):
        history.write_file(self.histfile, raise_exc=True)
        _remember('fred', 'wilma')
        self.assertRaises(IOError, history.append_file, 1, '', raise_exc=True)

    # -- tilde file name -------------------------------------------------------

    def test_read_tilde_expanded(self):
        _remember('fred', 'wilma')
        history.write_file(self.histfile, raise_exc=True)
        history.clear()
        history.read_file('~/.history', raise_exc=True)
        self.assertEqual(len(history), 2)

    def test_write_tilde_expanded(self):
        _remember('fred', 'wilma')
        history.write_file('~/.history', raise_exc=True)
        self.assertTrue(isfile(self.histfile))

    def test_append_tilde_expanded(self):
        history.write_file(self.histfile, raise_exc=True)
        _remember('fred', 'wilma')
        history.append_file(2, '~/.history')
        history.clear()
        history.read_file(self.histfile, raise_exc=True)
        self.assertEqual(len(history), 2)

    # -- bytes file name (only defined on Python 3) ----------------------------

    if sys.version_info[0] >= 3:

        def test_read_bytes_name(self):
            _remember('fred', 'wilma')
            history.write_file(_bytes_name(), raise_exc=True)
            history.clear()
            history.read_file(_bytes_name(), raise_exc=True)
            self.assertEqual(len(history), 2)

        def test_write_bytes_name(self):
            _remember('fred', 'wilma')
            history.write_file(_bytes_name(), raise_exc=True)
            self.assertTrue(isfile('my_history'))

        def test_append_bytes_name(self):
            history.write_file(_bytes_name(), raise_exc=True)
            _remember('fred', 'wilma')
            history.append_file(2, _bytes_name())
            history.clear()
            history.read_file(_bytes_name(), raise_exc=True)
            self.assertEqual(len(history), 2)

    # NOTE(review): the original indentation was lost in this listing; the
    # tests below are assumed to sit at class level rather than inside the
    # version guard above -- confirm against the upstream file.

    # -- stifling / truncation -------------------------------------------------

    def test_read_file_stifled(self):
        _remember(*_SEVEN_NAMES)
        self.assertEqual(len(history), 7)
        history.write_file('my_history', raise_exc=True)
        history.clear()
        history.max_entries = 5
        history.read_file('my_history', raise_exc=True)
        self._assert_entries(*_LAST_FIVE_NAMES)
        self.assertEqual(len(history), 5)

    def test_write_file_stifled(self):
        _remember(*_SEVEN_NAMES)
        self.assertEqual(len(history), 7)
        history.max_entries = 5
        history.write_file('my_history', raise_exc=True)
        history.clear()
        history.max_entries = -1
        history.read_file('my_history', raise_exc=True)
        self._assert_entries(*_LAST_FIVE_NAMES)
        self.assertEqual(len(history), 5)

    def test_write_file_replaces_file(self):
        _remember('fred', 'wilma', 'pebbles')
        history.write_file('my_history', raise_exc=True)
        history.clear()
        _remember('barney', 'betty')
        history.write_file('my_history', raise_exc=True)
        history.clear()
        history.read_file('my_history', raise_exc=True)
        self.assertEqual(len(history), 2)
        self._assert_entries('barney', 'betty')

    def test_write_file_truncates_file(self):
        _remember('fred', 'wilma', 'pebbles')
        history.max_file = 2
        history.write_file('my_history', raise_exc=True)
        history.clear()
        history.read_file('my_history', raise_exc=True)
        self.assertEqual(len(history), 2)
        self._assert_entries('wilma', 'pebbles')

    def test_append_file(self):
        _remember('fred', 'wilma', 'pebbles')
        history.write_file('my_history', raise_exc=True)
        history.clear()
        _remember('barney', 'betty', 'bammbamm')
        history.append_file(2, 'my_history')
        history.clear()
        history.read_file('my_history', raise_exc=True)
        self.assertEqual(len(history), 5)
        self._assert_entries('fred', 'wilma', 'pebbles', 'betty', 'bammbamm')

    def test_append_file_truncates_file(self):
        _remember('fred', 'wilma', 'pebbles')
        history.write_file('my_history', raise_exc=True)
        history.clear()
        _remember('barney', 'betty', 'bammbamm')
        history.max_file = 3
        history.append_file(2, 'my_history')
        history.clear()
        history.read_file('my_history', raise_exc=True)
        self.assertEqual(len(history), 3)
        self._assert_entries('pebbles', 'betty')
        # ... (the listing is truncated at this point)
base.py
Source:base.py
import importlib
import logging
from decimal import Decimal

from django.db import transaction as db_transaction
from django.utils import timezone

from payment_gateway.dto import Transaction as TransactionDTO
from payment_gateway.errors import InvalidMoneyAmount, InvoiceExpired, InvoiceAlreadyPaid, InvoiceInvalidStatus, \
    InsufficientMoneyAmount, PaymentError
from payment_gateway.models import Transaction, InvoiceStatus, TransactionStatus, TransactionStatusChange, \
    InvoiceStatusChange, Invoice

logger = logging.getLogger(__name__)


def _load_callable(dotted_path):
    """Import and return the object named by ``'package.module.attr'``."""
    module_path, attr_name = dotted_path.rsplit('.', 1)
    return getattr(importlib.import_module(module_path), attr_name)


class AbstractPaymentHandler(object):
    """Interface for validating a payment and moving an invoice through it."""

    def try_process_payment(self, invoice: Invoice, transaction: Transaction) -> Invoice:
        raise NotImplementedError

    def handle_payment_error(self, error: PaymentError, invoice: Invoice, transaction: Transaction, raise_exc: bool):
        raise NotImplementedError

    def validate_payment(self, invoice: Invoice, transaction: Transaction, raise_exc: bool) -> bool:
        raise NotImplementedError

    def validate_expiration(self, invoice: Invoice, raise_exc: bool = True) -> bool:
        raise NotImplementedError

    def validate_status_for_pay(self, invoice: Invoice, raise_exc: bool = True) -> bool:
        raise NotImplementedError

    def on_success(self, invoice: Invoice, *args, **kwargs) -> Invoice:
        raise NotImplementedError

    def on_fail(self, invoice: Invoice, *args, **kwargs) -> Invoice:
        raise NotImplementedError


class AbstractTransactionHandler(object):
    """Interface for creating transactions and changing their status."""

    def create(self, transaction: TransactionDTO):
        raise NotImplementedError

    def set_expired(self, transaction: Transaction):
        raise NotImplementedError

    def set_invalid_money_amount(self, transaction: Transaction):
        raise NotImplementedError

    def set_success(self, transaction: Transaction):
        raise NotImplementedError

    def set_error(self, transaction: Transaction):
        raise NotImplementedError

    def set_declined(self, transaction: Transaction):
        raise NotImplementedError

    def update_transaction_status(self, transaction: Transaction, status: TransactionStatus):
        raise NotImplementedError


class AbstractCallbackProvider(object):
    """Interface for the hooks invoked after a payment succeeds or fails."""

    def success(self, *args, **kwargs) -> Invoice:
        raise NotImplementedError

    def fail(self, *args, **kwargs) -> Invoice:
        raise NotImplementedError


class AbstractPaymentProvider(object):
    """Base class for providers; stores the two collaborating handlers."""

    def __init__(self, payment_handler: AbstractPaymentHandler, transaction_handler: AbstractTransactionHandler):
        self.payment_handler = payment_handler
        self.transaction_handler = transaction_handler

    def pay(self, invoice_id: int, transaction_data: object) -> (Invoice, Transaction):
        raise NotImplementedError


class BasicCallbackProvider(AbstractCallbackProvider):
    """Runs the dotted-path callbacks stored on the invoice.

    Each callback is imported by name and called with ``invoice.id``.
    """

    def success(self, invoice, *args, **kwargs):
        """Call ``invoice.success_callback`` and return the invoice."""
        logger.info('Calling success callback for invoice.', extra={'invoice_id': invoice.pk,
                                                                    'callback': invoice.success_callback})
        callback = _load_callable(invoice.success_callback)
        callback(invoice.id)
        logger.info('Executed success callback for invoice', extra={'invoice_id': invoice.pk,
                                                                   'callback': invoice.success_callback})
        return invoice

    def fail(self, invoice, *args, **kwargs):
        """Call ``invoice.fail_callback`` when one is set; return the invoice."""
        logger.info('Calling fail callback for invoice.', extra={'invoice_id': invoice.pk,
                                                                 'callback': invoice.fail_callback})
        if invoice.fail_callback is None:
            return invoice
        callback = _load_callable(invoice.fail_callback)
        callback(invoice.id)
        # NOTE(review): indentation was lost in the listing; this log call is
        # assumed to run only when a callback was actually executed -- confirm.
        logger.info('Executed fail callback for invoice.', extra={'invoice_id': invoice.pk,
                                                                 'callback': invoice.fail_callback})
        return invoice


class BasicTransactionHandler(AbstractTransactionHandler):
    """ORM-backed transaction handler that records every status change."""

    def create(self, transaction: TransactionDTO):
        """Persist a new ``PENDING`` transaction built from the DTO fields."""
        return Transaction.objects.create(
            invoice_id=transaction.invoice_id,
            money_amount=transaction.money_amount,
            type=transaction.type,
            status=TransactionStatus.PENDING
        )

    def set_expired(self, transaction: Transaction):
        return self.update_transaction_status(transaction, TransactionStatus.INVOICE_EXPIRED)

    def set_invalid_money_amount(self, transaction: Transaction):
        return self.update_transaction_status(transaction, TransactionStatus.INVALID_MONEY_AMOUNT)

    def set_success(self, transaction: Transaction):
        return self.update_transaction_status(transaction, TransactionStatus.SUCCESS)

    def set_error(self, transaction: Transaction):
        return self.update_transaction_status(transaction, TransactionStatus.ERROR)

    def set_declined(self, transaction: Transaction):
        return self.update_transaction_status(transaction, TransactionStatus.DECLINED)

    @db_transaction.atomic()
    def update_transaction_status(self, transaction: Transaction, status: TransactionStatus) -> Transaction:
        """Save the new status and create a matching ``TransactionStatusChange``."""
        previous = transaction.status
        transaction.status = status
        transaction.save(update_fields=['status', 'modified_at'])
        TransactionStatusChange.objects.create(
            transaction=transaction,
            from_status=previous,
            to_status=transaction.status
        )
        return transaction


class BasicPaymentHandler(AbstractPaymentHandler):
    """Validates payments, updates invoice state and triggers callbacks."""

    def __init__(self, callback_provider: AbstractCallbackProvider, transaction_handler: AbstractTransactionHandler):
        self.callback_provider = callback_provider
        self.transaction_handler = transaction_handler

    def try_process_payment(self, invoice: Invoice, transaction: Transaction) -> Invoice:
        """Validate the payment, mark the invoice paid and run ``on_success``.

        Asserts that the current connection is inside an atomic block.
        Validation errors are raised to the caller (``raise_exc=True``).
        """
        assert db_transaction.get_connection().in_atomic_block
        self.validate_payment(invoice, transaction, raise_exc=True)
        paid_invoice = self.make_invoice_success(invoice, transaction)
        return self.on_success(paid_invoice)

    def handle_payment_error(self, error: PaymentError, invoice: Invoice, transaction: Transaction,
                             raise_exc: bool = False):
        """Record *error* on the transaction (and invoice when it expired).

        Re-raises *error* afterwards when *raise_exc* is true; otherwise
        returns the ``(invoice, transaction)`` pair.
        """
        handler = self.transaction_handler
        # Order matters: the first matching error type decides the status.
        if isinstance(error, (InvalidMoneyAmount, InsufficientMoneyAmount)):
            handler.set_invalid_money_amount(transaction)
        elif isinstance(error, InvoiceExpired):
            self.make_invoice_expired(invoice, transaction)
        elif isinstance(error, InvoiceInvalidStatus):
            handler.set_declined(transaction)
        else:
            handler.set_error(transaction)

        if raise_exc:
            raise error
        return invoice, transaction

    def set_invoice_status(self, invoice: Invoice, status: InvoiceStatus) -> (Invoice, InvoiceStatus):
        """Assign *status* in memory (no save); return the invoice and its old status."""
        previous = invoice.status
        invoice.status = status
        return invoice, previous

    @db_transaction.atomic()
    def make_invoice_success(self, invoice: Invoice, transaction: Transaction) -> Invoice:
        """Mark the transaction successful and the invoice ``PAID``, with history."""
        transaction = self.transaction_handler.set_success(transaction)
        invoice.success_transaction = transaction
        invoice.captured_total = transaction.money_amount
        invoice, previous = self.set_invoice_status(invoice, InvoiceStatus.PAID)
        invoice.save(update_fields=['status', 'success_transaction', 'modified_at', 'captured_total'])
        self.write_invoice_history(invoice, new_status=invoice.status, old_status=previous)
        return invoice

    @db_transaction.atomic()
    def make_invoice_expired(self, invoice: Invoice, transaction: Transaction) -> Invoice:
        """Mark the transaction expired and, if needed, the invoice ``EXPIRED``."""
        self.transaction_handler.set_expired(transaction)
        if invoice.status == InvoiceStatus.EXPIRED:
            # Already expired: nothing to save and no history row to write.
            return invoice
        invoice, previous = self.set_invoice_status(invoice, InvoiceStatus.EXPIRED)
        invoice.save(update_fields=['status', 'modified_at'])
        self.write_invoice_history(invoice, invoice.status, previous)
        return invoice

    def write_invoice_history(self, invoice: Invoice, new_status: int, old_status: int) -> InvoiceStatusChange:
        """Create the ``InvoiceStatusChange`` row for one status transition."""
        return InvoiceStatusChange.objects.create(
            invoice=invoice,
            from_status=old_status,
            to_status=new_status
        )

    def validate_payment(self, invoice: Invoice, transaction: Transaction, raise_exc: bool = True) -> bool:
        """Run the status, expiration and amount checks, stopping at the first failure."""
        return (
            self.validate_status_for_pay(invoice, raise_exc=raise_exc)
            and self.validate_expiration(invoice, raise_exc=raise_exc)
            and self.validate_money_amount(invoice, transaction.money_amount, raise_exc=raise_exc)
        )

    def validate_money_amount(self, invoice: Invoice, money_amount: Decimal, raise_exc: bool = True) -> bool:
        """Check that *money_amount* covers ``invoice.total``.

        Raises ``InsufficientMoneyAmount`` when the total exceeds the amount,
        and ``InvalidMoneyAmount`` when neither comparison holds, if
        *raise_exc* is true.
        """
        is_enough = invoice.total <= money_amount
        if is_enough or not raise_exc:
            return is_enough
        if invoice.total > money_amount:
            raise InsufficientMoneyAmount()
        # Reached only when both comparisons above are false.
        raise InvalidMoneyAmount()

    def validate_expiration(self, invoice: Invoice, raise_exc: bool = True) -> bool:
        """Check ``invoice.expires_at`` against now; ``None`` never expires."""
        if invoice.expires_at is None:
            return True
        still_valid = invoice.expires_at > timezone.now()
        if raise_exc and not still_valid:
            raise InvoiceExpired()
        return still_valid

    def validate_status_for_pay(self, invoice: Invoice, raise_exc: bool = True) -> bool:
        """Check that the invoice is ``PENDING``.

        When it is not and *raise_exc* is true, raises ``InvoiceExpired``,
        ``InvoiceAlreadyPaid`` or ``InvoiceInvalidStatus`` depending on the
        current status.
        """
        is_pending = invoice.status == InvoiceStatus.PENDING
        if is_pending or not raise_exc:
            return is_pending
        if invoice.status == InvoiceStatus.EXPIRED:
            raise InvoiceExpired()
        if invoice.status == InvoiceStatus.PAID:
            raise InvoiceAlreadyPaid()
        raise InvoiceInvalidStatus()

    def on_success(self, invoice: Invoice, *args, **kwargs) -> Invoice:
        """Delegate to the callback provider's ``success`` hook."""
        return self.callback_provider.success(invoice, *args, **kwargs)

    def on_fail(self, invoice: Invoice, *args, **kwargs) -> Invoice:
        # NOTE(review): the listing is cut off right after this signature; only
        # the literal ``...`` is visible, so the real body is unknown from here.
        ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!