Best Python code snippet using tavern
decoder.py
Source:decoder.py
import re
import struct
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from io import BytesIO

from .types import (
    CBORDecodeValueError, CBORDecodeEOF, CBORTag, undefined, break_marker,
    CBORSimpleValue, FrozenDict)

# Groups: year, month, day, hour, minute, second, fractional seconds (at most
# six digits are captured), signed offset hours, offset minutes.
timestamp_re = re.compile(r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)'
                          r'(?:\.(\d{1,6})\d*)?(?:Z|([+-]\d\d):(\d\d))$')


class CBORDecoder:
    """
    The CBORDecoder class implements a fully featured `CBOR`_ decoder with
    several extensions for handling shared references, big integers, rational
    numbers and so on. Typically the class is not used directly, but the
    :func:`load` and :func:`loads` functions are called to indirectly construct
    and use the class.

    When the class is constructed manually, the main entry points are
    :meth:`decode` and :meth:`decode_from_bytes`.

    :param fp:
        the input stream; any object with a callable ``read`` attribute
    :param tag_hook:
        callable that takes 2 arguments: the decoder instance, and the
        :class:`CBORTag` to be decoded. This callback is invoked for any tags
        for which there is no built-in decoder. The return value is substituted
        for the :class:`CBORTag` object in the deserialized output
    :param object_hook:
        callable that takes 2 arguments: the decoder instance, and a
        dictionary. This callback is invoked for each deserialized
        :class:`dict` object. The return value is substituted for the dict in
        the deserialized output.
    :param str_errors:
        error handling scheme handed to :meth:`bytes.decode` when decoding
        text strings; one of ``'strict'``, ``'error'`` or ``'replace'``

    .. _CBOR: https://cbor.io/
    """

    __slots__ = (
        '_tag_hook', '_object_hook', '_share_index', '_shareables', '_fp_read',
        '_immutable', '_str_errors')

    def __init__(self, fp, tag_hook=None, object_hook=None,
                 str_errors='strict'):
        # These four assignments go through the validating property setters.
        self.fp = fp
        self.tag_hook = tag_hook
        self.object_hook = object_hook
        self.str_errors = str_errors
        # Index into _shareables of the value currently being decoded under a
        # "shareable" tag, or None when no such tag is active.
        self._share_index = None
        self._shareables = []
        self._immutable = False

    @property
    def immutable(self):
        """
        Used by decoders to check if the calling context requires an immutable
        type. Object_hook or tag_hook should raise an exception if this flag
        is set unless the result can be safely used as a dict key.
        """
        return self._immutable

    @property
    def fp(self):
        """The object whose bound ``read`` method is used for input."""
        return self._fp_read.__self__

    @fp.setter
    def fp(self, value):
        try:
            if not callable(value.read):
                raise ValueError('fp.read is not callable')
        except AttributeError:
            raise ValueError('fp object has no read method')
        else:
            # Only the bound method is stored; the getter recovers the stream
            # from its __self__.
            self._fp_read = value.read

    @property
    def tag_hook(self):
        """The callback for tags without a built-in decoder, or ``None``."""
        return self._tag_hook

    @tag_hook.setter
    def tag_hook(self, value):
        if value is None or callable(value):
            self._tag_hook = value
        else:
            raise ValueError('tag_hook must be None or a callable')

    @property
    def object_hook(self):
        """The callback applied to every decoded map, or ``None``."""
        return self._object_hook

    @object_hook.setter
    def object_hook(self, value):
        if value is None or callable(value):
            self._object_hook = value
        else:
            raise ValueError('object_hook must be None or a callable')

    @property
    def str_errors(self):
        """The error handling scheme used when decoding text strings."""
        return self._str_errors

    @str_errors.setter
    def str_errors(self, value):
        if value in ('strict', 'error', 'replace'):
            self._str_errors = value
        else:
            raise ValueError(
                "invalid str_errors value {!r} (must be one of 'strict', "
                "'error', or 'replace')".format(value))

    def set_shareable(self, value):
        """
        Set the shareable value for the last encountered shared value marker,
        if any. If the current shared index is ``None``, nothing is done.

        :param value: the shared value
        :returns: the shared value to permit chaining
        """
        if self._share_index is not None:
            self._shareables[self._share_index] = value
        return value

    def read(self, amount):
        """
        Read bytes from the data stream.

        :param int amount: the number of bytes to read
        :raises CBORDecodeEOF: if fewer than ``amount`` bytes were available
        """
        data = self._fp_read(amount)
        if len(data) < amount:
            raise CBORDecodeEOF(
                'premature end of stream (expected to read {} bytes, got {} '
                'instead)'.format(amount, len(data)))
        return data

    def _decode(self, immutable=False, unshared=False):
        """
        Decode one item, dispatching on the major type of its initial byte.

        :param bool immutable:
            force the immutable flag on for the duration of this item
        :param bool unshared:
            clear the current share index for the duration of this item
        """
        if immutable:
            old_immutable = self._immutable
            self._immutable = True
        if unshared:
            old_index = self._share_index
            self._share_index = None
        try:
            initial_byte = self.read(1)[0]
            # Top three bits select the major type, low five the subtype.
            major_type = initial_byte >> 5
            subtype = initial_byte & 31
            decoder = major_decoders[major_type]
            return decoder(self, subtype)
        finally:
            # Restore the saved state even if decoding failed.
            if immutable:
                self._immutable = old_immutable
            if unshared:
                self._share_index = old_index

    def decode(self):
        """
        Decode the next value from the stream.

        :raises CBORDecodeError: if there is any problem decoding the stream
        """
        return self._decode()

    def decode_from_bytes(self, buf):
        """
        Wrap the given bytestring as a file and call :meth:`decode` with it as
        the argument.

        This method was intended to be used from the ``tag_hook`` hook when an
        object needs to be decoded separately from the rest but while still
        taking advantage of the shared value registry.
        """
        with BytesIO(buf) as fp:
            old_fp = self.fp
            self.fp = fp
            try:
                return self._decode()
            finally:
                # Always switch back to the original stream; otherwise a
                # decoding error would leave the decoder reading from the
                # temporary buffer, which is closed on leaving this block.
                self.fp = old_fp

    def _decode_length(self, subtype, allow_indefinite=False):
        """
        Decode the unsigned integer argument selected by ``subtype``.

        :param int subtype: the low five bits of the initial byte
        :param bool allow_indefinite:
            accept subtype 31 and return ``None`` for it
        :returns: the integer, or ``None`` for an indefinite length
        :raises CBORDecodeValueError: if the subtype is not recognized
        """
        if subtype < 24:
            # Small values are stored in the subtype bits themselves.
            return subtype
        elif subtype == 24:
            return self.read(1)[0]
        elif subtype == 25:
            return struct.unpack('>H', self.read(2))[0]
        elif subtype == 26:
            return struct.unpack('>L', self.read(4))[0]
        elif subtype == 27:
            return struct.unpack('>Q', self.read(8))[0]
        elif subtype == 31 and allow_indefinite:
            return None
        else:
            raise CBORDecodeValueError(
                'unknown unsigned integer subtype 0x%x' % subtype)

    def decode_uint(self, subtype):
        """Decode an unsigned integer."""
        # Major tag 0
        return self.set_shareable(self._decode_length(subtype))

    def decode_negint(self, subtype):
        """Decode a negative integer, stored as ``-1 - n``."""
        # Major tag 1
        return self.set_shareable(-self._decode_length(subtype) - 1)

    def decode_bytestring(self, subtype):
        """
        Decode a byte string, joining the chunks of an indefinite length one.

        :raises CBORDecodeValueError:
            if an indefinite length string contains a non-bytestring chunk
        """
        # Major tag 2
        length = self._decode_length(subtype, allow_indefinite=True)
        if length is None:
            # Indefinite length
            buf = []
            while True:
                initial_byte = self.read(1)[0]
                if initial_byte == 0xff:
                    result = b''.join(buf)
                    break
                elif initial_byte >> 5 == 2:
                    length = self._decode_length(initial_byte & 0x1f)
                    value = self.read(length)
                    buf.append(value)
                else:
                    raise CBORDecodeValueError(
                        "non-bytestring found in indefinite length bytestring")
        else:
            result = self.read(length)
        return self.set_shareable(result)

    def decode_string(self, subtype):
        """
        Decode a UTF-8 text string, joining the chunks of an indefinite
        length one.

        :raises CBORDecodeValueError:
            if an indefinite length string contains a non-string chunk
        """
        # Major tag 3
        length = self._decode_length(subtype, allow_indefinite=True)
        if length is None:
            # Indefinite length
            # NOTE: It may seem redundant to repeat this code to handle UTF-8
            # strings but there is a reason to do this separately to
            # byte-strings. Specifically, the CBOR spec states (in sec. 2.2):
            #
            #     Text strings with indefinite lengths act the same as byte
            #     strings with indefinite lengths, except that all their chunks
            #     MUST be definite-length text strings. Note that this implies
            #     that the bytes of a single UTF-8 character cannot be spread
            #     between chunks: a new chunk can only be started at a
            #     character boundary.
            #
            # This precludes using the indefinite bytestring decoder above as
            # that would happily ignore UTF-8 characters split across chunks.
            buf = []
            while True:
                initial_byte = self.read(1)[0]
                if initial_byte == 0xff:
                    result = ''.join(buf)
                    break
                elif initial_byte >> 5 == 3:
                    length = self._decode_length(initial_byte & 0x1f)
                    value = self.read(length).decode('utf-8', self._str_errors)
                    buf.append(value)
                else:
                    raise CBORDecodeValueError(
                        "non-string found in indefinite length string")
        else:
            result = self.read(length).decode('utf-8', self._str_errors)
        return self.set_shareable(result)

    def decode_array(self, subtype):
        """
        Decode an array into a list, or into a tuple when the immutable flag
        is set.
        """
        # Major tag 4
        length = self._decode_length(subtype, allow_indefinite=True)
        items = []
        if not self._immutable:
            # Register the list before decoding its members so that nested
            # shared references can resolve to it.
            self.set_shareable(items)
        if length is None:
            # Indefinite length: read items until the break marker
            while True:
                value = self._decode()
                if value is break_marker:
                    break
                items.append(value)
        else:
            for _ in range(length):
                items.append(self._decode())
        if self._immutable:
            items = tuple(items)
            self.set_shareable(items)
        return items

    def decode_map(self, subtype):
        """
        Decode a map into a dict.

        Keys are decoded with the immutable flag set. If an object hook is
        set, its return value replaces the dict; otherwise the dict is wrapped
        in :class:`FrozenDict` when the immutable flag is set.
        """
        # Major tag 5
        length = self._decode_length(subtype, allow_indefinite=True)
        dictionary = {}
        self.set_shareable(dictionary)
        if length is None:
            # Indefinite length: read pairs until a break marker is found in
            # key position
            while True:
                key = self._decode(immutable=True, unshared=True)
                if key is break_marker:
                    break
                dictionary[key] = self._decode(unshared=True)
        else:
            for _ in range(length):
                key = self._decode(immutable=True, unshared=True)
                dictionary[key] = self._decode(unshared=True)
        if self._object_hook:
            dictionary = self._object_hook(self, dictionary)
            self.set_shareable(dictionary)
        elif self._immutable:
            dictionary = FrozenDict(dictionary)
            self.set_shareable(dictionary)
        return dictionary

    def decode_semantic(self, subtype):
        """
        Decode a tagged item.

        Tags with a built-in decoder are dispatched to it. Any other tag is
        wrapped in a :class:`CBORTag` and passed through the tag hook if one
        is set.
        """
        # Major tag 6
        tagnum = self._decode_length(subtype)
        semantic_decoder = semantic_decoders.get(tagnum)
        if semantic_decoder:
            return semantic_decoder(self)
        tag = CBORTag(tagnum, None)
        # Register the tag before decoding its value so that references made
        # from inside the value can resolve to it.
        self.set_shareable(tag)
        tag.value = self._decode(unshared=True)
        if self._tag_hook:
            tag = self._tag_hook(self, tag)
        return self.set_shareable(tag)

    def decode_special(self, subtype):
        """
        Decode a simple value, a float or the break marker.

        :raises CBORDecodeValueError: if the subtype has no decoder
        """
        # Simple value
        if subtype < 20:
            # XXX Set shareable?
            return CBORSimpleValue(subtype)
        # Major tag 7
        try:
            special_decoder = special_decoders[subtype]
        except KeyError:
            # Subtypes missing from the table used to escape as a bare
            # KeyError; report them as a decoding error like every other
            # malformed input.
            raise CBORDecodeValueError(
                'Undefined Reserved major type 7 subtype 0x%x'
                % subtype) from None
        return special_decoder(self)

    #
    # Semantic decoders (major tag 6)
    #

    def decode_datetime_string(self):
        """
        Decode a date/time string into a timezone aware datetime.

        :raises CBORDecodeValueError:
            if the string does not match ``timestamp_re``
        """
        # Semantic tag 0
        value = self._decode()
        match = timestamp_re.match(value)
        if not match:
            raise CBORDecodeValueError(
                'invalid datetime string: {!r}'.format(value))
        (
            year,
            month,
            day,
            hour,
            minute,
            second,
            secfrac,
            offset_h,
            offset_m,
        ) = match.groups()
        if secfrac is None:
            microsecond = 0
        else:
            # Right-pad the fraction with zeros to six digits
            microsecond = int('{:<06}'.format(secfrac))
        if offset_h:
            # The regex captures the sign together with the hours. It has to
            # apply to the minutes as well, otherwise "-05:30" would come out
            # as -04:30 and "-00:30" as +00:30.
            sign = -1 if offset_h.startswith('-') else 1
            tz = timezone(sign * timedelta(
                hours=int(offset_h[1:]), minutes=int(offset_m)))
        else:
            tz = timezone.utc
        return self.set_shareable(datetime(
            int(year), int(month), int(day),
            int(hour), int(minute), int(second), microsecond, tz))

    def decode_epoch_datetime(self):
        """Decode a numeric timestamp into a UTC datetime."""
        # Semantic tag 1
        value = self._decode()
        return self.set_shareable(datetime.fromtimestamp(value, timezone.utc))

    def decode_positive_bignum(self):
        """Decode a big-endian byte string into a non-negative integer."""
        # Semantic tag 2
        value = self._decode()
        # int.from_bytes maps an empty byte string to 0, where the previous
        # hexlify-based conversion raised ValueError.
        return self.set_shareable(int.from_bytes(value, 'big'))

    def decode_negative_bignum(self):
        """Decode a big-endian byte string ``n`` into the integer ``-1 - n``."""
        # Semantic tag 3
        return self.set_shareable(-self.decode_positive_bignum() - 1)

    def decode_fraction(self):
        """Decode an (exponent, significand) pair as ``sig * 10 ** exp``."""
        # Semantic tag 4
        from decimal import Decimal
        exp, sig = self._decode()
        return self.set_shareable(Decimal(sig) * (10 ** Decimal(exp)))

    def decode_bigfloat(self):
        """Decode an (exponent, significand) pair as ``sig * 2 ** exp``."""
        # Semantic tag 5
        from decimal import Decimal
        exp, sig = self._decode()
        return self.set_shareable(Decimal(sig) * (2 ** Decimal(exp)))

    def decode_shareable(self):
        """
        Reserve a slot in the shared value registry and decode the tagged
        item into it.
        """
        # Semantic tag 28
        old_index = self._share_index
        self._share_index = len(self._shareables)
        # Placeholder that is filled in by set_shareable() during decoding
        self._shareables.append(None)
        try:
            return self._decode()
        finally:
            self._share_index = old_index

    def decode_sharedref(self):
        """
        Look up a previously registered shared value by its index.

        :raises CBORDecodeValueError:
            if the index is out of range or its slot has not been filled yet
        """
        # Semantic tag 29
        value = self._decode(unshared=True)
        try:
            shared = self._shareables[value]
        except IndexError:
            raise CBORDecodeValueError('shared reference %d not found' % value)
        if shared is None:
            raise CBORDecodeValueError(
                'shared value %d has not been initialized' % value)
        return shared

    def decode_rational(self):
        """Decode a sequence of arguments into a :class:`fractions.Fraction`."""
        # Semantic tag 30
        from fractions import Fraction
        return self.set_shareable(Fraction(*self._decode()))

    def decode_regexp(self):
        """Decode a pattern string into a compiled regular expression."""
        # Semantic tag 35
        return self.set_shareable(re.compile(self._decode()))

    def decode_mime(self):
        """Decode a string by parsing it with :class:`email.parser.Parser`."""
        # Semantic tag 36
        from email.parser import Parser
        return self.set_shareable(Parser().parsestr(self._decode()))

    def decode_uuid(self):
        """Decode a byte string into a :class:`uuid.UUID`."""
        # Semantic tag 37
        from uuid import UUID
        return self.set_shareable(UUID(bytes=self._decode()))

    def decode_set(self):
        """
        Decode an array into a set, or into a frozenset when the immutable
        flag is set. The members are always decoded as immutable.
        """
        # Semantic tag 258
        if self._immutable:
            return self.set_shareable(frozenset(self._decode(immutable=True)))
        else:
            return self.set_shareable(set(self._decode(immutable=True)))

    def decode_ipaddress(self):
        """
        Decode a 4 or 16 byte string into an IP address object.

        A 6 byte value is treated as a MAC address and returned wrapped in a
        :class:`CBORTag`.

        :raises CBORDecodeValueError:
            if the value is not a byte string of 4, 6 or 16 bytes
        """
        # Semantic tag 260
        from ipaddress import ip_address
        buf = self.decode()
        if not isinstance(buf, bytes) or len(buf) not in (4, 6, 16):
            raise CBORDecodeValueError("invalid ipaddress value %r" % buf)
        if len(buf) == 6:
            # MAC address
            return self.set_shareable(CBORTag(260, buf))
        return self.set_shareable(ip_address(buf))

    def decode_ipnetwork(self):
        """
        Decode a single-entry map into an IP network object.

        The entry's ``(key, value)`` pair is handed to
        :func:`ipaddress.ip_network` as is, with ``strict=False``.

        :raises CBORDecodeValueError:
            if the value is not a one-entry mapping or is rejected by
            :func:`ipaddress.ip_network`
        """
        # Semantic tag 261
        from ipaddress import ip_network
        net_map = self.decode()
        if isinstance(net_map, Mapping) and len(net_map) == 1:
            net = next(iter(net_map.items()))
            try:
                return self.set_shareable(ip_network(net, strict=False))
            except (TypeError, ValueError):
                # Fall through to the generic error below
                pass
        raise CBORDecodeValueError("invalid ipnetwork value %r" % net_map)

    def decode_self_describe_cbor(self):
        """Decode the tagged item and return it unchanged."""
        # Semantic tag 55799
        return self._decode()

    #
    # Special decoders (major tag 7)
    #

    def decode_simple_value(self):
        """Decode a simple value stored in the byte after the initial byte."""
        # XXX Set shareable?
        return CBORSimpleValue(self.read(1)[0])

    def decode_float16(self):
        """Decode a big-endian half precision float."""
        payload = self.read(2)
        value = struct.unpack('>e', payload)[0]
        return self.set_shareable(value)

    def decode_float32(self):
        """Decode a big-endian single precision float."""
        return self.set_shareable(struct.unpack('>f', self.read(4))[0])

    def decode_float64(self):
        """Decode a big-endian double precision float."""
        return self.set_shareable(struct.unpack('>d', self.read(8))[0])


