Best Python code snippet using fMBT_python
pcapconverter.py
Source:pcapconverter.py
"""
pcapconverter.py: converts packets from a pcap file into a list of PacketData objects
representing that file.

pcapConverter takes a filename (with extension) and calls editcapSplit, which uses editcap
(a command provided by Wireshark) to split the file into smaller files of 10000 packets each.
It then uses a pool of processes to run the extractData method on each of them concurrently.
extractData calls the Scapy library method PcapReader to obtain information about each packet and
convert it into a list of PacketData objects to be provided to packetdataConverter. The data from
each smaller file is compiled together and sorted, then returned.

Author: Dean Quaife
Last edited: 2020/05/21
"""
from scapy.all import PcapReader
from hyprfire_app.analysis.packetdata import PacketData
from hyprfire_app.exceptions import ConverterException, EditcapException
import datetime
import subprocess
import multiprocessing
import logging
import pathvalidate
from pathlib import Path
from decimal import Decimal


def pcapConverter(filename):
    """Retrieve data from a pcap file.

    Input parameters:
        filename: a string representing the path to a pcap file
    Output:
        data_list: a List of PacketData objects representing that pcap file
    Raises:
        ConverterException: if filename doesn't exist or editcap could not split it
            (i.e. it is probably not a capture file)
    """
    logger = logging.getLogger(__name__)
    logger.info(f"PcapConverter start: filename is {filename}")
    if not Path(filename).is_file():
        raise ConverterException("Filename does not exist")

    data_list = []
    try:
        logger.debug("Attempting to split file using editcap...")
        try:
            editcapSplit(filename)
        except EditcapException as err:
            # Chain the cause so the original editcap failure stays visible in tracebacks.
            raise ConverterException("Editcap failure: is this file a capture file?") from err

        temp_paths = [str(path) for path in Path('.').glob('temp*.pcap')]
        logger.debug(f"Success! Split pcap file into {len(temp_paths)} temp files. Extracting pcap data...")

        # pass temp filenames to extractData with multiprocessing
        with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
            res = pool.map(extractData, temp_paths)

        # An empty segment has no first packet to sort by and contributes no data, so drop it.
        segments = [segment for segment in res if segment]
        # sort the segments to ensure they weren't received out of order
        for segment in sorted(segments, key=lambda temp_data: temp_data[0].epochTimestamp):
            data_list += segment
        logger.debug(f"Obtained data for {len(data_list)} packets. Cleaning up temp files...")
    finally:
        # Delete the temporary smaller files even when splitting or extraction fails,
        # so a failed run does not leave temp*.pcap files behind for the next one to pick up.
        for temp_file in Path('.').glob('temp*.pcap'):
            temp_file.unlink()

    logger.info("PcapConverter finished. Returning PacketData list.")
    return data_list


def editcapSplit(filename):
    """Use the editcap command provided by Wireshark to split filename into 10000 packet pcap files.

    The output files are written to the current working directory using "temp.pcap" as the
    base name handed to editcap.

    Input parameters:
        filename: a string representing a pcap filename
    Raises:
        EditcapException: if editcap exits with a non-zero return code
            (e.g. filename is not a pcap file)
    """
    temp = "temp.pcap"
    command = ['editcap', '-c', '10000', filename, temp]
    res = subprocess.run(command)  # split filename with editcap
    if res.returncode != 0:
        raise EditcapException()


def extractData(file):
    """Convert important data from each editcap file into a list of PacketData objects.

    Input parameters:
        file: a string representing a pcap filename
    Output:
        packets: a PacketData List representing file
    """
    packets = []
    with PcapReader(file) as reader:
        for packet in reader:
            # Go through str() so the Decimal carries the timestamp exactly as printed.
            epochTimestamp = Decimal(str(packet.time))
            timestamp = sinceMidnight(epochTimestamp)
            length = packet.wirelen  # named 'length' to avoid shadowing the builtin len()
            packets.append(PacketData(epochTimestamp, timestamp, length))
    return packets


def sinceMidnight(epochTimestamp):
    """Compute the number of microseconds since midnight on the day the packet arrived.

    Used to calculate graph values later. Midnight is taken in the local timezone, because
    both fromtimestamp() and the naive datetime's timestamp() use local time.

    Input parameters:
        epochTimestamp: a Decimal value retrieved from a packet timestamp
    Output:
        NOTE(review): the end of this function (including its return statement) is truncated
        in the source this was taken from; only the visible statements are reproduced here.
    """
    # Only the calendar date is needed, so whole seconds are enough. The explicit int() is
    # required because datetime.fromtimestamp() rejects Decimal values on Python 3.10+.
    date = datetime.datetime.fromtimestamp(int(epochTimestamp))
    seconds_midnight = datetime.datetime(date.year, date.month, date.day).timestamp()
    seconds = epochTimestamp - Decimal(seconds_midnight)
    microseconds = 1000000 * seconds
    # ... (remainder of the function is truncated in the source)
log.py
Source:log.py
'''
'''
import datetime

from com.nwrobel import mypycommons
import com.nwrobel.mypycommons.file
import com.nwrobel.mypycommons.time


class MPDLogLine:
    '''
    Class representing a single log line entry from an MPD log file.

    Params:
        text: what the log actually says (log line minus the string timestamp info)
        epochTimestamp: time for when this log line occurred/was written, as an epoch timestamp (float or int)
        originalText: the full line text as originally read from the file

    Raises:
        TypeError: if text is empty or not a string
        ValueError: if epochTimestamp is rejected by mypycommons.time.isValidTimestamp
    '''
    def __init__(self, text, epochTimestamp, originalText):
        # perform validation on input data
        if (not text) or (not isinstance(text, str)):
            raise TypeError("Class attribute 'text' must be a non-empty string")

        if (not mypycommons.time.isValidTimestamp(epochTimestamp)):
            # Format the actual parameter: the previous code referenced an undefined name,
            # which raised NameError instead of the intended ValueError.
            raise ValueError("Class attribute 'timestamp' must be a valid timestamp, value '{}' is invalid".format(epochTimestamp))

        self.text = text
        self.epochTimestamp = epochTimestamp
        self.originalText = originalText


