Best Python code snippet using hypothesis
analyzer.py
Source:analyzer.py
1#!/usr/bin/env python32import argparse3import datetime4import json5import matplotlib.pyplot as plt6import sys7class UidSnapshot(object):8 def __init__(self, activity):9 self.uid = activity['uid']10 self.foregroundWrittenBytes = activity['foregroundWrittenBytes']11 self.foregroundFsyncCalls = activity['foregroundFsyncCalls']12 self.backgroundFsyncCalls = activity['backgroundFsyncCalls']13 self.backgroundWrittenBytes = activity['backgroundWrittenBytes']14 self.appPackages = activity['appPackages']15 self.runtimeMs = activity['runtimeMs']16 self.totalWrittenBytes = self.foregroundWrittenBytes + self.backgroundWrittenBytes17 self.totalFsyncCalls = self.backgroundFsyncCalls + self.foregroundFsyncCalls18 if self.appPackages is None: self.appPackages = []19class Snapshot(object):20 def __init__(self, activity, uptime):21 self.uptime = uptime22 self.uids = {}23 self.foregroundWrittenBytes = 024 self.foregroundFsyncCalls = 025 self.backgroundFsyncCalls = 026 self.backgroundWrittenBytes = 027 self.totalWrittenBytes = 028 self.totalFsyncCalls = 029 for entry in activity:30 uid = entry['uid']31 snapshot = UidSnapshot(entry)32 self.foregroundWrittenBytes += snapshot.foregroundWrittenBytes33 self.foregroundFsyncCalls += snapshot.foregroundFsyncCalls34 self.backgroundFsyncCalls += snapshot.backgroundFsyncCalls35 self.backgroundWrittenBytes += snapshot.backgroundWrittenBytes36 self.totalWrittenBytes += snapshot.totalWrittenBytes37 self.totalFsyncCalls += snapshot.totalFsyncCalls38 self.uids[uid] = snapshot39class Document(object):40 def __init__(self, f):41 self.snapshots = []42 uptimes = [0, 0]43 for line in f:44 line = json.loads(line)45 if line['type'] != 'snapshot': continue46 activity = line['activity']47 uptime = line['uptime']48 if uptime < uptimes[0]: uptimes[0] = uptime49 if uptime > uptimes[1]: uptimes[1] = uptime50 self.snapshots.append(Snapshot(activity, uptime))51 self.runtime = datetime.timedelta(milliseconds=uptimes[1]-uptimes[0])52def merge(l1, l2):53 s1 = set(l1)54 s2 = set(l2)55 return list(s1 | s2)56thresholds = [57 (1024 * 1024 * 1024 * 1024, "TB"),58 (1024 * 1024 * 1024, "GB"),59 (1024 * 1024, "MB"),60 (1024, "KB"),61 (1, "bytes")62]63def prettyPrintBytes(n):64 for t in thresholds:65 if n >= t[0]:66 return "%.1f %s" % (n / (t[0] + 0.0), t[1])67 return "0 bytes"68# knowledge extracted from android_filesystem_config.h69wellKnownUids = {70 0 : ["linux kernel"],71 1010 : ["wifi"],72 1013 : ["mediaserver"],73 1017 : ["keystore"],74 1019 : ["DRM server"],75 1021 : ["GPS"],76 1023 : ["media storage write access"],77 1036 : ["logd"],78 1040 : ["mediaextractor"],79 1041 : ["audioserver"],80 1046 : ["mediacodec"],81 1047 : ["cameraserver"],82 1053 : ["webview zygote"],83 1054 : ["vehicle hal"],84 1058 : ["tombstoned"],85 1066 : ["statsd"],86 1067 : ["incidentd"],87 9999 : ["nobody"],88}89class UserActivity(object):90 def __init__(self, uid):91 self.uid = uid92 self.snapshots = []93 self.appPackages = wellKnownUids.get(uid, [])94 self.foregroundWrittenBytes = 095 self.foregroundFsyncCalls = 096 self.backgroundFsyncCalls = 097 self.backgroundWrittenBytes = 098 self.totalWrittenBytes = 099 self.totalFsyncCalls = 0100 def addSnapshot(self, snapshot):101 assert snapshot.uid == self.uid102 self.snapshots.append(snapshot)103 self.foregroundWrittenBytes += snapshot.foregroundWrittenBytes104 self.foregroundFsyncCalls += snapshot.foregroundFsyncCalls105 self.backgroundFsyncCalls += snapshot.backgroundFsyncCalls106 self.backgroundWrittenBytes += snapshot.backgroundWrittenBytes107 self.totalWrittenBytes += snapshot.totalWrittenBytes108 self.totalFsyncCalls += snapshot.totalFsyncCalls109 self.appPackages = merge(self.appPackages, snapshot.appPackages)110 def plot(self, foreground=True, background=True, total=True):111 plt.figure()112 plt.title("I/O activity for UID %s" % (self.uid))113 X = range(0,len(self.snapshots))114 minY = 0115 maxY = 0116 if foreground:117 Y = [s.foregroundWrittenBytes for s in self.snapshots]118 if any([y > 0 for y in Y]):119 plt.plot(X, Y, 'b-')120 minY = min(minY, min(Y))121 maxY = max(maxY, max(Y))122 if background:123 Y = [s.backgroundWrittenBytes for s in self.snapshots]124 if any([y > 0 for y in Y]):125 plt.plot(X, Y, 'g-')126 minY = min(minY, min(Y))127 maxY = max(maxY, max(Y))128 if total:129 Y = [s.totalWrittenBytes for s in self.snapshots]130 if any([y > 0 for y in Y]):131 plt.plot(X, Y, 'r-')132 minY = min(minY, min(Y))133 maxY = max(maxY, max(Y))134 i = int((maxY - minY) / 5)135 Yt = list(range(minY, maxY, i))136 Yl = [prettyPrintBytes(y) for y in Yt]137 plt.yticks(Yt, Yl)138 Xt = list(range(0, len(X)))139 plt.xticks(Xt)140class SystemActivity(object):141 def __init__(self):142 self.uids = {}143 self.snapshots = []144 self.foregroundWrittenBytes = 0145 self.foregroundFsyncCalls = 0146 self.backgroundFsyncCalls = 0147 self.backgroundWrittenBytes = 0148 self.totalWrittenBytes = 0149 self.totalFsyncCalls = 0150 def addSnapshot(self, snapshot):151 self.snapshots.append(snapshot)152 self.foregroundWrittenBytes += snapshot.foregroundWrittenBytes153 self.foregroundFsyncCalls += snapshot.foregroundFsyncCalls154 self.backgroundFsyncCalls += snapshot.backgroundFsyncCalls155 self.backgroundWrittenBytes += snapshot.backgroundWrittenBytes156 self.totalWrittenBytes += snapshot.totalWrittenBytes157 self.totalFsyncCalls += snapshot.totalFsyncCalls158 for uid in snapshot.uids:159 if uid not in self.uids: self.uids[uid] = UserActivity(uid)160 self.uids[uid].addSnapshot(snapshot.uids[uid])161 def loadDocument(self, doc):162 for snapshot in doc.snapshots:163 self.addSnapshot(snapshot)164 def sorted(self, f):165 return sorted(self.uids.values(), key=f, reverse=True)166 def pie(self):167 plt.figure()168 plt.title("Total disk writes per UID")169 A = [(K, self.uids[K].totalWrittenBytes) for K in self.uids]170 A = filter(lambda i: i[1] > 0, A)171 A = list(sorted(A, key=lambda i: i[1], reverse=True))172 X = [i[1] for i in A]173 L = [i[0] for i in A]174 plt.pie(X, labels=L, counterclock=False, startangle=90)175parser = argparse.ArgumentParser("Process FlashApp logs into reports")176parser.add_argument("filename")177parser.add_argument("--reportuid", action="append", default=[])178parser.add_argument("--plotuid", action="append", default=[])179parser.add_argument("--totalpie", action="store_true", default=False)180args = parser.parse_args()181class UidFilter(object):182 def __call__(self, uid):183 return False184class UidFilterAcceptAll(UidFilter):185 def __call__(self, uid):186 return True187class UidFilterAcceptSome(UidFilter):188 def __init__(self, uids):189 self.uids = uids190 def __call__(self, uid):191 return uid in self.uids192uidset = set()193plotfilter = None194for uid in args.plotuid:195 if uid == "all":196 plotfilter = UidFilterAcceptAll()197 break198 else:199 uidset.add(int(uid))200if plotfilter is None: plotfilter = UidFilterAcceptSome(uidset)201uidset = set()202reportfilter = None203for uid in args.reportuid:204 if uid == "all":205 reportfilter = UidFilterAcceptAll()206 break207 else:208 uidset.add(int(uid))209if reportfilter is None:210 if len(uidset) == 0:211 reportfilter = UidFilterAcceptAll()212 else:213 reportfilter = UidFilterAcceptSome(uidset)214document = Document(open(args.filename))215print("System runtime: %s\n" % (document.runtime))216system = SystemActivity()217system.loadDocument(document)218print("Total bytes written: %s (of which %s in foreground and %s in background)\n" % (219 prettyPrintBytes(system.totalWrittenBytes),220 prettyPrintBytes(system.foregroundWrittenBytes),221 prettyPrintBytes(system.backgroundWrittenBytes)))222writemost = filter(lambda ua: ua.totalWrittenBytes > 0, system.sorted(lambda ua: ua.totalWrittenBytes))223for entry in writemost:224 if reportfilter(entry.uid):225 print("user id %d (%s) wrote %s (of which %s in foreground and %s in background)" % (226 entry.uid,227 ','.join(entry.appPackages),228 prettyPrintBytes(entry.totalWrittenBytes),229 prettyPrintBytes(entry.foregroundWrittenBytes),230 prettyPrintBytes(entry.backgroundWrittenBytes)))231 if plotfilter(entry.uid):232 entry.plot()233 plt.show()234if args.totalpie:235 system.pie()...
compress.py
Source:compress.py
1#########################################################################2# Compresses LZSS data for PS1 game "Racing Lagoon" into bytes format #3#########################################################################45# Racing Lagoon uses references with 11-bit offsets and 4-bit lengths,6# corresponding to a 2048-byte window, and reference lengths in the range7# 3..18 bytes. With a 1 bit identifier for references.89import sys 10import os11from pathlib import Path1213WSIZE = 0b100000000000 # window size1415MAX_REF_LEN = 18 # maximum copy length16MIN_REF_LEN = 3 # minimum copy length1718REPEATER_ID = 0x7E19DOUBLE_REPEATER_ID = 0x7F2021LENGTH_MASK = 0b0111100022OFFSET_MASK_1 = 0b0000011123REFERENCE_MASK_DOUBLE = 0b1000000000000000242526#Compresses the given file and returns a bytes string of the compressed file27def compressChunk(inputData):28 29 writeBuffer = b''3031 #write header - [4 BYTE LITTLE ENDIAN UNCOMPRESSED FILESIZE] 32 # [0x0173] 33 fileSize = len(inputData)34 35 writeBuffer += bytes([ fileSize & 0xFF])36 writeBuffer += bytes([(fileSize & 0xFF00)>>8])37 writeBuffer += bytes([(fileSize & 0xFF0000)>>16])38 writeBuffer += bytes([(fileSize & 0xFF000000)>>24])39 writeBuffer += b'\x01\x73'4041 #write compressed blocks until file size reached42 43 bytesWritten = 044 windowLeft = 045 windowRight = 04647 inLiteralBlock = False48 literalBlockSize = 04950 while bytesWritten < fileSize:51 referenceFound = False52 53 #######Test for reference54 #check all valid upcoming string lengths for matches in our dictionary window55 if referenceFound == False:56 for copyLength in range(MAX_REF_LEN, MIN_REF_LEN - 1, -1):57 58 #Check if search window extends past EOF59 if bytesWritten + copyLength >= fileSize:60 continue61 else:62 windowRight = bytesWritten + copyLength6364 searchString = inputData[bytesWritten:windowRight]6566 #Search valid window for result67 searchResult = inputData.find(searchString, windowLeft, windowRight - 1)68 if searchResult != -1:69 70 #Truncate and append literal block chain, if exists71 if inLiteralBlock == True:72 writeBuffer += bytes([literalBlockSize])73 writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]74 75 inLiteralBlock = False76 literalBlockSize = 0777879 #create reference block with pattern 0b1YYYYZZZZZZZZZZZ (Y is length-3, Z is reference offset-1)80 lengthNibble = (copyLength-3) << 1181 offsetNibble = bytesWritten - searchResult - 182 83 referenceBlock = REFERENCE_MASK_DOUBLE | lengthNibble | offsetNibble8485 writeBuffer += bytes([(referenceBlock & 0xFF00)>>8]) + bytes([referenceBlock & 0xFF])86 referenceFound = True8788 #89 bytesWritten += copyLength90 91 break92 93 #######Test for repeated chars of length94 if referenceFound == False:9596 #If at least next 4 bytes are identical, write repeater block97 if bytesWritten + 4 < fileSize and inputData[bytesWritten] == inputData[bytesWritten+1] == inputData[bytesWritten + 2] == inputData[bytesWritten+3]:98 #Truncate and append literal block chain, if exists99 if inLiteralBlock == True:100 writeBuffer += bytes([literalBlockSize])101 writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]102 103 inLiteralBlock = False104 literalBlockSize = 0105106 count = 4107108 while len(inputData) > (bytesWritten + count) and inputData[bytesWritten + count] == inputData[bytesWritten + count - 1]:109 count+=1110111 #Max length = 0xFFFF112 count = count % 0xFFFF113 114 #Double repeater block used if count is greater than 0xFF+0x04 115 if count <=0x103:116 writeBuffer += bytes([REPEATER_ID])117 writeBuffer += bytes([count-4])118 else:119 writeBuffer += bytes([DOUBLE_REPEATER_ID])120 writeBuffer += count.to_bytes(2, byteorder="little")121 122 writeBuffer += bytes([inputData[bytesWritten]])123124 referenceFound = True125 bytesWritten += count126127 128 129 #######write literal block130 if referenceFound == False:131 132 literalBlockSize += 1133 bytesWritten +=1134 inLiteralBlock = True135136 #If max literal block size reached, truncate and write block137 if literalBlockSize == REPEATER_ID - 1:138 writeBuffer += bytes([literalBlockSize])139 writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]140 141 inLiteralBlock = False142 literalBlockSize = 0143 144145 #update window146 if bytesWritten > WSIZE:147 windowLeft = bytesWritten - WSIZE148149 #Truncate and append literal block chain, if exists150 if inLiteralBlock == True:151 writeBuffer += bytes([literalBlockSize])152 writeBuffer += inputData[bytesWritten - literalBlockSize: bytesWritten]153154 return writeBuffer155156157158#inputfile = open("LZSSOUTRight.bin", "rb")159160#data = compressChunk(inputfile.read())161162#outputFile = open("LZSSTestOut.bin", "wb")163164#outputFile.write(data)165166#outputFile.close()
...
test_lammpsdata_write.py
Source:test_lammpsdata_write.py
1"""2Create an atoms object and write it to a lammps data file3"""4from io import StringIO5import ase.io6from .parse_lammps_data_file import lammpsdata_file_extracted_sections7from .comparison import compare_with_pytest_approx8# Relative tolerance for comparing floats with pytest.approx9REL_TOL = 1e-210def test_lammpsdata_write(atoms):11 # Write atoms object to lammpsdata file-like object12 lammpsdata_buf = StringIO()13 ase.io.write(14 lammpsdata_buf, atoms, format="lammps-data", atom_style="full", velocities=True15 )16 # Now read the output back, parse it, and compare to the original atoms17 # object attributes18 written_values = lammpsdata_file_extracted_sections(lammpsdata_buf)19 # Check cell was written correctly20 cell_written = written_values["cell"]21 cell_expected = atoms.get_cell()22 compare_with_pytest_approx(cell_written, cell_expected, REL_TOL)23 # Check that positions were written correctly24 positions_written = written_values["positions"]25 positions_expected = atoms.get_positions(wrap=True)26 compare_with_pytest_approx(positions_written, positions_expected, REL_TOL)27 # Check velocities were read in correctly28 velocities_written = written_values["velocities"]29 velocities_expected = atoms.get_velocities()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!