# Decoder for each major type (top three bits of the initial byte)
major_decoders = {
    0: CBORDecoder.decode_uint,
    1: CBORDecoder.decode_negint,
    2: CBORDecoder.decode_bytestring,
    3: CBORDecoder.decode_string,
    4: CBORDecoder.decode_array,
    5: CBORDecoder.decode_map,
    6: CBORDecoder.decode_semantic,
    7: CBORDecoder.decode_special,
}

# Decoder for each major type 7 subtype from 20 upwards; lower subtypes are
# handled directly in CBORDecoder.decode_special
special_decoders = {
    20: lambda self: False,
    21: lambda self: True,
    22: lambda self: None,
    23: lambda self: undefined,
    24: CBORDecoder.decode_simple_value,
    25: CBORDecoder.decode_float16,
    26: CBORDecoder.decode_float32,
    27: CBORDecoder.decode_float64,
    31: lambda self: break_marker,
}

# Built-in decoder for each supported semantic tag number
semantic_decoders = {
    0: CBORDecoder.decode_datetime_string,
    1: CBORDecoder.decode_epoch_datetime,
    2: CBORDecoder.decode_positive_bignum,
    3: CBORDecoder.decode_negative_bignum,
    4: CBORDecoder.decode_fraction,
    5: CBORDecoder.decode_bigfloat,
    28: CBORDecoder.decode_shareable,
    29: CBORDecoder.decode_sharedref,
    30: CBORDecoder.decode_rational,
    35: CBORDecoder.decode_regexp,
    36: CBORDecoder.decode_mime,
    37: CBORDecoder.decode_uuid,
    258: CBORDecoder.decode_set,
    260: CBORDecoder.decode_ipaddress,
    261: CBORDecoder.decode_ipnetwork,
    55799: CBORDecoder.decode_self_describe_cbor,
}


def loads(s, **kwargs):
    """
    Deserialize an object from a bytestring.

    :param bytes s:
        the bytestring to deserialize
    :param kwargs:
        keyword arguments passed to :class:`CBORDecoder`
    :return:
        the deserialized object
    """
    with BytesIO(s) as fp:
        return CBORDecoder(fp, **kwargs).decode()


def load(fp, **kwargs):
    """
    Deserialize an object from an open file.

    :param fp:
        the input file (any file-like object)
    :param kwargs:
        keyword arguments passed to :class:`CBORDecoder`
    :return:
        the deserialized object
    """
    # NOTE(review): the listing is cut off right after the docstring (it ends
    # in a literal "..."), so the real body is not visible here. Presumably it
    # mirrors loads() without the BytesIO wrapper -- restore it from the
    # upstream file instead of relying on this placeholder.
    ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!