Best Python code snippet using Airtest
usbdeviceforensics.py
Source:usbdeviceforensics.py
...414 if len(parts_serial_no) == 2:415 serial_no = parts_serial_no[0]416 else:417 serial_no = device_sk.name()418 usb_device = get_usb_device(serial_number=serial_no,419 vendor=vendor,420 product=product,421 version=version)422 if usb_device is None:423 continue424 for sub_key_device in device_sk.subkeys():425 if sub_key_device.name().lower() != 'properties':426 continue427 key64 = get_key(sub_key_device, r'{83da6326-97a6-4088-9453-a1923f573b29}\00000064\00000000')428 if key64 is not None:429 value64 = get_reg_value(key64, 'Data')430 if value64 is not None:431 usb_device.usbstor_datetime64 = value64.value()432 write_debug(name='USBSTOR date/time (64)', value=usb_device.usbstor_datetime64.strftime('%Y-%m-%dT%H:%M:%S'))433 else:434 write_debug(data='{83da6326-97a6-4088-9453-a1923f573b29}\\00000064\\00000000 is None')435 key65 = get_key(sub_key_device, r'{83da6326-97a6-4088-9453-a1923f573b29}\00000065\00000000')436 if key65 is not None:437 value65 = get_reg_value(key65, 'Data')438 if value65 is not None:439 usb_device.usbstor_datetime65 = value65.value()440 write_debug(name='USBSTOR date/time (65)', value=usb_device.usbstor_datetime65.strftime('%Y-%m-%dT%H:%M:%S'))441 else:442 write_debug(data='{83da6326-97a6-4088-9453-a1923f573b29}\\00000065\\00000000 is None')443 key64win8 = get_key(sub_key_device, r'{83da6326-97a6-4088-9453-a1923f573b29}\0064')444 if key64win8 is not None:445 value64win8 = get_reg_value(key64win8, '(default)')446 if value64win8 is not None:447 usb_device.usbstor_datetime64 = value64win8.value()448 write_debug(name='USBSTOR date/time (64)', value=usb_device.usbstor_datetime64.strftime('%Y-%m-%dT%H:%M:%S'))449 else:450 write_debug(data='{83da6326-97a6-4088-9453-a1923f573b29}\\0064 is None')451 key65win8 = get_key(sub_key_device, r'{83da6326-97a6-4088-9453-a1923f573b29}\0065')452 if key65win8 is not None:453 value65win8 = get_reg_value(key65win8, '(default)')454 if not value65win8 is None:455 usb_device.usbstor_datetime65 = value65win8.value()456 
write_debug(name='USBSTOR date/time (65)', value=usb_device.usbstor_datetime65.strftime('%Y-%m-%dT%H:%M:%S'))457 else:458 write_debug(data='{83da6326-97a6-4088-9453-a1923f573b29}\\0065 is None')459 key66 = get_key(sub_key_device, r'{83da6326-97a6-4088-9453-a1923f573b29}\0066')460 if key66 is not None:461 value66 = get_reg_value(key66, '(default)')462 if value66 is not None:463 usb_device.usbstor_datetime66 = value66.value()464 write_debug(name='USBSTOR date/time (66)', value=usb_device.usbstor_datetime66.strftime('%Y-%m-%dT%H:%M:%S'))465 else:466 write_debug(data='{83da6326-97a6-4088-9453-a1923f573b29}\\0066\\(default) is None')467 else:468 write_debug(data='{83da6326-97a6-4088-9453-a1923f573b29}\\0066 is None')469 key67 = get_key(sub_key_device, r'{83da6326-97a6-4088-9453-a1923f573b29}\0067')470 if key67 is not None:471 value67 = get_reg_value(key67, '(default)')472 if value67 is not None:473 usb_device.usbstor_datetime67 = value67.value()474 write_debug(name='USBSTOR date/time (67)', value=usb_device.usbstor_datetime67.strftime('%Y-%m-%dT%H:%M:%S'))475 else:476 write_debug(data='{83da6326-97a6-4088-9453-a1923f573b29}\\0067 is None')477def process_usb(registry):478 """Processes the CCS \Enum\USB keys"""479 write_debug(data='Method: process_usb')480 ccs = []481 root_key = registry.root()482 for k in root_key.subkeys():483 if 'ControlSet' in k.name():484 ccs.append(k.name())485 for c in ccs:486 key = registry.open(c + '\\Enum\\USB')487 for sub_key in key.subkeys():488 if (not 'vid' in sub_key.name().lower() and489 not 'pid' in sub_key.name().lower()):490 continue491 # Get the serial number which is the next key492 for serial_key in sub_key.subkeys():493 usb_device = get_usb_device(serial_number=serial_key.name())494 if usb_device is None:495 continue496 vid_pid = sub_key.name().split('&')497 usb_device.vid = vid_pid[0]498 write_debug(name='VID', value=usb_device.vid)499 usb_device.pid = vid_pid[1]500 write_debug(name='PID', value=usb_device.pid)501 
usb_device.vid_pid_datetime = sub_key.timestamp()502 write_debug(name='VID/PID datetime', value=usb_device.vid_pid_datetime.strftime('%Y-%m-%dT%H:%M:%S'))503def process_mounted_devices(registry):504 """Processes the MountedDevices keys"""505 write_debug(data='Method: process_mounted_devices')506 global usb_devices507 reg_key = registry.root().find_key('MountedDevices')508 if reg_key is None:509 write_debug(data='MountedDevices key does not exist')510 return511 for usb_device in usb_devices:512 for reg_value in reg_key.values():513 # Example Data: USBSTOR#Disk&Ven_SanDisk&Prod_Cruzer&Rev_7.01#2444120C4E80D827&514 data = reg_value.value()515 data = remove_non_ascii_characters(data)516 if '\\DosDevices\\' in reg_value.name():517 if len(data) == 12: # Drive Sig (DWORD) Partition Offset (DWORD DWORD)518 dos_device = bytearray(data[0:4])519 usb_device.disk_signature = ''.join('{:02x}'.format(byte) for byte in dos_device)520 write_debug(name='Disk sig', value=usb_device.disk_signature)521 if len(usb_device.parent_prefix_id) > 0:522 if '\\DosDevices\\' in reg_value.name():523 if usb_device.parent_prefix_id in str(data):524 usb_device.drive_letter = reg_value.name().replace('\\DosDevices\\', '')525 write_debug(name='Drive letter', value=usb_device.drive_letter)526 if '\\Volume{' in reg_value.name():527 if usb_device.parent_prefix_id in data:528 guid = reg_value.name()[11:]529 guid = guid[:len(guid)-1]530 usb_device.guid = guid531 write_debug(name='GUID', value=usb_device.guid)532 usb_device.mountpoint = data[4:]533 write_debug(name='Mountpoint', value=usb_device.mountpoint)534 # If the drive letter is missing from being identified by the535 # ParentPrefixId then try matching the full device string536 if len(usb_device.drive_letter) == 0:537 if '\\DosDevices\\' in reg_value.name():538 if ('USBSTOR#Disk&' +539 usb_device.vendor + '&' +540 usb_device.product + '&' +541 usb_device.version + '#' +542 usb_device.serial_number) in data:543 usb_device.drive_letter = 
reg_value.name().replace('\\DosDevices\\', '')544 write_debug(name='Drive letter', value=usb_device.drive_letter)545 # If the GUID is missing from being identified by the546 # ParentPrefixId then try matching the full device string547 if len(usb_device.guid) == 0:548 if '\\Volume{' in reg_value.name():549 temp = str(('USBSTOR#Disk&' +550 usb_device.vendor + '&' +551 usb_device.product + '&' +552 usb_device.version + '#' +553 usb_device.serial_number))554 if temp.lower() in data.lower():555 guid = reg_value.name()[11:]556 guid = guid[:len(guid)-1]557 usb_device.guid = guid558 write_debug(name='GUID', value=usb_device.guid)559 usb_device.mountpoint = data[4:]560 write_debug(name='Mountpoint', value=usb_device.mountpoint)561def process_device_classes(registry):562 """Processes the CCS \Control\DeviceClasses\{53f56307-b6bf-11d0-94f2-00a0c91efb8b keys"""563 write_debug(data='Method: process_device_classes')564 ccs = []565 root_key = registry.root()566 for k in root_key.subkeys():567 if 'ControlSet' in k.name():568 ccs.append(k.name())569 global usb_devices570 # Get the DeviceClasses related information e.g. 
VID & PID + USB Date/Time571 for usb_device in usb_devices:572 if len(usb_device.mountpoint.strip()) == 0:573 continue574 for c in ccs:575 key = registry.open(c + '\\Control\\DeviceClasses\\{53f56307-b6bf-11d0-94f2-00a0c91efb8b}')576 for sub_key in key.subkeys():577 if usb_device.mountpoint in sub_key.name():578 usb_device.device_classes_datetime_53f56307b6bf11d094f200a0c91efb8b = sub_key.timestamp()579 write_debug(name='Dev Classes date/time (53f56)',580 value=usb_device.device_classes_datetime_53f56307b6bf11d094f200a0c91efb8b.strftime('%Y-%m-%dT%H:%M:%S'))581 continue582 if usb_device.serial_number in sub_key.name():583 usb_device.device_classes_datetime_53f56307b6bf11d094f200a0c91efb8b = sub_key.timestamp()584 write_debug(name='Dev Classes date/time (53f56)',585 value=usb_device.device_classes_datetime_53f56307b6bf11d094f200a0c91efb8b.strftime('%Y-%m-%dT%H:%M:%S'))586 continue587 key = registry.open(c + '\\Control\\DeviceClasses\\{10497b1b-ba51-44e5-8318-a65c837b6661}')588 for sub_key in key.subkeys():589 if usb_device.mountpoint in sub_key.name():590 usb_device.device_classes_datetime_10497b1bba5144e58318a65c837b6661 = sub_key.timestamp()591 write_debug(name='Dev Classes date/time (10497)',592 value=usb_device.device_classes_datetime_10497b1bba5144e58318a65c837b6661.strftime('%Y-%m-%dT%H:%M:%S'))593 continue594 if usb_device.serial_number in sub_key.name():595 usb_device.device_classes_datetime_10497b1bba5144e58318a65c837b6661 = sub_key.timestamp()596 write_debug(name='Dev Classes date/time (10497)',597 value=usb_device.device_classes_datetime_10497b1bba5144e58318a65c837b6661.strftime('%Y-%m-%dT%H:%M:%S'))598 continue599# Software Hive Methods #######################################################600def process_windows_portable_devices(registry):601 """Processes the Microsoft\Windows Portable Devices\Devices key"""602 write_debug(data='Method: process_windows_portable_devices')603 try:604 key = registry.open('Microsoft\\Windows Portable Devices\\Devices')605 
global usb_devices606 # Get the DeviceClasses related information e.g. VID & PID + USB Date/Time607 for sub_key in key.subkeys():608 if not '_##_USBSTOR' in sub_key.name() and not '_??_USBSTOR' in sub_key.name():609 continue610 for usb_device in usb_devices:611 if not usb_device.serial_number in sub_key.name():612 continue613 friendly_name = get_reg_value(sub_key, 'FriendlyName')614 if len(friendly_name.value()) == 0:615 write_debug(data='FriendlyName value not defined')616 continue617 temp = friendly_name.value()618 if '(' in temp:619 usb_device.drive_letter = temp[temp.index('('):]620 usb_device.drive_letter = usb_device.drive_letter.replace('(', '')621 usb_device.drive_letter = usb_device.drive_letter.replace(')', '')622 usb_device.volume_name = temp[0:temp.index('(')]623 write_debug(name='Drive letter', value=usb_device.drive_letter)624 write_debug(name='Volume name', value=usb_device.volume_name)625 elif ':\\' in temp:626 usb_device.drive_letter = temp627 write_debug(name='Drive letter', value=usb_device.drive_letter)628 else:629 usb_device.volume_name = temp630 write_debug(name='Volume name', value=usb_device.volume_name)631 except Registry.RegistryKeyNotFoundException:632 return633def process_emd_mgmt(registry):634 """Processes SOFTWARE\Microsoft\Windows NT\CurrentVersion\EMDMgmt key"""635 write_debug(data='Method: process_emd_mgmt')636 try:637 key = registry.open('Microsoft\\Windows NT\\CurrentVersion\\EMDMgmt')638 global usb_devices639 for sub_key in key.subkeys():640 if not '_##_USBSTOR#Disk&' in sub_key.name() and not '_??_USBSTOR#Disk&' in sub_key.name():641 continue642 volume_serial_no = ''643 volume_name = ''644 write_debug(data=sub_key.name())645 if not '{53f56307-b6bf-11d0-94f2-00a0c91efb8b}' in sub_key.name().lower():646 write_debug(data='Key name does not contain {53f56307-b6bf-11d0-94f2-00a0c91efb8b}')647 continue648 index = sub_key.name().index('#{53f56307-b6bf-11d0-94f2-00a0c91efb8b}')649 mountpoint = sub_key.name()[0:index + 39]650 mountpoint 
= mountpoint.replace('_??_', '')651 mountpoint = mountpoint.replace('_##_', '')652 write_debug(name='Mountpoint', value=mountpoint)653 if len(mountpoint) == 0:654 continue655 # Now get the string data that is after the GUID656 data = sub_key.name()[index + 39:] # 38 is the length of the '{53f56....' string/GUID657 if '_' in data:658 volume_serial_no = data[data.rfind('_') + 1:]659 volume_name = data[0:data.rfind('_')]660 for usb_device in usb_devices:661 if usb_device.mountpoint.lower() != mountpoint.lower():662 continue663 emdMgmt = EmdMgmt()664 emdMgmt.volume_serial_num = volume_serial_no665 write_debug(name='EMDMgmt serial no.', value=emdMgmt.volume_serial_num)666 emdMgmt.volume_name = volume_name667 write_debug(name='EMDMgmt volume name', value=emdMgmt.volume_name)668 emdMgmt.timestamp = sub_key.timestamp()669 write_debug(name='EMDMgmt date/time', value=emdMgmt.timestamp.strftime('%Y-%m-%dT%H:%M:%S'))670 if len(volume_serial_no) > 0:671 temp_vsn = int(volume_serial_no)672 emdMgmt.volume_serial_num_hex = "%x" % temp_vsn673 write_debug(name='EMDMgmt serial no. 
(hex)', value=emdMgmt.volume_serial_num_hex)674 usb_device.emdmgmt.append(emdMgmt)675 except Registry.RegistryKeyNotFoundException:676 return677##### NTUSER Hive Methods ############################################################################################678def process_mountpoints2(registry, reg_file_path):679 """Processes the Software\Microsoft\Windows\CurrentVersion\Explorer\MountPoints2 key"""680 try:681 key = registry.open('Software\\Microsoft\\Windows\\CurrentVersion\\Explorer\\MountPoints2')682 global usb_devices683 for usb_device in usb_devices:684 # If there is no GUID then we cannot match so just continue685 if len(usb_device.guid) == 0:686 continue687 for sub_key in key.subkeys():688 if '{' + usb_device.guid + '}' != sub_key.name():689 continue690 mp2 = MountPoint2()691 mp2.file = reg_file_path692 mp2.timestamp = sub_key.timestamp()693 usb_device.mountpoint2.append(mp2)694 write_debug(name='Mountpoint2 file', value=mp2.file)695 write_debug(name='Mountpoint2 date/time', value=mp2.timestamp.strftime('%Y-%m-%dT%H:%M:%S'))696 except Registry.RegistryKeyNotFoundException:697 return698# Log File Methods ############################################################699def process_log_file(file):700 """"""701 regexXp1 = '^\[([0-9]+/[0-9]+/[0-9]+\s[0-9]+:[0-9]+:[0-9]+)\s[0-9]+.[0-9]+\sDriver\sInstall\]'702 regexXp2 = '#-019 .*? ID\(s\): usb\\(.+)'703 regexXp3 = '#I121.*? 
"(.*)"'704 regexVista1 = '>>> *\[Device Install \(Hardware initiated\) - USBSTOR\\(.+)\]'705 regexVista2 = '>>>\s\sSection\sstart\s([0-9]+/[0-9]+/[0-9]+\s[0-9]+:[0-9]+:[0-9]+\.[0-9]+)'706 regexWin78 = '>>>\s\s\[Device\sInstall\s\(Hardware\sinitiated\) - SWD\\WPDBUSENUM\\_\?\?_USBSTOR#(.*)\]'707 with open(file) as f:708 lines = f.readlines()709 f.close()710 install_times = {}711 index = 0712 while index < len(lines) - 1:713 index += 1714 line = lines[index].strip()715 timestamp = ''716 key = ''717 if os == WindowsVersions.WindowsXP or os == WindowsVersions.WindowsXPx64:718 match = re.search(regexXp1, line, re.I)719 if match:720 timestamp = match.group(1)721 if len(timestamp) == 0:722 continue723 match = re.search(regexXp3, line, re.I)724 if match:725 key = match.group(1)726 if len(key) == 0:727 continue728 if 'USB\\' not in key and 'USBSTOR\\' not in key:729 continue730 if key in install_times is False:731 install_times[key] = timestamp732 elif os == WindowsVersions.WindowsVista:733 if not '>>> [Device Install (Hardware initiated) - USBSTOR' in line:734 continue735 match = re.search(regexVista1, line, re.I)736 if match:737 key = 'USBSTOR\\' + match.group(1)738 if len(key) == 0:739 continue740 # Get the line below741 index += 1742 line = lines[index].strip()743 match = re.search(regexVista2, line, re.I)744 if match:745 timestamp = match.group(1)746 if len(timestamp) == 0:747 continue748 if key not in install_times:749 install_times[key] = timestamp750 else:751 #'>>>\s\s\[Device\sInstall\s\(Hardware\sinitiated\) - SWD\\WPDBUSENUM\\_\?\?_USBSTOR#(.*)\]'752 if not '>>> [Device Install (Hardware initiated) - SWD\\WPDBUSENUM\\_??_USBSTOR#' in line:753 continue754 #'>>>\s\s\[Device\sInstall\s\(Hardware\sinitiated\) - SWD\\WPDBUSENUM\\_\?\?_USBSTOR#(.*)\]'755 match = re.match('>>>\s\s\[Device\sInstall\s\(Hardware\sinitiated\) - SWD\\\\WPDBUSENUM\\\\_\?\?_USBSTOR#(.*)\]', line, re.I)756 if match:757 key = 'USBSTOR\\' + match.group(1)758 if len(key) == 0:759 continue760 # Now 
turn the top value into the bottom value761 # usbstor\disk&ven_frespons&prod_tactical_subject&rev_0.00#00000022928277&0#{53f56307-b6bf-11d0-94f2-00a0c91efb8b}762 # usbstor\disk&ven_frespons&prod_tactical_subject&rev_0.00\00000022928277&0763 key = key.replace("\\", "#")764 # Get the line below765 index += 1766 line = lines[index].strip()767 match = re.search(regexVista2, line, re.I)768 if match:769 timestamp = match.group(1)770 if len(timestamp) == 0:771 continue772 if key not in install_times:773 install_times[key] = timestamp774 # Now update the install date/time for the devices775 for key, timestamp in install_times.iteritems():776 for device in usb_devices:777 if os == WindowsVersions.WindowsXP or os == WindowsVersions.WindowsXPx64:778 if (device.vid.lower() + '&' + device.pid.lower() + "\\" + device.serial_number.lower()) in key.lower():779 device.install_datetime = datetime.strptime(timestamp, "%Y/%m/%d %H:%M:%S.%f")780 elif (device.vendor.lower() + "&" + device.product.lower() + "&" + device.version.lower() + "\\" + device.serial_number.lower()) in key.lower():781 #I121 "USBSTOR\DISK&VEN_USB_2.0&PROD_&REV_1100\6&12202299&0782 device.install_datetime = datetime.strptime(timestamp, "%Y/%m/%d %H:%M:%S.%f")783 else:784 if len(device.mountpoint) == 0:785 continue786 parts = device.mountpoint.split('#')787 if len(parts) == 4:788 temp = parts[0] + "#" + parts[1] + "#" + parts[2] + '#{53f56307-b6bf-11d0-94f2-00a0c91efb8b}'789 if temp.lower() in key.lower():790 write_debug(data='Matched install log timestamp using mountpoint: ' + key.lower())791 device.install_datetime = datetime.strptime(timestamp, "%Y/%m/%d %H:%M:%S.%f")792 else:793 write_debug(data='Unable to match install log timestamp using mountpoint: ' + key.lower())794 else:795 print('The mountpoint does not contain 4 delimited (#) parts: ' + device.mountpoint)796# Helper Methods ##############################################################797def load_file(file):798 """Loads a file as a registry hive"""799 
try:800 print('Loading file: ' + file)801 registry = Registry.Registry(file)802 return registry803 except Exception:804 return None805def parse_windows_timestamp(qword):806 """see http://integriography.wordpress.com/2010/01/16/using-phython-to-parse-and-present-windows-64-bit-timestamps"""807 return datetime.utcfromtimestamp(float(qword) * 1e-7 - 11644473600)808def does_usb_device_exist(usb_device):809 """Iterates through the module level list of USB device objects and determines if the object already exists"""810 global usb_devices811 for device in usb_devices:812 if (device.serial_number == usb_device.serial_number and813 device.vendor == usb_device.vendor and814 device.product == usb_device.product and815 device.version == usb_device.version and816 device.parent_prefix_id == usb_device.parent_prefix_id):817 return True818 return False819def get_usb_device(serial_no):820 """Iterates through the module level list of USB device objects and returns the object if it exists"""821 global usb_devices822 for device in usb_devices:823 if device.serial_number == serial_no:824 return device825 return None826def get_usb_device(**kwargs):827 """Iterates through the module level list of USB device objects and returns the object if it exists"""828 global usb_devices829 for device in usb_devices:830 if len(kwargs) == 1:831 if device.serial_number == kwargs['serial_number']:832 write_debug(data='Located USB device by serial number')833 return device834 elif len(kwargs) == 4:835 write_debug(data=kwargs['serial_number'] + '#' + kwargs['vendor'] + '#' + kwargs['product'] + '#' + kwargs['version'])836 write_debug(data=device.serial_number + '#' + device.vendor + '#' + device.product + '#' + device.version)837 if (device.serial_number == kwargs['serial_number'] and838 device.vendor == kwargs['vendor'] and839 device.product == kwargs['product'] and840 device.version == kwargs['version']):...
test_ntfs.py
Source:test_ntfs.py
"""
This test checks the target for the presence of a USB device. It checks
whether the device is large enough for the test to execute. It then formats
the USB as NTFS, creates partitions and a device label. The device is then
mounted and a large image is copied over to the USB (virtual size 13G). That
image is then imported into glance and a VM is booted from the image using a
newly created flavour.
"""
from pytest import skip
from utils.tis_log import LOG
from utils.clients.ssh import ControllerClient
from consts.stx import FlavorSpec
from keywords import vm_helper, nova_helper, glance_helper, host_helper, system_helper
from testfixtures.fixture_resources import ResourceCleanup


def locate_usb(host_type="controller", min_size=13):
    """
    Try to locate a USB device on a host of the type specified.

    The test is skipped as soon as a host with a USB device that is too
    small is found; hosts without any USB device are passed over.

    Arguments:
    - host_type (string) - e.g. controller, compute, storage
    - min_size (int) - minimum size of USB required (GiB)
    Returns:
    - hostname, e.g. controller-0
    - usb_device, e.g. /dev/sdb
    (None, None) when no host has a USB device.
    """
    LOG.tc_step("Check all hosts of type {} for USB devices".format(host_type))
    hosts = system_helper.get_hosts(personality=host_type)
    for host in hosts:
        with host_helper.ssh_to_host(host) as host_ssh:
            cmd = "ls --color=none -ltrd /dev/disk/by-id/usb*"
            rc, out = host_ssh.exec_cmd(cmd)
            if rc == 0:
                # The last three characters of the first listing line are
                # taken as the device node name, e.g. "sdb"
                usb_device = "/dev/" + (out.splitlines()[0])[-3:]
                LOG.info("Found USB device {} on host {}".format(usb_device, host))
                cmd = "blockdev --getsize64 {}".format(usb_device)
                usb_bytes = host_ssh.exec_sudo_cmd(cmd)[1]
                gib = int(usb_bytes) / (1024 * 1024 * 1024)
                if gib > min_size:
                    LOG.info("Size of USB device is sufficient for test")
                    return host, usb_device
                else:
                    skip("Size of USB device is insufficient for test")
    return (None, None)


def umount_usb(host_ssh, mount_point="/media/ntfs"):
    """
    Unmount a USB device.

    Arguments:
    - host_ssh - ssh session to host with USB
    - mount_point (string) - e.g. /media/ntfs
    Returns
    - Nothing
    """
    LOG.tc_step("Unmounting {}".format(mount_point))
    cmd = "umount {}".format(mount_point)
    rc = host_ssh.exec_sudo_cmd(cmd)[0]
    # Return code 32 is tolerated as well as 0
    assert rc == 0 or rc == 32, "Umount failed with return code {}".format(rc)
    if rc == 0:
        LOG.info("Umount was successful")
    if rc == 32:
        LOG.info("Umount was unsuccessful. Maybe device was already unmounted?")


def wipe_usb(host_ssh, usb_device):
    """
    Wipe a USB device, including all existing partitions.

    Arguments:
    - host_ssh - ssh session to host with USB
    - usb_device (string) - name of usb device
    Returns:
    - nothing
    """
    LOG.tc_step("Wipe the USB completely")
    # Zeroes the first 2048 blocks of 1k, i.e. the start of the device
    cmd = "dd if=/dev/zero of={} bs=1k count=2048".format(usb_device)
    rc = host_ssh.exec_sudo_cmd(cmd, fail_ok=False)[0]
    assert rc == 0, "USB wipe failed"


def create_usb_label(host_ssh, usb_device=None, label="msdos"):
    """
    Create a label on a USB device.

    Arguments:
    - host_ssh - ssh session to host with USB
    - usb_device (string) - e.g. /dev/sdb
    - label (string) - e.g. "msdos"
    Returns:
    - Nothing
    """
    LOG.tc_step("Create label and partition table on the USB")
    cmd = "parted {} mklabel {} -s".format(usb_device, label)
    rc = host_ssh.exec_sudo_cmd(cmd, fail_ok=False)[0]
    assert rc == 0, "Label creation failed"


def create_usb_partition(host_ssh, usb_device, startpt="0", endpt="0"):
    """
    Create a partition on a USB device.

    Arguments:
    - host_ssh - ssh session to host with USB
    - usb_device (string) - e.g. /dev/sdb
    - startpt (string) - partition start point, e.g. "0"
    - endpt (string) - partition end point, e.g. "2048"
    Returns:
    - Nothing
    """
    cmd = "parted -a none {} mkpart primary ntfs {} {}".format(usb_device, startpt, endpt)
    rc = host_ssh.exec_sudo_cmd(cmd)[0]
    assert rc == 0, "Primary partition creation failed"


def format_usb(host_ssh, usb_device, partition):
    """
    This formats a particular partition on a usb device.

    Arguments:
    - host_ssh - ssh session to host with USB
    - usb_device (string) - e.g. /dev/sdb
    - partition (string) - e.g. "2" for /dev/sdb2
    Returns:
    - Nothing
    """
    LOG.tc_step("Format device {} as NTFS".format(usb_device + partition))
    cmd = "mkfs.ntfs -f {}{}".format(usb_device, partition)
    rc = host_ssh.exec_sudo_cmd(cmd)[0]
    assert rc == 0, "Failed to format device"


def mount_usb(host_ssh, usb_device=None, partition="2", mount_type="ntfs", mount_point="/media/ntfs"):
    """
    This creates a mount point and then mounts the desired device.

    Arguments:
    - host_ssh - ssh session to host with USB
    - usb_device (string) - e.g. /dev/sdb
    - partition (string) - partition number appended to usb_device, e.g. "2"
    - mount_type (string) - filesystem type passed to "mount -t", e.g. "ntfs"
    - mount_point (string) - where the usb should be mounted
    Returns:
    - Nothing
    """
    LOG.tc_step("Check if mount point exists")
    cmd = "test -d {}".format(mount_point)
    rc = host_ssh.exec_sudo_cmd(cmd)[0]
    if rc == 1:
        LOG.tc_step("Create mount point")
        cmd = "mkdir -p {}".format(mount_point)
        rc = host_ssh.exec_sudo_cmd(cmd)[0]
        assert rc == 0, "Mount point creation failed"
    LOG.tc_step("Mount ntfs device")
    cmd = "mount -t {} {} {}".format(mount_type, usb_device + partition, mount_point)
    rc = host_ssh.exec_sudo_cmd(cmd)[0]
    assert rc == 0, "Unable to mount device"


# Wendy says just testing one node is enough.
# @mark.parametrize("host_type", ['controller', 'compute', 'storage'])
def test_ntfs(stx_openstack_required, host_type="controller"):
    """
    This test will test NTFS mount and NTFS formatted device creation on a TiS
    system.

    Arguments:
    - host_type (string) - host type to be tested, e.g. controller, compute,
      storage
    Returns:
    - Nothing
    Test Steps:
    1. Check if desired host has USB inserted. If not, skip
    2. Wipe USB
    3. Change label of device
    4. Create partitions on NTFS device
    5. Format partitions
    6. Copy large image to NTFS mount point
    7. Test mount and big file creation on NTFS mounted device
    """
    # Could pass these in through parametrize instead
    mount_type = "ntfs"
    mount_point = "/media/ntfs/"
    guest_os = 'win_2012'
    boot_source = "image"
    host, usb_device = locate_usb(host_type, min_size=13)
    if not host:
        skip("No USB hardware found on {} host type".format(host_type))
    hosts_with_image_backing = host_helper.get_hosts_in_storage_backing(storage_backing='image')
    if len(hosts_with_image_backing) == 0:
        skip("No hosts with image backing present")
    # if the host with the USB is not the active controller, swact controllers
    con_ssh = ControllerClient.get_active_controller()
    active_controller = system_helper.get_active_controller_name(con_ssh)
    if host != active_controller:
        host_helper.swact_host()
    with host_helper.ssh_to_host(host) as host_ssh:
        wipe_usb(host_ssh, usb_device)
        umount_usb(host_ssh, mount_point=mount_point)
        create_usb_label(host_ssh, usb_device, label="msdos")
        create_usb_partition(host_ssh, usb_device, startpt="0", endpt="2048")
        format_usb(host_ssh, usb_device, partition="1")
        create_usb_partition(host_ssh, usb_device, startpt="2049", endpt="100%")
        format_usb(host_ssh, usb_device, partition="2")
        mount_usb(host_ssh, usb_device, partition="2", mount_type=mount_type, mount_point=mount_point)
    LOG.tc_step("Copy the windows guest image to the mount point")
    src_img = glance_helper.scp_guest_image(img_os=guest_os, dest_dir=mount_point, con_ssh=con_ssh)
    LOG.tc_step("Create flavor for windows guest image")
    flv_id = nova_helper.create_flavor(name=guest_os, vcpus=4, ram=8192, storage_backing="local_image",
                                       guest_os=guest_os)[1]
    nova_helper.set_flavor(flv_id, **{FlavorSpec.CPU_POLICY: "dedicated"})
    ResourceCleanup.add("flavor", flv_id)
    LOG.tc_step("Import image into glance")
    glance_helper.create_image(name=guest_os, source_image_file=src_img, disk_format="qcow2",
                               container_format="bare", con_ssh=con_ssh, cleanup="function")
    LOG.tc_step("Boot VM")
    vm_id = vm_helper.boot_vm(name=guest_os, flavor=flv_id, guest_os=guest_os, source=boot_source, cleanup="function")[1]
    LOG.tc_step("Ping vm and ssh to it")
    vm_helper.wait_for_vm_pingable_from_natbox(vm_id)
    with vm_helper.ssh_to_vm_from_natbox(vm_id) as vm_ssh:
        output = vm_ssh.exec_cmd('pwd', fail_ok=False)[1]
# ... (excerpt ends here; the remainder of test_ntfs is not shown)
origin-usb-monitor
Source:origin-usb-monitor
#!/usr/bin/env python
"""Polls the USB devices exposed by originIO and streams their readings to the origin server."""
import sys
import os
# Make the sibling "lib" directory importable before importing origin
fullBinPath = os.path.abspath(os.getcwd() + "/" + sys.argv[0])
fullLibPath = os.path.abspath(os.path.dirname(os.path.dirname(fullBinPath)) + "/lib")
sys.path.append(fullLibPath)
import time
import random
import calendar
import origin

# An optional first command line argument selects the configuration bundle
if len(sys.argv) > 1:
    configBundle = sys.argv[1]
    origin.configure(configBundle)
else:
    origin.configure("site")

from origin.client import monitoring_record
from origin.client import server
from origin.client import server_connection
from origin import current_time, timestamp

# Create usb interface
from devices import originIO
io = originIO.originIO()


# just a proxy for actually doing something useful
def makeTempMeasurement():
    """Returns a random float in [0.0, 1.0) as a stand-in measurement."""
    return random.random()


# something that represents the connection to the server
# might need arguments.... idk
serv = server()
# perhaps print some useful message. Perhaps try to reconnect....
# print "problem establishing connection to server..."
# sys.exit(1)

# This might need to be more complicated, but you get the gist. Keep sending records forever
while True:
    # print() with a single argument behaves the same on Python 2 and 3
    print("sending....")
    usb_device_info = io.get_all_devices_info()
    print("Number of detected devices: ")
    print(len(usb_device_info))
    for usb_device in usb_device_info:
        #print(usb_device)
        brand = usb_device['brand']
        device_type = usb_device['type']  # not named "type": that shadows the builtin
        serial = usb_device['serial']
        stream_name = usb_device['name']
        if stream_name == '':
            # Unnamed devices get a name derived from brand, type and serial
            stream_name = brand + '_' + device_type + '_' + serial

        print(stream_name)
        # this should be changed to not reregister everytime
        connection = serv.registerStream(
            stream=stream_name.encode("ascii"),
            records={'set_point': 'float',
                     'sensor': 'float',
                     'V': 'float',
                     'I': 'float'})
        record = {
            'set_point': float(usb_device['set_point']),
            'sensor': float(usb_device['sensor']),
            'V': float(usb_device['V']),
            'I': float(usb_device['I'])
        }
        #time.sleep(1)
        record['timestamp'] = current_time(origin.config)
        connection.send(**record)
# ... (excerpt ends here)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!