class MPDLog:
    '''
    Represents an MPD log file on disk: reads its lines into MPDLogLine objects on
    construction and can reset the file so that only newer lines remain.

    Params:
        filepath: path to an existing MPD log file (non-empty string)

    Raises:
        TypeError: if filepath is empty or not a string
        ValueError: if filepath does not point to an existing file
    '''
    def __init__(self, filepath):
        if (not filepath) or (not isinstance(filepath, str)):
            raise TypeError("Class attribute 'filepath' must be a non-empty string")

        # validate that the filepath exists
        if (not mypycommons.file.isFile(filepath)):
            # Format the actual parameter: the previous code referenced an undefined name,
            # which raised NameError instead of the intended ValueError.
            raise ValueError("Class attribute 'filepath' must be a valid filepath to an existing file: invalid value '{}'".format(filepath))

        self.filepath = filepath
        self.lines = self._readLogLines()

    def reset(self, mpdLogLineToPreserve):
        '''
        Rewrites the log file so it contains only the lines that were added since this
        object read it, optionally preceded by one older line to keep.

        Params:
            mpdLogLineToPreserve: an MPDLogLine whose original text is written first,
                or a falsy value to preserve nothing
        '''
        currentLines = self._getRawLines()
        # A set makes each membership test O(1) instead of scanning the whole list.
        previousLines = {logLine.originalText for logLine in self.lines}
        newLines = [currentLine for currentLine in currentLines if (currentLine not in previousLines)]

        mypycommons.file.clearFileContents(self.filepath)

        # Insert the line to preserve at the beginning of the list of new lines (if there is one),
        # since the line to preserve is older than the newer lines
        if (mpdLogLineToPreserve):
            newLines.insert(0, mpdLogLineToPreserve.originalText)

        mypycommons.file.writeToFile(self.filepath, newLines)

    def _readLogLines(self):
        '''
        Collects all log lines (as MPDLogLine objects) from the MPD log file.
        The list is then sorted by the timestamp of each line (earliest first) and returned.
        '''
        rawLogFileLines = self._getRawLines()
        allMPDLogLines = []

        for logLine in rawLogFileLines:
            lineTimestamp = self._getTimestampFromRawLogLine(logLine)
            lineText = self._getTextFromRawLogLine(logLine)
            allMPDLogLines.append(MPDLogLine(epochTimestamp=lineTimestamp, text=lineText, originalText=logLine))

        # Sort the loglines array
        sortedLines = self._sortMPDLogLinesByTimestamp(allMPDLogLines)
        return sortedLines

    def _getRawLines(self):
        '''
        Reads the log file and returns its lines with newline characters removed.
        '''
        rawLogFileLines = mypycommons.file.readFile(self.filepath)
        rawLogFileLines = [logLine.replace('\n', '') for logLine in rawLogFileLines]
        return rawLogFileLines

    def _getTimestampFromRawLogLine(self, logLine):
        '''
        Gets epoch timestamp from the raw log line of a log file - for the new, MPD logfiles that use syslog
        and timestamps have the year
        '''
        # The first 26 characters hold the "%Y-%m-%dT%H:%M:%S.%f" timestamp.
        lineTimeData = logLine[0:26]
        lineDatetime = datetime.datetime.strptime(lineTimeData, "%Y-%m-%dT%H:%M:%S.%f")
        epochTimestamp = lineDatetime.timestamp()
        return epochTimestamp

    def _getTextFromRawLogLine(self, logLine):
        '''
        Returns the message part of a raw log line (everything from index 33 onwards).
        '''
        # NOTE(review): presumably characters 26-32 are a UTC offset plus separator - confirm
        # against a real log line.
        lineText = logLine[33:]
        return lineText

    def _sortMPDLogLinesByTimestamp(self, mpdLogLines):
        '''
        Sorts the given MPDLogLine list in place by epoch timestamp, earliest first.

        NOTE(review): the end of this method is truncated in the source this was taken from;
        _readLogLines uses its return value, so confirm what the full method returns.
        '''
        mpdLogLines.sort(key=lambda line: line.epochTimestamp)
        # ... (remainder of the method is truncated in the source)
zipCreator.py
Source:zipCreator.py
import time
from Modules import timeformat as tf, zipmodule as zm, osoprations as osop, logmodule as lm
from Constant import ZipperApp as zac

logger = lm.getlogger()

# Poll the text-file location forever; whenever exactly one .txt file is reported,
# archive it into a timestamp-named zip and delete the original.
while True:
    textfilename = osop.fileexists(zac.textfilepath)

    # osop.fileexists reports "nothing to do" through one of these two status messages;
    # any other value is treated as the name of the file that was found.
    if textfilename in ("Only 1 .txt allowed", "no .txt found we'll keep checking"):
        logger.info(textfilename)
    else:
        logger.info(textfilename + " found")
        epochtimestamp = tf.getepoch()
        zipname = tf.getformattedtime(epochtimestamp, zac.timestringformat)
        destination = osop.dircreateexists(zac.zipdestinationfolder)
        created = zm.createzip(epochtimestamp, zipname, zac.textfilepath, textfilename, destination)
        if created:
            logger.info(zipname + " created")
            # Remove the source file only after the zip was created successfully.
            osop.delfile(zac.textfilepath + textfilename)
        else:
            logger.critical("zip creation failed")
    # ... (remainder of the script is truncated in the source)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!