Best Python code snippet using webdriver_manager
intro_objects.py
Source:intro_objects.py
...66 return 167 if val1 < val2:68 return -169 return 070def get_binary_name(name, cls, wide=True, trim=0):71 """ Returns the binary hash of the given name. This function also checks72 the predefined names in the config file based on the `cls` argument.73 If `wide` is True, then it will convert the name to the wide (utf-16)74 format.75 If trim is != 0, then it will trim the name to `trim` chars. """76 if not name:77 return int(-1)78 if cls is KernelException:79 predefined = excfg.get("Predefined", "kernel-mode")80 elif cls is UserException:81 predefined = excfg.get("Predefined", "user-mode")82 elif cls is KernelUserException:83 predefined = excfg.get("Predefined", "kenrel-user-mode")84 elif cls is Signature:85 predefined = excfg.get("Predefined", "signature")86 else:87 raise ValueError("Invalid class %s" % cls)88 if name in predefined:89 return predefined[name]90 if trim:91 return crc32.crc32(name[:trim], wide=wide)92 return crc32.crc32(name, wide=wide)93def get_binary_flags(flags, cls):94 """ Returns the binary representation of the given flags.95 If the `cls` is a UserException then it will use the values from 'user-mode'96 config flags and the 'common' ones.97 If the `cls` is a KernelException then it will use the values from98 'kernel-mode' config flags and the 'common' ones.99 If the `cls` is a UserException then it will use the values from100 'signatures' config flags. The 'common' ones are ignored. """101 cfg_cflags = excfg.get("Flags", "common")102 if cls is KernelException:103 cfg_flags = excfg.get("Flags", "kernel-mode")104 elif cls is UserException or cls is UserGlobException:105 cfg_flags = excfg.get("Flags", "user-mode")106 elif cls is KernelUserException:107 cfg_flags = excfg.get("Flags", "user-mode")108 cfg_flags.update(excfg.get("Flags", "kernel-mode"))109 cfg_flags.update(excfg.get("Flags", "kernel-user-mode"))110 elif cls is Signature:111 cfg_flags = excfg.get("Flags", "signatures")112 cfg_cflags = []113 else:114 raise ValueError("Invalid class %s" % cls)115 bin_flags = 0116 for flag in flags.split(" "):117 if flag in cfg_flags:118 bin_flags = bin_flags | cfg_flags[flag]119 elif flag in cfg_cflags:120 bin_flags = bin_flags | cfg_cflags[flag]121 else:122 raise ValueError("Invalid flag " + flag)123 return bin_flags124def get_sig_id(sig_id, sig_type, create_new=True):125 """ If we don't have this signature in the hash_map, then add it this way;126 if we request the id of a signature at different times and from different127 functions, we will get either a new one, or the old one. """128 if sig_type == 0 and sig_id not in SIG_IDS:129 raise ValueError("invalid signature type")130 if sig_id not in SIG_IDS:131 if not create_new:132 raise ValueError("Signature %s not present!" % sig_id)133 new_id = len(SIG_IDS) + 1134 new_id += (sig_type << 22)135 SIG_IDS[sig_id] = new_id136 elif create_new:137 raise ValueError("Duplicated signature found", sig_id)138 return SIG_IDS[sig_id]139def get_binary_signatures(signatures):140 """ Returns a sorted list containing the binary id of each signature. For141 more details see `get_sig_id`. """142 if not signatures:143 return []144 return sorted([get_sig_id(sig, 0, False) for sig in signatures])145class IntroObject(metaclass=ABCMeta):146 """ Just an interface containing method for dumping objects in binary147 or textual form. """148 @abstractmethod149 def get_binary_header(self):150 """ Returns the binary header, dependent on the object type. """151 raise NotImplementedError152 @abstractmethod153 def get_binary(self):154 """ Returns the binary contents, dependent on the object type. """155 raise NotImplementedError156 def _cmp(self, other):157 """ Will be replaced below, and each of '__lt__', etc. will call the158 proper '_cmp' method. """159 raise NotImplementedError160 def __lt__(self, other):161 return self._cmp(other) < 0162 def __le__(self, other):163 return self._cmp(other) <= 0164 def __gt__(self, other):165 return self._cmp(other) > 0166 def __ge__(self, other):167 return self._cmp(other) > 0168 def __eq__(self, other):169 return self._cmp(other) == 0170 def __ne__(self, other):171 return self._cmp(other) != 0172class IntroFileHeader(IntroObject):173 """ Represent the header of the exceptions file. This is written only once,174 at the begining. """175 def __init__(self, km_count, um_count, kum_count, umgb_count, sig_count, build):176 self.km_count = km_count177 self.um_count = um_count178 self.kum_count = kum_count179 self.umgb_count = umgb_count180 self.sig_count = sig_count181 self.build = build182 def get_binary(self):183 """ Returns a binary string containing the values from the tuple.184 The format is as follows:185 - HeaderMagic[32]186 - VersionMajor[16]187 - VersionMinor[16]188 - KernelExceptionsCount[32]189 - UserExceptionsCount[32]190 - SignaturesCount[32]191 - BuildNumber[32]192 - UserGlobExceptionCount[32]193 - Reserved1[2 * 32] """194 return struct.pack(195 "<IHHIIIIIII",196 DEFAULT_FLAGS["HeaderMagic"],197 excfg.get("Version", "Major"),198 excfg.get("Version", "Minor"),199 self.km_count,200 self.um_count,201 self.sig_count,202 self.build,203 self.umgb_count,204 self.kum_count,205 0,206 )207 def _cmp(self, other):208 """ Not implemented in this class. """209 return NotImplemented210 def get_binary_header(self):211 """ Not implemented in this class. """212 return NotImplemented213class KernelException(IntroObject):214 """ Represents a kernel-mode exception. For more details about it's215 implementation see `get_binary`. """216 def _validate_args(self):217 """ Validates that the values are good. If they are missing, they will218 be initialized with the default values.219 Raises ValueError if anything is not valid. """220 if not self.originator and self.object_type not in ("token-privs", "security-descriptor", "acl-edit", "sud-modification"):221 raise ValueError("originator cannot be missing!")222 if not self.object_type:223 raise ValueError("object type cannot be missing!")224 if not self.victim and self.object_type in (225 "driver",226 "driver imports",227 "driver code",228 "driver data",229 "driver resources",230 "drvobj",231 "fastio",232 "driver exports",233 "token-privs",234 "security-descriptor",235 "acl-edit",236 "sud-modification"237 ):238 raise ValueError("Type %s requires a victim name!" % self.object_type)239 if self.victim and self.object_type in ("msr", "ssdt", "cr4", "idt", "idt-reg", "gdt-reg", "infinity-hook", "hal-perf-counter", "interrupt-obj"):240 raise ValueError("Type %s must miss victim name!" % self.object_type)241 if self.flags is None:242 # if no flags are given, then use the default ones243 self.flags = DEFAULT_FLAGS["KernelExceptionFlags"]244 elif not any(arch in self.flags for arch in ["32", "64"]):245 self.flags += " 32 64"246 flagsl = self.flags.split(" ")247 if self.object_type == "cr4":248 if ("smep" not in flagsl) and ("smap" not in flagsl):249 raise ValueError("Type cr4 must flags must contain smap/smep!")250 if not any(word in flagsl for word in ("read", "write", "exec")):251 self.flags += " write"252 if ("non-driver" in flagsl) and ("return-drv" not in flagsl):253 # non-driver implies return-drv254 self.flags += " return-drv"255 if "return-drv" in flagsl:256 raise ValueError('"return-drv" is now obsolete. Please use "return"!')257 # We moved return to the common area, so add return-drv if needed (for now)258 if "return" in flagsl:259 self.flags += " return-drv"260 if "integrity" in flagsl and self.object_type in ("token-privs", "sud-modification"):261 if self.originator:262 raise ValueError("Type %s with integrity flag must not have originator!" % self.object_type)263 self.originator = "-"264 self.originator_name = "-"265 elif self.object_type in ("token-privs",) and not self.originator:266 raise ValueError("Originator cannot be missing for %s without integrity flag!" % self.object_type)267 if self.object_type in ("security-descriptor", "acl-edit"):268 if "integrity" in flagsl:269 if self.originator:270 raise ValueError("Type %s with integrity flag must not have originator!" % self.object_type)271 self.originator = "-"272 self.originator_name = "-"273 else: 274 raise ValueError("%s only works with integrity flag for now!" % self.object_type)275 # sanitize the input276 if "linux" in self.flags:277 self.victim = self.victim if self.victim else "*"278 else:279 self.originator = self.originator.lower()280 self.originator_name = self.originator_name.lower()281 282 # for sud fields we need case sensitive hashes283 if self.object_type in ("sud-modification",):284 self.victim = self.victim if self.victim else "*"285 else:286 self.victim = self.victim.lower() if self.victim else "*"287 def _complete_binary_args(self):288 """ Complete self.binary dictionary with the binary representations289 of the self fields.290 'originator' field will be separeted into path + name. If only a name291 is given then path will be equal with the name. """292 self.binary["object"] = excfg.get("Types", "kernel-mode")[self.object_type]293 if self.originator != self.originator_name:294 bin_name = get_binary_name(self.originator_name, KernelException)295 else:296 bin_name = get_binary_name(self.originator_name, KernelException)297 self.binary["name"] = bin_name298 self.binary["path"] = -1299 if self.object_type in ("sud-modification"):300 self.binary["victim"] = get_binary_name(self.victim, KernelException, wide=False)301 elif self.object_type in ("token-privs", "security-descriptor", "acl-edit"):302 self.binary["victim"] = get_binary_name(self.victim, KernelException, wide=False, trim=IMAGE_BASE_NAME_LEN)303 else:304 self.binary["victim"] = get_binary_name(self.victim, KernelException)305 self.binary["flags"] = get_binary_flags(self.flags, KernelException)306 self.binary["signatures"] = get_binary_signatures(self.signatures)307 def __init__(self, originator=None, object_type=None, victim=None, flags=None, signatures=None, **kwargs):308 self.originator = originator309 self.originator_name = get_name_from_path(self.originator)310 self.object_type = object_type311 self.victim = victim312 self.flags = flags313 self.signatures = signatures314 self.binary = {}315 self._validate_args()316 self._complete_binary_args()317 def _cmp(self, other):318 """ Compares two KernelExceptions. The fields are compared in order:319 - originator path, originator name, object, victim, flags320 - signatures is ignored! """321 value = cmp_unsigned(self.binary["path"], other.binary["path"])322 if value:323 return value324 value = cmp_unsigned(self.binary["name"], other.binary["name"])325 if value:326 return value327 value = cmp_unsigned(self.binary["object"], other.binary["object"])328 if value:329 return value330 value = cmp_unsigned(self.binary["victim"], other.binary["victim"])331 if value:332 return value333 value = cmp_unsigned(self.binary["flags"], other.binary["flags"])334 if value:335 return value336 return 0337 def __str__(self):338 ret_str = "'%s' (%08x), '%s' (%08x), '%s' (%08x), '%s' (%02d), '%s' (%08x)" % (339 self.originator,340 self.binary["path"],341 self.originator_name,342 self.binary["name"],343 self.victim,344 self.binary["victim"],345 self.object_type,346 self.binary["object"],347 self.flags,348 self.binary["flags"],349 )350 if self.signatures:351 ret_str += ", %r (%r)" % (self.signatures, self.binary["signatures"])352 return ret_str353 def get_binary_header(self):354 """ Returns a binary string representing the header of the user-mode355 exception.356 The format it's as follows:357 - type[8] = 1358 - size[16] = 20 + len(sigs) * 4 """359 sigs = len(self.signatures) if self.signatures else 0360 return struct.pack("<BH", 1, 20 + (sigs * 4))361 def get_binary(self):362 """ Returns a binary string containing the binary representation of the363 kernel-mode exception.364 The format it's as follows:365 - name[32]366 - path[32]367 - victim[32],368 - flags[32]369 - reserved[8]370 - type[8]371 - sig_count[16]372 - sigs-array[32 * sig_count]373 name, path - a CRC32 will be applied to them374 victim - same as name and path375 object_type - the corresponding value in the hash map376 flags - an OR with the values from the hash map377 signatures - the strings will be converted into unique numeric ids """378 packed_sigs = bytes()379 for sig in self.binary["signatures"]:380 packed_sigs += struct.pack("<I", int(sig & 0xFFFFFFFF))381 # The '& 0xffffffff' is a quick hack to force unsigned numbers382 return (383 struct.pack(384 "<IIIIBBH",385 unsigned(self.binary["name"]),386 unsigned(self.binary["path"]),387 unsigned(self.binary["victim"]),388 unsigned(self.binary["flags"]),389 0, # The reserved byte390 unsigned(self.binary["object"]),391 unsigned(len(self.binary["signatures"])),392 )393 + packed_sigs394 )395class UserException(IntroObject):396 """ Represents a user-mode exception. For more details about it's397 implementation see `get_binary`. """398 def _validate_args(self):399 """ Validates that the values are good. If they are missing, they will400 be initialized with the default values.401 Raises ValueError if anything is not valid. """402 if not self.originator:403 raise ValueError("Originator cannot be missing!")404 if not self.object_type:405 raise ValueError("Object type cannot be missing!")406 if not self.victim and self.object_type in ("process", "module", "module imports"):407 raise ValueError("Type %s requires a victim name!" % self.object_type)408 if self.victim and self.victim != "*" and self.object_type == "nx_zone":409 raise ValueError("Type %s must miss victim name!" % self.object_type)410 if not self.process and self.object_type in ("module", "module imports"):411 raise ValueError("Process cannot be missing for %s!" % self.object_type)412 if self.process and self.object_type == "process":413 raise ValueError("Process must be missing for %s!" % self.object_type)414 if self.flags is None:415 self.flags = DEFAULT_FLAGS["UserExceptionFlags"]416 elif not any(arch in self.flags for arch in ["32", "64"]):417 self.flags += " 32 64"418 flagsl = self.flags.split(" ")419 if not any(word in flagsl for word in ['read', 'write', 'exec']):420 if self.object_type == 'nx_zone' or self.object_type == 'process-creation' or self.object_type == 'process-creation-dpi':421 self.flags += ' exec'422 else:423 self.flags += " write"424 flagsl = self.flags.split(" ")425 # a small sanity check426 if self.object_type == "nx_zone" and "exec" not in flagsl:427 raise ValueError("nx_zone must have exec flag set: %s" % self.flags)428 # sanitize the input429 if "linux" in self.flags:430 self.victim = self.victim if self.victim else "*"431 self.process = self.process if self.process else None432 else:433 self.originator = self.originator.lower()434 self.victim = self.victim.lower() if self.victim else "*"435 self.process = self.process.lower() if self.process else None436 def _complete_binary_args(self):437 """ Complete self.binary dictionary with the binary representations438 of the self fields. """439 self.binary["object"] = excfg.get("Types", "user-mode")[self.object_type]440 if self.object_type in (441 'process', 'thread-context', 'peb32', 'peb64', 'apc-thread', 'process-creation', 'process-creation-dpi', 'instrumentation-callback') and 'module-load' not in self.flags:442 if "linux" in self.flags:443 self.binary['originator'] = get_binary_name(self.originator, UserException,444 wide=False, trim=LIX_COMM_LEN)445 else:446 self.binary['originator'] = get_binary_name(self.originator, UserException,447 wide=False, trim=IMAGE_BASE_NAME_LEN)448 else:449 self.binary["originator"] = get_binary_name(self.originator, UserException)450 if self.object_type in ('process', 'thread-context', 'peb32', 'peb64', 'apc-thread', 'process-creation', 'double-agent', 'process-creation-dpi', 'instrumentation-callback'):451 if "linux" in self.flags:452 self.binary['victim'] = get_binary_name(self.victim, UserException,453 wide=False, trim=LIX_COMM_LEN)454 else:455 self.binary['victim'] = get_binary_name(self.victim, UserException,456 wide=False, trim=IMAGE_BASE_NAME_LEN)457 else:458 self.binary["victim"] = get_binary_name(self.victim, UserException)459 if self.process:460 self.binary["process"] = get_binary_name(461 self.process, UserException, wide=False, trim=IMAGE_BASE_NAME_LEN462 )463 else:464 self.binary["process"] = 0465 self.binary["flags"] = get_binary_flags(self.flags, UserException)466 self.binary["signatures"] = get_binary_signatures(self.signatures)467 def _fix_name(self, name):468 if not name:469 return name470 if name[0] == "*" and "\\x" in name:471 new_name = b""472 i = 0473 while i < len(name):474 # Let it crash in case i + 4 > len(name)... it's invalid after all475 if name[i] == "\\" and name[i + 1] == "x":476 new_name += bytes([int(name[i + 2 : i + 4], 16)])477 i += 4478 continue479 else:480 new_name += name[i].encode()481 i += 1482 return new_name483 return name484 def _fix_json_names(self):485 self.originator = self._fix_name(self.originator)486 self.victim = self._fix_name(self.victim)487 self.process = self._fix_name(self.process)488 def __init__(489 self,490 originator,491 object_type,492 victim=None,493 process=None,494 flags=None,495 signatures=None,496 **kwargs497 ):498 self.originator = originator499 self.object_type = object_type500 self.victim = victim501 self.process = process502 self.flags = flags503 self.signatures = signatures504 self.binary = {}505 self._validate_args()506 self._fix_json_names()507 self._complete_binary_args()508 def _cmp(self, other):509 """ Compares two UserExceptions by the binary value of the fields.510 The fields are compared in the following order:511 - originator, object, victim, process, flags.512 - signatures is ignored! """513 value = cmp_unsigned(self.binary["originator"], other.binary["originator"])514 if value:515 return value516 value = cmp_unsigned(self.binary["object"], other.binary["object"])517 if value:518 return value519 value = cmp_unsigned(self.binary["victim"], other.binary["victim"])520 if value:521 return value522 value = cmp_unsigned(self.binary["process"], other.binary["process"])523 if value:524 return value525 value = cmp_unsigned(self.binary["flags"], other.binary["flags"])526 if value:527 return value528 return 0529 def __str__(self):530 ret_str = "'%s' (%08x), '%s' (%08x), '%s' (%08x), '%s' (%02d), '%s' (%08x)" % (531 self.originator,532 self.binary["originator"],533 self.victim,534 self.binary["victim"],535 self.process,536 self.binary["process"],537 self.object_type,538 self.binary["object"],539 self.flags,540 self.binary["flags"],541 )542 if self.signatures:543 ret_str += ", %r (%r)" % (self.signatures, self.binary["signatures"])544 return ret_str545 def get_binary_header(self):546 """ Returns a binary string representing the header of the user-mode547 exception.548 The format it's as follows:549 - type[8] = 2550 - size[16] = 20 + len(sigs) * 4 """551 sigs = len(self.signatures) if self.signatures else 0552 return struct.pack("<BH", 2, 20 + (sigs * 4))553 def get_binary(self):554 """ Returns a binary string containing the binary representation of the555 user-mode exception.556 The format it's as follows:557 - originator[32]558 - victim[32]559 - process[32],560 - flags[32]561 - reserved[8]562 - type[8]563 - sig_count[8]564 - sigs-array[32 * sig_count]565 process - truncated to 15 chars (including NULL terminator)566 victim - if object_type is 'process' then it will be truncated same as 'process'567 originator, - same as victim568 object_type - the corresponding value in the hash map569 flags - an OR with the values from the hash map570 signatures - the strings will be converted into unique numeric ids """571 packed_sigs = bytes()572 for sig in self.binary["signatures"]:573 packed_sigs += struct.pack("<I", unsigned(sig))574 if "ignore" in self.flags:575 print(self)576 # The '& 0xffffffff' is a quick hack to force unsigned numbers577 return (578 struct.pack(579 "<IIIIBBH",580 unsigned(self.binary["originator"]),581 unsigned(self.binary["victim"]),582 unsigned(self.binary["process"]),583 unsigned(self.binary["flags"]),584 0, # The reserved byte585 unsigned(self.binary["object"]),586 unsigned(len(self.binary["signatures"])),587 )588 + packed_sigs589 )590class UserApcException(IntroObject):591 """ Represents a user-mode exception. For more details about it's592 implementation see `get_binary`. """593 def _validate_args(self):594 """ Validates that the values are good. If they are missing, they will595 be initialized with the default values.596 Raises ValueError if anything is not valid. """597 if not self.originator:598 raise ValueError("Originator cannot be missing!")599 if not self.object_type:600 raise ValueError("Object type cannot be missing!")601 if not self.victim and self.object_type in ("process", "module", "module imports"):602 raise ValueError("Type %s requires a victim name!" % self.object_type)603 if self.victim and self.object_type == "nx_zone":604 raise ValueError("Type %s must miss victim name!" % self.object_type)605 if not self.process and self.object_type in ("module", "module imports"):606 raise ValueError("Process cannot be missing for %s!" % self.object_type)607 if self.process and self.object_type == "process":608 raise ValueError("Process must be missing for %s!" % self.object_type)609 if self.flags is None:610 self.flags = DEFAULT_FLAGS["UserExceptionFlags"]611 elif not any(arch in self.flags for arch in ["32", "64"]):612 self.flags += " 32 64"613 flagsl = self.flags.split(" ")614 if not any(word in flagsl for word in ["read", "write", "exec"]):615 if self.object_type == "nx_zone":616 self.flags += " exec"617 else:618 self.flags += " write"619 flagsl = self.flags.split(" ")620 # a small sanity check621 if self.object_type == "nx_zone" and "exec" not in flagsl:622 raise ValueError("nx_zone must have exec flag set: %s" % self.flags)623 # sanitize the input624 if "linux" in self.flags:625 self.victim = self.victim if self.victim else "*"626 self.process = self.process if self.process else None627 else:628 self.originator = self.originator.lower()629 self.victim = self.victim.lower() if self.victim else "*"630 self.process = self.process.lower() if self.process else None631 def _complete_binary_args(self):632 """ Complete self.binary dictionary with the binary representations633 of the self fields. """634 self.binary["object"] = excfg.get("Types", "user-mode")[self.object_type]635 if (636 self.object_type in ("process", "thread-context", "peb32", "peb64", "apc-thread", "double-agent", "instrumentation-callback")637 and "module-load" not in self.flags638 ):639 self.binary["originator"] = get_binary_name(640 self.originator, UserException, wide=False, trim=IMAGE_BASE_NAME_LEN641 )642 else:643 self.binary["originator"] = get_binary_name(self.originator, UserException)644 if self.object_type in ("process", "thread-context", "peb32", "peb64", "apc-thread", "double-agent", "instrumentation-callback"):645 self.binary["victim"] = get_binary_name(646 self.victim, UserException, wide=False, trim=IMAGE_BASE_NAME_LEN647 )648 else:649 self.binary["victim"] = get_binary_name(self.victim, UserException)650 if self.process:651 self.binary["process"] = get_binary_name(652 self.process, UserException, wide=False, trim=IMAGE_BASE_NAME_LEN653 )654 else:655 self.binary["process"] = 0656 self.binary["flags"] = get_binary_flags(self.flags, UserException)657 self.binary["signatures"] = get_binary_signatures(self.signatures)658 def __init__(659 self,660 originator,661 object_type,662 victim=None,663 process=None,664 flags=None,665 signatures=None,666 **kwargs667 ):668 self.originator = originator669 self.object_type = object_type670 self.victim = victim671 self.process = process672 self.flags = flags673 self.signatures = signatures674 self.binary = {}675 self._validate_args()676 self._complete_binary_args()677 def _cmp(self, other):678 """ Compares two UserExceptions by the binary value of the fields.679 The fields are compared in the following order:680 - originator, object, victim, process, flags.681 - signatures is ignored! """682 value = cmp_unsigned(self.binary["originator"], other.binary["originator"])683 if value:684 return value685 value = cmp_unsigned(self.binary["object"], other.binary["object"])686 if value:687 return value688 value = cmp_unsigned(self.binary["victim"], other.binary["victim"])689 if value:690 return value691 value = cmp_unsigned(self.binary["process"], other.binary["process"])692 if value:693 return value694 value = cmp_unsigned(self.binary["flags"], other.binary["flags"])695 if value:696 return value697 return 0698 def __str__(self):699 ret_str = "'%s' (%08x), '%s' (%08x), '%s' (%08x), '%s' (%02d), '%s' (%08x)" % (700 self.originator,701 self.binary["originator"],702 self.victim,703 self.binary["victim"],704 self.process,705 self.binary["process"],706 self.object_type,707 self.binary["object"],708 self.flags,709 self.binary["flags"],710 )711 if self.signatures:712 ret_str += ", %r (%r)" % (self.signatures, self.binary["signatures"])713 return ret_str714 def get_binary_header(self):715 """ Returns a binary string representing the header of the user-mode716 exception.717 The format it's as follows:718 - type[8] = 9719 - size[16] = 20 + len(sigs) * 4 """720 sigs = len(self.signatures) if self.signatures else 0721 return struct.pack("<BH", 9, 20 + (sigs * 4))722 def get_binary(self):723 """ Returns a binary string containing the binary representation of the724 user-mode exception.725 The format it's as follows:726 - originator[32]727 - victim[32]728 - process[32],729 - flags[32]730 - reserved[8]731 - type[8]732 - sig_count[8]733 - sigs-array[32 * sig_count]734 process - truncated to 15 chars (including NULL terminator)735 victim - if object_type is 'process' then it will be truncated same as 'process'736 originator, - same as victim737 object_type - the corresponding value in the hash map738 flags - an OR with the values from the hash map739 signatures - the strings will be converted into unique numeric ids """740 packed_sigs = bytes()741 for sig in self.binary["signatures"]:742 packed_sigs += struct.pack("<I", unsigned(sig))743 if "ignore" in self.flags:744 print(self)745 # The '& 0xffffffff' is a quick hack to force unsigned numbers746 return (747 struct.pack(748 "<IIIIBBH",749 unsigned(self.binary["originator"]),750 unsigned(self.binary["victim"]),751 unsigned(self.binary["process"]),752 unsigned(self.binary["flags"]),753 0, # The reserved byte754 unsigned(self.binary["object"]),755 unsigned(len(self.binary["signatures"])),756 )757 + packed_sigs758 )759class UserGlobException(IntroObject):760 """ Represents a user-mode exception (glob match). For more details about it's761 implementation see `get_binary`. """762 def _validate_glob(self, pattern):763 pattern_chars = re.sub(r"\[[^]]+\]|\?", "", pattern)764 items = re.findall(r"\[[^]]+\]|\?", pattern)765 if len(pattern_chars) + len(items) > IMAGE_BASE_NAME_LEN:766 raise ValueError("Pattern too long: %s" % pattern)767 def _validate_args(self):768 """ Validates that the values are good. If they are missing, they will769 be initialized with the default values.770 Raises ValueError if anything is not valid. """771 glob_items = ["*", "[", "]", "?", "\\\\"]772 if not self.originator:773 raise ValueError("Originator cannot be missing!")774 if not self.object_type:775 raise ValueError("Object type cannot be missing!")776 if not self.victim and self.object_type in ("process", "module", "module imports"):777 raise ValueError("Type %s requires a victim name!" % self.object_type)778 if self.victim and self.object_type == "nx_zone":779 raise ValueError("Type %s must miss victim name!" % self.object_type)780 if not self.process and self.object_type in ("module", "module imports"):781 raise ValueError("Process cannot be missing for %s!" % self.object_type)782 if self.process and self.object_type == "process":783 raise ValueError("Process must be missing for %s!" % self.object_type)784 self.process = self.process if self.process else ""785 if not any(item in self.originator + self.victim + self.process for item in glob_items):786 raise ValueError(787 "At least one field (process, originator, victim) must contain glob items(*, ?, [, ] )."788 )789 if self.process:790 self._validate_glob(self.process)791 if self.victim:792 self._validate_glob(self.victim)793 if self.originator:794 self._validate_glob(self.originator)795 if self.flags is None:796 self.flags = DEFAULT_FLAGS["UserExceptionFlags"]797 elif not any(arch in self.flags for arch in ["32", "64"]):798 self.flags += " 32 64"799 flagsl = self.flags.split(" ")800 if not any(word in flagsl for word in ["read", "write", "exec"]):801 if self.object_type == "nx_zone" or self.object_type == 'process-creation' or self.object_type == 'process-creation-dpi':802 self.flags += " exec"803 else:804 self.flags += " write"805 flagsl = self.flags.split(" ")806 # a small sanity check807 if self.object_type == "nx_zone" and "exec" not in flagsl:808 raise ValueError("nx_zone must have exec flag set: %s" % self.flags)809 self.victim = self.victim if self.victim else "*"810 self.process = self.process if self.process != "" else "*"811 def _complete_binary_args(self):812 """ Complete self.binary dictionary with the binary representations813 of the self fields. """814 self.binary["object"] = excfg.get("Types", "user-mode")[self.object_type]815 if self.object_type in ("process", "thread-context") and "module-load" not in self.flags:816 self.binary["originator"] = self.originator817 else:818 self.binary["originator"] = self.originator819 if self.object_type in ("process", "thread-context"):820 self.binary["victim"] = self.victim821 else:822 self.binary["victim"] = self.victim823 if self.process:824 self.binary["process"] = self.process825 else:826 self.binary["process"] = "*"827 self.binary["flags"] = get_binary_flags(self.flags, UserGlobException)828 self.binary["signatures"] = get_binary_signatures(self.signatures)829 def __init__(830 self,831 originator,832 object_type,833 victim=None,834 process=None,835 flags=None,836 signatures=None,837 **kwargs838 ):839 self.originator = originator840 self.object_type = object_type841 self.victim = victim842 self.process = process843 self.flags = flags844 self.signatures = signatures845 self.binary = {}846 self._validate_args()847 self._complete_binary_args()848 def _cmp(self, other):849 """ Compares two UserGlobException by the binary value of the fields.850 The fields are compared in the following order:851 - originator, object, victim, process, flags.852 - signatures is ignored! """853 value = cmp_string(self.binary["originator"], other.binary["originator"])854 if value:855 return value856 value = cmp_string(self.binary["object"], other.binary["object"])857 if value:858 return value859 value = cmp_string(self.binary["victim"], other.binary["victim"])860 if value:861 return value862 value = cmp_string(self.binary["process"], other.binary["process"])863 if value:864 return value865 value = cmp_unsigned(self.binary["flags"], other.binary["flags"])866 if value:867 return value868 return 0869 def __str__(self):870 ret_str = "'%s' (%s), '%s' (%s), '%s' (%s), '%s' (%02d), '%s' (%08x)" % (871 self.originator,872 self.binary["originator"],873 self.victim,874 self.binary["victim"],875 self.process,876 self.binary["process"],877 self.object_type,878 self.binary["object"],879 self.flags,880 self.binary["flags"],881 )882 if self.signatures:883 ret_str += ", %r (%r)" % (self.signatures, self.binary["signatures"])884 return ret_str885 def get_binary_header(self):886 """ Returns a binary string representing the header of the user-mode (glob match)887 exception.888 The format it's as follows:889 - type[8] = 6890 - size[16] = 8 + len(originator) + len(victim) + len(process) len(sigs) * 4 """891 sigs = len(self.signatures) if self.signatures else 0892 len_victim = len(self.victim) + 1 if self.victim else 0893 len_originator = len(self.originator) + 1 if self.originator else 0894 len_process = len(self.process) + 1 if self.process else 0895 size = 8 + len_originator + len_victim + len_process896 return struct.pack("<BH", 6, size + (sigs * 4))897 def get_binary(self):898 """ Returns a binary string containing the binary representation of the899 user-mode exception (glob match).900 The format it's as follows:901 - flags[32]902 - reserved[8]903 - type[8]904 - sig_count[8]905 - originator[32]906 - victim[32]907 - process[32],908 - sigs-array[32 * sig_count]909 process - regex910 victim - regex911 originator - regex912 object_type - the corresponding value in the hash map913 flags - an OR with the values from the hash map914 signatures - the strings will be converted into unique numeric ids """915 packed_sigs = bytes()916 for sig in self.binary["signatures"]:917 packed_sigs += struct.pack("<I", unsigned(sig))918 if "ignore" in self.flags:919 print(self)920 # The '& 0xffffffff' is a quick hack to force unsigned numbers921 return (922 struct.pack(923 "<IBBH%ds%ds%ds"924 % (925 len(self.binary["originator"]) + 1, # add NULL-terminator926 len(self.binary["victim"]) + 1, # add NULL-terminator927 len(self.binary["process"]) + 1,928 ), # add NULL-terminator929 unsigned(self.binary["flags"]),930 0, # The reserved byte931 unsigned(self.binary["object"]),932 unsigned(len(self.binary["signatures"])),933 bytes(self.binary["originator"], "utf-8"),934 bytes(self.binary["victim"], "utf-8"),935 bytes(self.binary["process"], "utf-8"),936 )937 + packed_sigs938 )939class KernelUserException(IntroObject):940 """ Represents a kernel-user exception. For more details about it's941 implementation see `get_binary`. """942 def _validate_args(self):943 """ Validates that the values are good. If they are missing, they will944 be initialized with the default values.945 Raises ValueError if anything is not valid. """946 if not self.originator:947 raise ValueError("Originator cannot be missing!")948 if not self.object_type:949 raise ValueError("Object type cannot be missing!")950 if not self.victim:951 raise ValueError("Victim cannot be missing!" % self.object_type)952 if self.flags is None:953 # if no flags are given, then use the default ones954 self.flags = DEFAULT_FLAGS["KernelExceptionFlags"]955 elif not any(arch in self.flags for arch in ["32", "64"]):956 self.flags += " 32 64"957 flagsl = self.flags.split(" ")958 if not any(word in flagsl for word in ("read", "write", "exec")):959 self.flags += " write"960 if ("non-driver" in flagsl) and ("return-drv" not in flagsl):961 # non-driver implies return-drv962 self.flags += " return-drv"963 if "return" in flagsl:964 self.flags += " return-drv"965 if "user" in flagsl and "kernel" in flagsl:966 raise ValueError("Both user and kernel injection flags were given!")967 if "linux" in self.flags:968 self.victim = self.victim if self.victim else "*"969 self.originator = self.originator if self.originator else "*"970 self.process = self.process if self.process else "*"971 else:972 self.originator = self.originator.lower()973 self.victim = self.victim.lower() if self.victim else "*"974 self.process = self.process.lower() if self.process else "*"975 def _complete_binary_args(self):976 """ Complete self.binary dictionary with the binary representations977 of the self fields.978 'originator' field will be separeted into path + name. If only a name979 is given then path will be equal with the name. """980 self.binary["object"] = excfg.get("Types", "kernel-user-mode")[self.object_type]981 if "user" in self.flags:982 self.binary["originator"] = get_binary_name(self.originator, UserException, wide=False, trim=IMAGE_BASE_NAME_LEN)983 else:984 self.binary["originator"] = get_binary_name(self.originator, KernelException)985 self.binary["victim"] = get_binary_name(self.victim, UserException)986 self.binary["flags"] = get_binary_flags(self.flags, KernelUserException)987 self.binary["signatures"] = get_binary_signatures(self.signatures)988 self.binary["process"] = get_binary_name(self.process, UserException, wide=False, trim=IMAGE_BASE_NAME_LEN)989 def __init__(self, process=None, originator=None, object_type=None, victim=None, flags=None, signatures=None, **kwargs):990 self.originator = originator991 self.object_type = object_type992 self.process = process993 self.victim = victim994 self.flags = flags995 self.signatures = signatures996 self.binary = {}997 self._validate_args()998 self._complete_binary_args()999 def _cmp(self, other):1000 """ Compares two KernelExceptions. The fields are compared in order:1001 - originator path, originator name, object, victim, flags1002 - signatures is ignored! """1003 value = cmp_unsigned(self.binary["originator"], other.binary["originator"])1004 if value:1005 return value1006 value = cmp_unsigned(self.binary["process"], other.binary["process"])1007 if value:1008 return value1009 value = cmp_unsigned(self.binary["object"], other.binary["object"])1010 if value:1011 return value1012 value = cmp_unsigned(self.binary["victim"], other.binary["victim"])1013 if value:1014 return value1015 value = cmp_unsigned(self.binary["flags"], other.binary["flags"])1016 if value:1017 return value1018 return 01019 def __str__(self):1020 ret_str = "'%s' (%08x), '%s' (%08x), '%s' (%08x), '%s' (%02d), '%s' (%08x)" % (1021 self.process,1022 self.binary["process"],1023 self.originator,1024 self.binary["originator"],1025 self.victim,1026 self.binary["victim"],1027 self.object_type,1028 self.binary["object"],1029 self.flags,1030 self.binary["flags"],1031 )1032 if self.signatures:1033 ret_str += ", %r (%r)" % (self.signatures, self.binary["signatures"])1034 return ret_str1035 def get_binary_header(self):1036 """ Returns a binary string representing the header of the user-mode1037 exception.1038 The format it's as follows:1039 - type[8] = 11040 - size[16] = 20 + len(sigs) * 4 """1041 sigs = len(self.signatures) if self.signatures else 01042 return struct.pack("<BH", 14, 20 + (sigs * 4))1043 def get_binary(self):1044 """ Returns a binary string containing the binary representation of the1045 kernel-mode exception.1046 The format it's as follows:1047 - name[32]1048 - path[32]1049 - victim[32],1050 - flags[32]1051 - reserved[8]1052 - type[8]1053 - sig_count[16]1054 - sigs-array[32 * sig_count]1055 name, path - a CRC32 will be applied to them1056 victim - same as name and path1057 object_type - the corresponding value in the hash map1058 flags - an OR with the values from the hash map1059 signatures - the strings will be converted into unique numeric ids """1060 packed_sigs = bytes()1061 for sig in self.binary["signatures"]:1062 packed_sigs += struct.pack("<I", int(sig & 0xFFFFFFFF))1063 # The '& 0xffffffff' is a quick hack to force unsigned numbers1064 return (1065 struct.pack(1066 "<IIIIBBH",1067 unsigned(self.binary["originator"]),1068 unsigned(self.binary["victim"]),1069 unsigned(self.binary["process"]),1070 unsigned(self.binary["flags"]),1071 0, # The reserved byte1072 unsigned(self.binary["object"]),1073 unsigned(len(self.binary["signatures"])),1074 )1075 + packed_sigs1076 )1077class Signature(IntroObject):1078 """ Represents an exception signature. For more details about it's implementation see it's1079 subclasses. """1080 def _cmp(self, other):1081 pass1082 def get_binary_header(self):1083 pass1084 def get_binary(self):1085 pass1086class ExportSignature(Signature):1087 """ Represents an export signature. For more details about it's implementation see1088 `get_binary`. """1089 def _validate_args(self):1090 """ Validates that the values are good. If they are missing, they will1091 be initialized with the default values. """1092 if not self.sig_id:1093 raise ValueError("id is required!")1094 if not self.hashes:1095 raise ValueError("hashes is required!")1096 if not self.library:1097 raise ValueError("library is required!")1098 if not self.flags:1099 self.flags = DEFAULT_FLAGS["SignatureFlags"]1100 elif not any(arch in self.flags for arch in ["32", "64"]):1101 self.flags += " 32 64"1102 def _complete_binary_args(self):1103 """ Complete self.binary dictionary with the binary representations1104 of the self fields. """1105 self.binary["id"] = get_sig_id(self.sig_id, excfg.get("SignatureType", "export"))1106 self.binary["flags"] = get_binary_flags(self.flags, Signature)1107 self.binary["library"] = get_binary_name(self.library, UserException, wide=True)1108 self.binary["hashes"] = []1109 for hl in self.hashes:1110 int_hash_list = []1111 int_hash_list.append(1112 {1113 "name": get_binary_name(hl["name"], Signature, wide=False),1114 "delta": int(hl["delta"]),1115 }1116 )1117 self.binary["hashes"].append(int_hash_list)1118 def __init__(self, sig_id, hashes, library, flags=None, **kwargs):1119 self.sig_id = sig_id1120 self.library = library1121 self.hashes = hashes1122 self.flags = flags1123 self.binary = {}1124 self._validate_args()1125 self._complete_binary_args()1126 def _cmp(self, other):1127 """ Signatures must have an unique ID. So we can safely compare them...
hw10_grader.py
Source:hw10_grader.py
...84 Test("business.cpp", ['100', '0.1', '2', '0.2'],85 "interest: 16.64\n"86 "terminal value: 116.64\n")87 ]88def get_binary_name(test):89 """Return the name of the binary for this test."""90 filepath = os.path.dirname(__file__)91 if not filepath:92 filepath = './'93 return os.path.join(filepath, os.path.splitext(test.file)[0])94def delete_binary(test):95 """Remove the binary for this test."""96 try:97 os.remove(get_binary_name(test))98 except FileNotFoundError:99 pass100def compile_(test):101 """Compile the test and return true if there are no warnings."""102 process = subprocess.Popen(CPP + ["-o",103 get_binary_name(test), test.file],104 stdin=subprocess.PIPE, stdout=subprocess.PIPE,105 stderr=subprocess.PIPE)106 out, err = process.communicate()107 errors_and_warnings = out.decode("utf-8") + err.decode("utf-8")108 if errors_and_warnings:109 print("{} had warnings or errors during compilation".format(test.file),110 file=sys.stderr)111 print('"""')112 print(FAIL + errors_and_warnings + ENDC, file=sys.stderr, end='')113 print('"""')114 return False115 return True116def grade(test):117 """Grade a submitted example."""118 if not compile_(test):119 print("no points for " + test.file)120 return 0121 command = [get_binary_name(test), ]122 if isinstance(test.input, list):123 command += test.input124 process = subprocess.Popen(command,125 stdin=subprocess.PIPE, stdout=subprocess.PIPE,126 stderr=subprocess.PIPE)127 if isinstance(test.input, str):128 out, err = process.communicate(test.input.encode("utf-8"))129 else:130 out, err = process.communicate()131 actual = out.decode("utf-8")132 if test.expected == actual:133 return 1134 else:135 print("{} is not correct".format(test.file))...
parse_commands.py
Source:parse_commands.py
...29 return processes30# map pid => (name, handle)31processes = load_processes('scans/pslist.txt') 32# Extract a binary name using the pdb string33def get_binary_name(data):34 n = 035 while n < len(data):36 n = data.find('RSDS', n + 1)37 if n < 0:38 break39 k = data.find('.pdb', n)40 if k > n + 256:41 continue42 return data[n + 24:k].split('\\')[5]43 return None44# global map tags => plugin45tag_map = {}46# command handlers47def cmd_install_plugin(idx, p0, p1, pkt):48 global tag_map, processes49 size, cmd, tag, pid, port = struct.unpack('<LLLLL', pkt[:0x14])50 if pid in processes:51 psname = processes[pid][0]52 else:53 psname = 'unknown'54 bin_name = get_binary_name(pkt[0x1c:])55 tag_map[tag] = bin_name56 print '%02x install-plugin%d%d(%s, tag=0x%08x, pid=%d %s, port=%d' % (57 idx, p0, p1, bin_name, tag, pid, psname, port)58def cmd_call_plugin(idx, pkt):59 size, cmd, tag, subcmd, arg_size = struct.unpack('<LLLLL', pkt[:0x14])60 args = pkt[0x14:]61 if tag in tag_map:62 name = tag_map[tag]63 else:64 name = 'unknown_%08x' % tag65 # handle sub-arguments66 arg_string = ''67 if arg_size > 0:68 if name == 'filedll':69 if subcmd == 0x7268f598:70 arg_string = ', '.join([ '', '"' + trim_asciiz(pkt[0x14:0x118]) + '"', '"' + trim_asciiz(pkt[0x118:]) + '"'])71 elif subcmd == 0x1e3258ab:72 arg_string = ', "' + trim_asciiz(pkt[0x14:]) + '"'# + ', %d' % ord(pkt[0x14+0x104])73 elif name == 'keylogdll':74 arg_string = ', ' + '%d' % struct.unpack('<L', pkt[0x14:0x18])[0]75 if arg_string == '':76 arg_string = ', ' + ''.join([ '%02x' % ord(x) for x in pkt[0x14:0x14 + arg_size] ])77 print '%02x call-plugin(%s, 0x%08x%s)' % (idx, name, subcmd, arg_string)78def cmd_ioctl(idx, pkt):79 ioctl, data_size = struct.unpack('<LL', pkt[0x10c:0x114])80 data = pkt[0x114:0x114 + data_size]81 print '%02x ioctl(%s, 0x%08x, { %s })' % (idx, trim_asciiz(pkt[0x08:0x10c]), ioctl, ' '.join([ '%02x' % ord(x) for x in data ]))82def cmd_install_driver(idx, pkt):83 print '%02x install-driver(%s)' % (idx, get_binary_name(pkt[0x0c:]))84cmds = {85 0x34B30F3B:lambda x, y: cmd_install_plugin(x, 0, 1, y),86 0x8168AD41:lambda x, y: cmd_install_plugin(x, 1, 0, y),87 0xCD762BFA:lambda x, y: cmd_install_plugin(x, 0, 0, y),88 0xD180DAB5:cmd_call_plugin,89 0xD44D6B6C:cmd_install_driver,90 0x427906F4:cmd_ioctl91 }92def cmd_parser(data):93 p = 094 while p < len(data):95 pkt_len = struct.unpack('<L', data[p:p+4])[0]96 yield data[p:p+pkt_len]97 p += pkt_len98def main():99 if not os.path.isdir('commands'):100 os.mkdir('commands')101 n = 0102 excess = ''103 for pkt in cmd_parser(open('cmd_stream.bin', 'rb').read()):104 hdr = struct.unpack('<LLLL', pkt[:16])105 name = get_binary_name(pkt)106 if name is None:107 name = ''108 else:109 if pkt[0x1c:0x1e] == 'MZ':110 fname = name + '.dll'111 ofs = 0x1c112 else:113 fname = name + '.sys'114 ofs = 0x0c115 fname = 'binaries/' + fname116 if not os.path.isfile(fname):117 open(fname, 'wb').write(pkt[ofs:])118 open('commands/cmd%02x.bin' % n, 'wb').write(pkt)119# print '%02x' % n + (' %08x' * 4) % hdr, name...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!