Best Python code snippet using localstack_python
cells.py
Source:cells.py
...39 if cls.NUM == CellVersions.NUM or cls.NUM >= 128:40 return True41 else:42 return False43 def _serialize_payload(self):44 raise NotImplementedError('Must be implemented in a subclass')45 def serialize(self, proto_version):46 payload = self._serialize_payload()47 # Link protocol 4 increases circuit ID width to 4 bytes.48 if proto_version < 4:49 buffer = struct.pack('!HB', self.circuit_id, self.NUM)50 else:51 buffer = struct.pack('!IB', self.circuit_id, self.NUM)52 if self.is_var_len():53 buffer += struct.pack('!H', len(payload)) + payload54 else:55 buffer += struct.pack('!509s', payload)56 return buffer57 @staticmethod58 def deserialize(cell_type, circuit_id, payload, proto_version):59 kwargs = cell_type._deserialize_payload(payload, proto_version)60 return cell_type(**kwargs, circuit_id=circuit_id)61 def _args_str(self):62 return ''63 def __repr__(self):64 """Represent TorCell string."""65 args = self._args_str()66 circ_str = 'circuit_id = {:x}'.format(self.circuit_id) if self.circuit_id else ''67 return '{}({}{})'.format(type(self).__name__, args, ', ' + circ_str if args and circ_str else circ_str)68class TorCellEmpty(TorCell):69 NUM = -170 def __init__(self, data=None, circuit_id=0):71 super().__init__(circuit_id)72 self._data = data or b''73 if len(self._data) > 0:74 logger.warning('%s has some unnecessary data: %r', self.__class__.__qualname__, self._data)75 def _serialize_payload(self):76 return self._data77 @staticmethod78 def _deserialize_payload(payload, proto_version):79 return {'data': payload}80class CellVersions(TorCell):81 """The payload in a VERSIONS cell is a series of big-endian two-byte integers."""82 NUM = 783 def __init__(self, versions, circuit_id=0):84 super().__init__(circuit_id)85 self.versions = versions86 def _serialize_payload(self):87 return struct.pack('!' + ('H' * len(self.versions)), *self.versions)88 @staticmethod89 def _deserialize_payload(payload, proto_version):90 versions = []91 while payload:92 versions.append(struct.unpack('!H', payload[:2])[0])93 payload = payload[2:]94 return {'versions': versions}95 def _args_str(self):96 return 'versions = {!r}'.format(self.versions)97class CellNetInfo(TorCell):98 """99 CellNetInfo representation.100 The cell's payload is:101 - Timestamp [4 bytes]102 - Other OR's address [variable]103 - Number of addresses [1 byte]104 - This OR's addresses [variable]105 Address format:106 - Type (1 octet)107 - Length (1 octet)108 - Value (variable-width)109 "Length" is the length of the Value field.110 "Type" is one of:111 - 0x00 -- Hostname112 - 0x04 -- IPv4 address113 - 0x06 -- IPv6 address114 - 0xF0 -- Error, transient115 - 0xF1 -- Error, nontransient116 """117 NUM = 8118 def __init__(self, timestamp, other_or, this_or, circuit_id=0):119 super().__init__(circuit_id)120 self.timestamp = timestamp121 self.other_or = other_or122 self.this_or = this_or123 def _serialize_payload(self):124 payload_bytes = struct.pack('!IBB', self.timestamp, 4, 4) + socket.inet_aton(self.other_or)125 payload_bytes += struct.pack('!BBB', 1, 4, 4) + socket.inet_aton(self.this_or)126 return payload_bytes127 @staticmethod128 def _deserialize_payload(payload, proto_version):129 our_address_length = int(struct.unpack('!B', payload[5:][:1])[0])130 our_address = socket.inet_ntoa(payload[6:][:our_address_length])131 return {'timestamp': '', 'other_or': '', 'this_or': our_address}132 def _args_str(self):133 return 'timestamp = {!r}, other_or = {!r}, this_or = {!r}'.format(self.timestamp, self.other_or, self.this_or)134class CellDestroy(TorCell):135 """136 CellDestroy representation.137 The payload of a RELAY_TRUNCATED or DESTROY cell contains a single octet,138 describing why the circuit is being closed or truncated.139 """140 NUM = 4141 def __init__(self, reason, circuit_id):142 assert isinstance(reason, CircuitReason), 'reason must be CircuitReason enum'143 super().__init__(circuit_id)144 self.reason = reason145 @staticmethod146 def _deserialize_payload(payload, proto_version):147 return {'reason': CircuitReason(struct.unpack('!B', payload[:1])[0])}148 def _serialize_payload(self):149 return struct.pack('!B', self.reason)150 def _args_str(self):151 return 'reason = {}'.format(self.reason.name)152class RelayedTorCell(TorCell):153 NUM = -1154 MAX_PAYLOD_SIZE = 509 - 11155 def __init__(self, inner_cell, stream_id, circuit_id, padding=None, encrypted=None):156 super().__init__(circuit_id)157 self._set_inits(inner_cell, padding, stream_id, None)158 self._encrypted = encrypted159 self._checked = False160 def _set_inits(self, inner_cell, padding, stream_id, digest, **kwargs):161 self._inner_cell = inner_cell162 self._padding = padding or b''163 # if self._padding:164 # logger.warn('Has some padding!!!')165 self._stream_id = stream_id166 self._digest = digest167 @property168 def digest(self):169 return self._digest170 @property171 def stream_id(self):172 return self._stream_id173 @property174 def is_encrypted(self):175 return self._encrypted is not None176 def get_decrypted(self):177 assert self._checked178 return self._inner_cell179 def prepare(self, digesting_func):180 assert not self.digest, 'already prepared'181 payload = self._serialize_payload()182 self._digest = digesting_func(payload)183 assert len(self.digest) == 4184 def encrypt(self, encrypting_func):185 assert self._digest, 'must be prepared already'186 payload = self._serialize_payload()187 # verbose: logger.debug('relay full cell: %s', to_hex(payload))188 self._encrypted = encrypting_func(payload)189 def _serialize_payload(self):190 if self.is_encrypted:191 return self._encrypted192 else:193 relay_payload = self._inner_cell._serialize_payload()194 # logger.debug('relay_payload: %s', to_hex(relay_payload))195 payload_bytes = struct.pack('!B', self._inner_cell.NUM)196 payload_bytes += struct.pack('!H', 0) # 'recognized'197 payload_bytes += struct.pack('!H', self._stream_id)198 payload_bytes += struct.pack('!4s', self._digest if self._digest else b'\x00' * 4) # Digest placeholder199 if len(relay_payload) > RelayedTorCell.MAX_PAYLOD_SIZE:200 raise Exception(201 'relay payload length cannot be more than {} ({} got)'.format(202 RelayedTorCell.MAX_PAYLOD_SIZE, len(relay_payload)203 )204 )205 assert len(relay_payload) + len(self._padding) <= RelayedTorCell.MAX_PAYLOD_SIZE, 'wrong relay payload size'206 payload_bytes += struct.pack('!H', len(relay_payload))207 payload_bytes += struct.pack('!{}s'.format(RelayedTorCell.MAX_PAYLOD_SIZE), relay_payload + self._padding)208 return payload_bytes209 def get_encrypted(self):210 assert self._inner_cell is None211 assert self._encrypted212 return self._encrypted213 def set_encrypted(self, new_encrypted):214 assert new_encrypted215 self._encrypted = new_encrypted216 @staticmethod217 def parse_header(payload):218 try:219 header = struct.unpack('!BHH4sH498s', payload)220 fields = ['cell_num', 'is_recognized', 'stream_id', 'digest', 'relay_payload_len', 'relay_payload_raw']221 return dict(zip(fields, header))222 except struct.error:223 logger.error("Can't unpack: %r", to_hex(payload))224 raise225 @staticmethod226 def set_header_digest(payload, new_digest):227 assert len(new_digest) == 4, 'digest must be 4 bytes'228 return payload[:5] + new_digest + payload[5 + 4:]229 def set_decrypted(self, cell_num, stream_id, digest, relay_payload_len, relay_payload_raw, **kwargs):230 relay_payload = relay_payload_raw[:relay_payload_len]231 padding = relay_payload_raw[relay_payload_len:]232 if all([b == 0 for b in padding]):233 padding = b''234 try:235 cell_type = TorCommands.get_relay_by_num(cell_num)236 logger.debug('Deserialize %s relay cell', cell_type.__name__)237 inner_cell = TorCell.deserialize(cell_type, 0, relay_payload, 0)238 except BaseException:239 logger.error("Can't deserialize %i cell: %r", cell_num, to_hex(relay_payload))240 raise241 self._set_inits(inner_cell, padding, stream_id, digest)242 self._encrypted = None243 self._checked = True244 @staticmethod245 def _deserialize_payload(payload, proto_version):246 raise NotImplementedError("RelayedTorCell couldn't be deserialized")247 def _args_str(self):248 inner_str = '<encrypted>' if self.is_encrypted and not self._inner_cell else self._inner_cell249 stream_str = ', stream_id = {}'.format(self._stream_id or 0) if self._stream_id else ''250 digest_str = ', digest = {!r}'.format(self._digest) if self._digest else ''251 return 'inner_cell = {!r}{}{}'.format(inner_str, stream_str, digest_str)252class CellRelayEarly(RelayedTorCell):253 NUM = 9254 def __init__(self, inner_cell, stream_id=0, circuit_id=0, padding=None, encrypted=None):255 super().__init__(inner_cell, stream_id, circuit_id, padding=padding, encrypted=encrypted)256class CellPadding(TorCellEmpty):257 NUM = 0258class CellRelay(RelayedTorCell):259 NUM = 3260 def __init__(self, inner_cell, stream_id=0, circuit_id=0, padding=None, encrypted=None):261 super().__init__(inner_cell, stream_id, circuit_id, padding=padding, encrypted=encrypted)262 @staticmethod263 def _deserialize_payload(payload, proto_version):264 return {'inner_cell': None, 'encrypted': payload}265class CellRelayExtend2(TorCell):266 """267 CellRelayExtend2 representation.268 To extend an existing circuit, the client sends an EXTEND2269 relay cell to the last node in the circuit.270 An EXTEND2 cell's relay payload contains:271 NSPEC (Number of link specifiers) [1 byte]272 NSPEC times:273 LSTYPE (Link specifier type) [1 byte]274 LSLEN (Link specifier length) [1 byte]275 LSPEC (Link specifier) [LSLEN bytes]276 HTYPE (Client Handshake Type) [2 bytes]277 HLEN (Client Handshake Data Len) [2 bytes]278 HDATA (Client Handshake Data) [HLEN bytes]279 """280 NUM = 14281 def __init__(self, ip, port, fingerprint, skin):282 super().__init__()283 self.nspec = 2 # 2x NSPEC284 self.link_type = 0 # link_specifier_type::ipv4285 self.finger_type = 2 # link_specifier_type::legacy_id286 self.ip = ip287 self.port = port288 self.fingerprint = fingerprint289 self.skin = skin290 def _serialize_payload(self):291 payload_bytes = struct.pack('!B', self.nspec)292 ip_port_len = 6293 payload_bytes += struct.pack('!BB4sH', self.link_type, ip_port_len, socket.inet_aton(self.ip), self.port)294 assert len(self.fingerprint) == 20295 payload_bytes += struct.pack('!BB20s', self.finger_type, len(self.fingerprint), self.fingerprint)296 assert len(self.skin) == 84297 payload_bytes += struct.pack('!HH', 2, len(self.skin)) + self.skin298 return payload_bytes299 @staticmethod300 def _deserialize_payload(payload, proto_version):301 raise NotImplementedError('CellRelayExtend2 deserialization not implemented')302 def _args_str(self):303 return 'ip = {}, port = {}, fingerprint = {}'.format(self.ip, self.port, fp_to_str(self.fingerprint))304class CellCreate(TorCell):305 NUM = 1306 def __init__(self, onion_skin, circuit_id):307 super().__init__(circuit_id)308 self.onion_skin = onion_skin309 def _serialize_payload(self):310 return self.onion_skin311 def _args_str(self):312 return "onion_skin = b'...'"313class CellCreateFast(CellCreate):314 NUM = 5315class CellCreate2(TorCell):316 NUM = 10317 def __init__(self, handshake_type, onion_skin, circuit_id):318 super().__init__(circuit_id)319 self.handshake_type = handshake_type320 self.onion_skin = onion_skin321 def _serialize_payload(self):322 return struct.pack('!HH', self.handshake_type, len(self.onion_skin)) + self.onion_skin323 def _args_str(self):324 return "type = {!r}, onion_skin = b'...'".format(self.handshake_type)325class CellCreated2(TorCell):326 """327 CellCreated2 representation.328 A CREATED2 cell contains:329 DATA_LEN (Server Handshake Data Len) [2 bytes]330 DATA (Server Handshake Data) [DATA_LEN bytes]331 """332 NUM = 11333 def __init__(self, handshake_data, circuit_id):334 super().__init__(circuit_id)335 self.handshake_data = handshake_data336 def _serialize_payload(self):337 return struct.pack('!H', len(self.handshake_data)) + self.handshake_data338 @staticmethod339 def _deserialize_payload(payload, proto_version):340 # tor ref: created_cell_parse341 length = struct.unpack('!H', payload[:2])[0]342 handshake_data = payload[2:length + 2]343 return {'handshake_data': handshake_data}344 def _args_str(self):345 return 'handshake_data = ...'346class CellCreatedFast(TorCell):347 NUM = 6348 def __init__(self, handshake_data, circuit_id):349 super().__init__(circuit_id)350 assert len(handshake_data) == TOR_DIGEST_LEN * 2351 self.handshake_data = handshake_data352 def _serialize_payload(self):353 return self.handshake_data354 @staticmethod355 def _deserialize_payload(payload, proto_version):356 # tor ref: created_cell_parse357 return {'handshake_data': payload[:TOR_DIGEST_LEN * 2]}358 def _args_str(self):359 return 'handshake_data = ...'360class CellRelayBegin(TorCell):361 """362 CellRelayBegin representation.363 tor-spec.txt364 6.2.365 ADDRPORT [nul-terminated string]366 FLAGS[4 bytes]367 ADDRPORT is made of ADDRESS | ':' | PORT | [00]368 """369 NUM = 1370 def __init__(self, address, port):371 super().__init__()372 self.address = address373 self.flags = None374 self.port = port375 def _serialize_payload(self):376 addr_port = '{}:{}'.format(self.address, self.port).encode()377 return addr_port + struct.pack('!BI', 0, 0)378 @staticmethod379 def _deserialize_payload(payload, proto_version):380 raise NotImplementedError('CellRelayBegin deserialization not implemented')381 def _args_str(self):382 return 'address = {!r}, port = {!r}, flags = {!r}'.format(self.address, self.port, self.flags)383class CellRelayBeginDir(TorCellEmpty):384 NUM = 13385class CellRelayData(TorCell):386 NUM = 2387 def __init__(self, data, circuit_id):388 super().__init__(circuit_id)389 self.data = data390 def _serialize_payload(self):391 return self.data392 @staticmethod393 def _deserialize_payload(payload, proto_version):394 return {'data': payload}395 def _args_str(self):396 return 'data = ... ({} bytes)'.format(len(self.data))397class CellRelayEnd(TorCell):398 """399 CellRelayEnd representation.400 The payload of a RELAY_END cell begins with a single 'reason' byte to401 describe why the stream is closing. For some reasons, it contains402 additional data (depending on the reason.)403 (With REASON_EXITPOLICY, the 4-byte IPv4 address or 16-byte IPv6 address404 forms the optional data, along with a 4-byte TTL; no other reason405 currently has extra data.)406 """407 NUM = 3408 def __init__(self, reason, circuit_id, address=None, ttl=None):409 assert isinstance(reason, StreamReason), 'reason must be StreamReason enum'410 super().__init__(circuit_id)411 self.reason = reason412 self.address = address413 self.ttl = ttl414 def _serialize_payload(self):415 if self.reason == StreamReason.EXIT_POLICY:416 ip_int = struct.unpack('!I', socket.inet_aton(self.address))[0]417 return struct.pack('!BII', self.reason, ip_int, self.ttl)418 else:419 return struct.pack('!B', self.reason)420 @staticmethod421 def _deserialize_payload(payload, proto_version):422 payload_reason = payload[:1]423 reason = StreamReason(struct.unpack('!B', payload_reason)[0])424 ttl = None425 address = None426 if reason == StreamReason.EXIT_POLICY:427 # (With REASON_EXITPOLICY, the 4-byte IPv4 address or 16-byte IPv6 address428 # forms the optional data, along with a 4-byte TTL; no other reason429 # currently has extra data.)430 ip_int, ttl = struct.unpack('!II', payload[1:])431 address = socket.inet_ntoa(struct.pack('!I', ip_int))432 return {'reason': reason, 'address': address, 'ttl': ttl}433 def _args_str(self):434 return 'reason = {!r}'.format(self.reason.name)435class CellRelayConnected(TorCell):436 """437 CellRelayConnected representation.438 Otherwise, the exit node replies with a RELAY_CONNECTED cell, whose439 payload is in one of the following formats:440 The IPv4 address to which the connection was made [4 octets]441 A number of seconds (TTL) for which the address may be cached [4 octets]442 or443 Four zero-valued octets [4 octets]444 An address type (6) [1 octet]445 The IPv6 address to which the connection was made [16 octets]446 A number of seconds (TTL) for which the address may be cached [4 octets]447 """448 NUM = 4449 def __init__(self, address, ttl, circuit_id):450 super().__init__(circuit_id)451 self.address = address452 self.ttl = ttl453 def _serialize_payload(self):454 if self.address and self.ttl:455 ip_int = struct.unpack('!I', socket.inet_aton(self.address))[0]456 return struct.pack('!II', ip_int, self.ttl)457 else:458 return b''459 @staticmethod460 def _deserialize_payload(payload, proto_version):461 if payload:462 # logger.debug(to_hex(payload))463 ip_int, ttl = struct.unpack('!II', payload)464 address = socket.inet_ntoa(struct.pack('!I', ip_int))465 return {'address': address, 'ttl': ttl}466 else:467 # for dir begin?468 return {'address': '', 'ttl': 0}469 def _args_str(self):470 if self.address and self.ttl:471 return 'address = {}, ttl = {}'.format(self.address, self.ttl)472 else:473 return ''474class CellRelaySendMe(TorCell):475 NUM = 5476 def __init__(self, version=None, digest=None, circuit_id=0):477 super().__init__(circuit_id)478 self._version = version479 self._digest = digest480 def _serialize_payload(self):481 if self._version and self._digest:482 return struct.pack('!BH', self._version, len(self._digest)) + self._digest483 else:484 return b''485 @staticmethod486 def _deserialize_payload(payload, proto_version):487 version = None488 digest = None489 if len(payload) > 0:490 version, data_len = struct.unpack('!BH', payload[:3])491 if version != 0 and version != 1:492 logger.error('wrong sendme call version')493 digest = payload[3:3 + data_len]494 if len(payload[3 + data_len:]) > 0:495 logger.error('has some extra data: %r', payload[3 + data_len:])496 return {'version': version, 'digest': digest}497class CellRelayTruncated(CellDestroy):498 NUM = 9499class CellRelayExtended2(CellCreated2):500 """The payload of an EXTENDED2 cell is the same as the payload of a CREATED2 cell."""501 NUM = 15502class CellRelayEstablishRendezvous(TorCell):503 NUM = 33504 def __init__(self, rendezvous_cookie, circuit_id):505 super().__init__(circuit_id)506 assert len(rendezvous_cookie) == 20507 self.rendezvous_cookie = rendezvous_cookie508 def _serialize_payload(self):509 return self.rendezvous_cookie510 @staticmethod511 def _deserialize_payload(payload, proto_version):512 raise NotImplementedError('CellRelayEstablishRendezvous deserialization not implemented')513class CellRelayIntroduce1(TorCell):514 NUM = 34515 def __init__(516 self,517 introduction_point_router: Router,518 introduction_point_key,519 rendezvous,520 rendezvous_cookie,521 auth_type,522 descriptor_cookie,523 circuit_id,524 ):525 super().__init__(circuit_id)526 self.introduction_point = introduction_point_router527 self.introduction_point_key = introduction_point_key528 self.handshake_encrypted = self._create_handshake(rendezvous, rendezvous_cookie, auth_type, descriptor_cookie)529 def _create_handshake(self, introduce, rendezvous_cookie, auth_type, descriptor_cookie):530 #531 # payload of the RELAY_COMMAND_INTRODUCE1532 # command:533 #534 # VER Version byte: set to 2. [1 octet]535 # IP Rendezvous point's address [4 octets]536 # PORT Rendezvous point's OR port [2 octets]537 # ID Rendezvous point identity ID [20 octets]538 # KLEN Length of onion key [2 octets]539 # KEY Rendezvous point onion key [KLEN octets]540 # RC Rendezvous cookie [20 octets]541 # g^x Diffie-Hellman data, part 1 [128 octets]542 #543 # VER Version byte: set to 3. [1 octet]544 # AUTHT The auth type that is used [1 octet]545 # If AUTHT != [00]:546 # AUTHL Length of auth data [2 octets]547 # AUTHD Auth data [variable]548 # TS A timestamp [4 octets]549 # IP Rendezvous point's address [4 octets]550 # PORT Rendezvous point's OR port [2 octets]551 # ID Rendezvous point identity ID [20 octets]552 # KLEN Length of onion key [2 octets]553 # KEY Rendezvous point onion key [KLEN octets]554 # RC Rendezvous cookie [20 octets]555 # g^x Diffie-Hellman data, part 1 [128 octets]556 handshake = struct.pack('!BB', 3, auth_type)557 if auth_type != AuthType.No:558 assert len(descriptor_cookie) == 16559 handshake += struct.pack('!H16s', len(descriptor_cookie), descriptor_cookie)560 handshake += struct.pack('!I', 0) # timestamp561 handshake += struct.pack('!4sH', socket.inet_aton(introduce.ip), introduce.or_port)562 assert len(introduce.fingerprint) == 20563 handshake += struct.pack('!20s', introduce.fingerprint)564 handshake += struct.pack('!H', len(introduce.descriptor.onion_key)) + introduce.descriptor.onion_key565 handshake += struct.pack('!20s', rendezvous_cookie)566 assert len(self.introduction_point_key) == 128567 handshake += self.introduction_point_key568 return hybrid_encrypt(handshake, self.introduction_point.service_key)569 def _serialize_payload(self):570 # PK_ID Identifier for Bob's PK [20 octets]571 return struct.pack('!20s', sha1(self.introduction_point.service_key)) + self.handshake_encrypted572 @staticmethod573 def _deserialize_payload(payload, proto_version):574 raise NotImplementedError('CellRelayEstablishRendezvous deserialization not implemented')575class CellRelayRendezvous2(TorCell):576 NUM = 37577 def __init__(self, handshake_data, circuit_id):578 super().__init__(circuit_id)579 self.handshake_data = handshake_data580 def _serialize_payload(self):581 return self.handshake_data582 @staticmethod583 def _deserialize_payload(payload, proto_version):584 return {'handshake_data': payload}585 def _args_str(self):586 return 'handshake_data = ...'587class CellRelayRendezvousEstablished(TorCellEmpty):588 NUM = 39589class CellRelayIntroduceAck(TorCellEmpty):590 NUM = 40591class CellCerts(TorCell):592 NUM = 129593 def __init__(self, certs, circuit_id=0):594 super().__init__(circuit_id)595 self.certs = certs596 def _serialize_payload(self):597 raise NotImplementedError('CellCerts serialization not implemented')598 @staticmethod599 def _deserialize_payload(payload, proto_version):600 # TODO: implement parse601 return {'certs': payload}602class CellAuthChallenge(TorCell):603 NUM = 130604 def __init__(self, auth, circuit_id=0):605 super().__init__(circuit_id)606 self.auth = auth607 def _serialize_payload(self):608 raise NotImplementedError('CellAuthChallenge serialization not implemented')609 @staticmethod610 def _deserialize_payload(payload, proto_version):611 # TODO: implement parse612 return {'auth': payload}613class TorCommands:614 """615 Enum class which contains all available command types.616 tor-spec.txt 3. "Cell Packet format"617 """618 _map = {619 # fmt: off620 # Fixed-length command values.621 CellPadding.NUM: CellPadding, # 0...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!