Best Python code snippet using fMBT_python
fmbtgti.py
Source:fmbtgti.py
...1158 _fmbtLog('creating directory "%s" for screenshots failed: %s' %1159 (necessaryDirs, e))1160 raise1161 return filepath1162 def _archiveScreenshot(self, filepath):1163 if self._screenshotArchiveMethod == "remove":1164 try:1165 os.remove(filepath)1166 except IOError:1167 pass1168 elif self._screenshotArchiveMethod.startswith("resize"):1169 if self._screenshotArchiveMethod == "resize":1170 convertArgs = ["-resize",1171 "%sx" % (int(self.screenSize()[0]) / 4,)]1172 else:1173 widthHeight = self._screenshotArchiveMethod.split()[1]1174 convertArgs = ["-resize", widthHeight]1175 subprocess.call(["convert", filepath] + convertArgs + [filepath])1176 def _archiveScreenshots(self):1177 """1178 Archive screenshot files if screenshotLimit has been exceeded.1179 """1180 freeScreenshots = [filename1181 for (filename, refCount) in self._screenshotRefCount.iteritems()1182 if refCount == 0]1183 archiveCount = len(freeScreenshots) - self._screenshotLimit1184 if archiveCount > 0:1185 freeScreenshots.sort(reverse=True) # archive oldest1186 while archiveCount > 0:1187 toBeArchived = freeScreenshots.pop()1188 try:1189 self._archiveScreenshot(toBeArchived)1190 except IOError:1191 pass1192 del self._screenshotRefCount[toBeArchived]1193 archiveCount -= 11194 def refreshScreenshot(self, forcedScreenshot=None, rotate=None):1195 """1196 Takes new screenshot and updates the latest screenshot object.1197 Parameters:1198 forcedScreenshot (Screenshot or string, optional):1199 use given screenshot object or image file, do not1200 take new screenshot.1201 rotate (integer, optional):1202 rotate screenshot by given number of degrees. This1203 overrides constructor rotateScreenshot parameter...
server.py
Source:server.py
1#! /usr/bin/env python32# TEACHER - SERVER #3#4# Copyright (C) 2017 Thomas Michael Weissel5#6# This software may be modified and distributed under the terms7# of the GPLv3 license. See the LICENSE file for details.8#9# sudo -H pip3 install twisted # we need twisted for python310import os11import sys12sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # add application root to python path for imports13import qt5reactor14import ipaddress15import datetime16import time17import sip18import zipfile19import ntpath20import shutil21from twisted.internet import protocol22from twisted.internet.protocol import DatagramProtocol23from twisted.protocols import basic24from twisted.internet.task import LoopingCall25from config.config import *26from applist import *27from config.enums import *28import classes.mutual_functions as mutual_functions29import classes.system_commander as system_commander30from classes.server2client import *31from PyQt5 import uic, QtWidgets, QtCore32from PyQt5.QtGui import *33from PyQt5.QtCore import QRegExp34class ServerUI(QtWidgets.QDialog):35 def __init__(self, factory):36 QtWidgets.QDialog.__init__(self)37 self.factory = factory # type: MyServerFactory38 39 uifile=os.path.join(APP_DIRECTORY,'server/server.ui')40 winicon=os.path.join(APP_DIRECTORY,'pixmaps/windowicon.png')41 self.ui = uic.loadUi(uifile) # load UI42 self.ui.setWindowIcon(QIcon(winicon))# definiere icon für taskleiste43 44 self.ui.exit.clicked.connect(self._onAbbrechen) # setup Slots45 self.ui.sendfile.clicked.connect(lambda: self._onSendFile("all")) # button x (lambda is not needed - only if you wanna pass a variable to the function)46 self.ui.showip.clicked.connect(self._onShowIP) # button y47 self.ui.abgabe.clicked.connect(lambda: self._onAbgabe("all"))48 self.ui.screenshots.clicked.connect(lambda: self._onScreenshots("all"))49 self.ui.startexam.clicked.connect(lambda: self._on_start_exam("all"))50 self.ui.openshare.clicked.connect(self._onOpenshare)51 
self.ui.starthotspot.clicked.connect(self._onStartHotspot)52 self.ui.testfirewall.clicked.connect(self._onTestFirewall)53 54 self.ui.autoabgabe.clicked.connect(self._onAutoabgabe)55 self.ui.screenlock.clicked.connect(lambda: self._onScreenlock("all"))56 self.ui.exitexam.clicked.connect(lambda: self._on_exit_exam("all"))57 self.ui.closeEvent = self.closeEvent # links the window close event to our custom ui58 self.ui.printconf.clicked.connect(self._onPrintconf)59 self.ui.printer.clicked.connect(lambda: self._onSendPrintconf("all"))60 self.workinganimation = QMovie("pixmaps/working.gif", QtCore.QByteArray(), self)61 self.workinganimation.setCacheMode(QMovie.CacheAll)62 self.workinganimation.setSpeed(100)63 self.ui.working.setMovie(self.workinganimation)64 self.timer = False65 self.msg = False66 self.ui.version.setText("<b>Version</b> %s" % VERSION )67 self.ui.currentpin.setText("<b>%s</b>" % self.factory.pincode )68 self.ui.examlabeledit1.setText(self.factory.examid )69 self.ui.currentlabel.setText("<b>%s</b>" % self.factory.examid )70 self.ui.examlabeledit1.textChanged.connect(self._updateExamName)71 self.ui.ssintervall.valueChanged.connect(self._changeAutoscreenshot)72 num_regex=QRegExp("[0-9_]+")73 num_validator = QRegExpValidator(num_regex)74 ip_regex=QRegExp("[0-9\._]+")75 ip_validator = QRegExpValidator(ip_regex)76 self.ui.firewall1.setValidator(ip_validator)77 self.ui.firewall2.setValidator(ip_validator)78 self.ui.firewall3.setValidator(ip_validator)79 self.ui.firewall4.setValidator(ip_validator)80 self.ui.port1.setValidator(num_validator)81 self.ui.port2.setValidator(num_validator)82 self.ui.port3.setValidator(num_validator)83 self.ui.port4.setValidator(num_validator)84 findApps(self.ui.applist, self.ui.appview)85 86 self.ui.keyPressEvent = self.newOnkeyPressEvent87 88 self.ui.show()89 def _changeAutoscreenshot(self):90 self._workingIndicator(True, 200)91 intervall = self.ui.ssintervall.value()92 93 if self.factory.lcs.running:94 self.factory.lcs.stop()95 if 
intervall != 0:96 self.log("<b>Changed Screenshot Intervall to %s seconds </b>" % (str(intervall)))97 self.factory.lcs.start(intervall)98 else:99 self.log("<b>Screenshot Intervall is set to 0 - Screenshotupdate deactivated</b>")100 def _onSendPrintconf(self,who):101 """send the printer configuration to all clients"""102 self._workingIndicator(True, 500)103 server_to_client = self.factory.server_to_client104 if self.factory.rawmode == True: #check if server is already in rawmode (ongoing filetransfer)105 self.log("waiting for ongoing filetransfers to finish ..")106 return107 else:108 if not server_to_client.clients: #check if there are clients connected109 self.log("no clients connected")110 return111 self.factory.rawmode = True; #ready for filetransfer - LOCK all other fileoperations 112 113 self._workingIndicator(True, 4000)114 self.log('<b>Sending Printer Configuration to All Clients </b>')115 system_commander.dialog_popup('Sending Printer Configuration to All Clients')116 # create zip file of /etc/cups117 target_folder = PRINTERCONFIG_DIRECTORY118 filename = "PRINTERCONFIG"119 output_filename = os.path.join(SERVERZIP_DIRECTORY, filename)120 shutil.make_archive(output_filename, 'zip', target_folder)121 filename = "%s.zip" % (filename)122 file_path = os.path.join(SERVERZIP_DIRECTORY, filename) # now with .zip extension123 # regenerate filelist and check for zip file124 self.factory.files = mutual_functions.get_file_list(self.factory.files_path)125 if filename not in self.factory.files:126 self.log('filename not found in directory')127 return128 self.log('Sending Configuration: %s (%d KB)' % (filename, self.factory.files[filename][1] / 1024))129 # send line and file to all clients130 server_to_client.send_file(file_path, who, DataType.PRINTER.value)131 def _onPrintconf(selfs):132 command = "kcmshell5 kcm_printer_manager &"133 os.system(command)134 def _onScreenlock(self,who):135 """locks the client screens"""136 self._workingIndicator(True, 1000)137 138 if 
self.factory.clientslocked:139 self.log("<b>UNLocking Client Screens </b>")140 os.path.join(APP_DIRECTORY,'pixmaps/network-wired-symbolic.png')141 self.ui.screenlock.setIcon(QIcon(os.path.join(APP_DIRECTORY,'pixmaps/network-wired-symbolic.png')))142 self.factory.clientslocked = False143 144 if self.factory.rawmode == True: #dirty hack - thx to nora - gives us at least one option to open filetransfers again if something wicked happens145 self.factory.rawmode = False;146 147 if not self.factory.server_to_client.unlock_screens(who):148 self.log("no clients connected")149 else:150 self.log("<b>Locking Client Screens </b>")151 self.ui.screenlock.setIcon(QIcon(os.path.join(APP_DIRECTORY,'pixmaps/unlock.png')))152 self.factory.clientslocked = True153 if not self.factory.server_to_client.lock_screens(who):154 self.log("no clients connected")155 self.factory.clientslocked = False156 self.ui.screenlock.setIcon(QIcon(os.path.join(APP_DIRECTORY,'pixmaps/network-wired-symbolic.png')))157 self._onScreenshots("all") #update screenshots right after un/lock158 def _onOpenshare(self):159 startcommand = "runuser -u %s /usr/bin/dolphin %s &" %(USER ,SHARE_DIRECTORY)160 os.system(startcommand)161 def _updateExamName(self):162 self.factory.examid = self.ui.examlabeledit1.text()163 self.ui.currentlabel.setText("<b>%s</b>" % self.factory.examid )164 def _workingIndicator(self, action, duration):165 if self.timer and self.timer.isActive: # indicator is shown a second time - stop old kill-timer166 self.timer.stop()167 if action is True: # show working animation and start killtimer168 self.workinganimation.start()169 self.ui.working.show()170 self.timer = QtCore.QTimer()171 self.timer.timeout.connect(lambda: self._workingIndicator(False, 0))172 self.timer.start(duration)173 else:174 self.workinganimation.stop()175 self.ui.working.hide()176 def _onSendFile(self, who):177 """send a file to all clients"""178 self._workingIndicator(True, 500)179 server_to_client = 
self.factory.server_to_client180 181 if self.factory.rawmode == True: #check if server is already in rawmode (ongoing filetransfer)182 self.log("waiting for ongoing filetransfers to finish ..")183 return184 else:185 if not server_to_client.clients: #check if there are clients connected186 self.log("no clients connected")187 return188 self.factory.rawmode = True; #ready for filetransfer - LOCK all other fileoperations 189 file_path = self._showFilePicker(SHARE_DIRECTORY)190 if file_path:191 self._workingIndicator(True, 2000) # TODO: change working indicator to choose its own time depending on actions requiring all clients or only one client192 success, filename, file_size, who = server_to_client.send_file(file_path, who, DataType.FILE.value)193 if success:194 self.log('<b>Sending file:</b> %s (%d Byte) to <b> %s </b>' % (filename, file_size, who))195 else:196 self.log('<b>Sending file:</b> Something went wrong sending file %s (%d KB) to <b> %s </b>' % (filename, file_size / 1024, who))197 else:198 self.factory.rawmode = False;199 def _showFilePicker(self, directory):200 # show filepicker201 filedialog = QtWidgets.QFileDialog()202 filedialog.setDirectory(directory) # set default directory203 file_path = filedialog.getOpenFileName() # get filename204 file_path = file_path[0]205 return file_path206 def _onScreenshots(self, who):207 self.log("<b>Requesting Screenshot Update </b>")208 self._workingIndicator(True, 1000)209 210 if self.factory.rawmode == True:211 self.log("waiting for ongoing filetransfers to finish ..")212 return213 else:214 self.factory.rawmode = True; #LOCK all other fileoperations 215 216 if not self.factory.server_to_client.request_screenshots(who):217 self.factory.rawmode = False; # UNLOCK all fileoperations 218 self.log("no clients connected")219 def _onShowIP(self):220 self._workingIndicator(True, 500)221 system_commander.show_ip()222 def _onAbgabe(self, who):223 """get SHARE folder"""224 self._workingIndicator(True, 500)225 self.log('<b>Requesting 
Client Folder SHARE </b>')226 itime = 2000 if who == 'all' else 1000227 self._workingIndicator(True, itime)228 if self.factory.rawmode == True:229 self.log("waiting for ongoing filetransfers to finish ..")230 return231 else:232 self.factory.rawmode = True; #LOCK all other fileoperations 233 if not self.factory.server_to_client.request_abgabe(who):234 self.factory.rawmode = False; # UNLOCK all fileoperations 235 self.log("no clients connected")236 def _on_start_exam(self, who):237 """238 ZIP examconfig folder239 send configuration-zip to clients - unzip there240 invoke startexam.sh file on clients241 """242 self._workingIndicator(True, 500)243 server_to_client = self.factory.server_to_client244 245 if self.factory.rawmode == True: #check if server is already in rawmode (ongoing filetransfer)246 self.log("waiting for ongoing filetransfers to finish ..")247 return248 else:249 if not server_to_client.clients: #check if there are clients connected250 self.log("no clients connected")251 return252 self.factory.rawmode = True; #ready for filetransfer - LOCK all other fileoperations 253 254 self._workingIndicator(True, 4000)255 self.log('<b>Initializing Exam Mode On All Clients </b>')256 cleanup_abgabe = self.ui.cleanabgabe.checkState()257 # create zip file of all examconfigs258 target_folder = EXAMCONFIG_DIRECTORY259 filename = "EXAMCONFIG"260 output_filename = os.path.join(SERVERZIP_DIRECTORY, filename)261 shutil.make_archive(output_filename, 'zip', target_folder)262 filename = "%s.zip" % (filename)263 file_path = os.path.join(SERVERZIP_DIRECTORY, filename) #now with .zip extension264 #regenerate filelist and check for zip file265 self.factory.files = mutual_functions.get_file_list(self.factory.files_path)266 if filename not in self.factory.files:267 self.log('filename not found in directory')268 return269 self.log('Sending Configuration: %s (%d KB)' % (filename, self.factory.files[filename][1] / 1024))270 # send line and file to all clients271 
server_to_client.send_file(file_path, who, DataType.EXAM.value, cleanup_abgabe )272 def _on_exit_exam(self,who):273 self.log("<b>Finishing Exam </b>")274 self._workingIndicator(True, 2000)275 if self.factory.lcs.running:276 self.factory.lcs.stop() # disable autoscreenshot ??277 if self.factory.lc.running: # disable autoabgabe ??278 self.ui.autoabgabe.setIcon(QIcon(os.path.join(APP_DIRECTORY,'pixmaps/chronometer-off.png')))279 self.factory.lc.stop()280 281 onexit_cleanup_abgabe = self.ui.exitcleanabgabe.checkState()282 283 # first fetch abgabe284 if self.factory.rawmode == True:285 self.log("waiting for ongoing filetransfers to finish ..")286 return #FIXME this could lead to some clients not exiting in very very rare cases where a BIG filetransfer is still on -- probably wait a second and try again ??287 else:288 self.factory.rawmode = True; #LOCK all other fileoperations 289 if not self.factory.server_to_client.request_abgabe(who):290 self.factory.rawmode = False; # UNLOCK all fileoperations 291 self.log("no clients connected")292 # then send the exam exit signal293 if not self.factory.server_to_client.exit_exam(who, onexit_cleanup_abgabe):294 self.log("no clients connected")295 def _onStartHotspot(self):296 self._workingIndicator(True, 500)297 system_commander.start_hotspot()298 def get_firewall_adress_list(self):299 return [[self.ui.firewall1,self.ui.port1],[self.ui.firewall2,self.ui.port2],[self.ui.firewall3,self.ui.port3],[self.ui.firewall4,self.ui.port4]]300 def _onTestFirewall(self):301 self._workingIndicator(True, 1000)302 ipfields = self.get_firewall_adress_list()303 if self.ui.testfirewall.text() == "Stoppe Firewall": #really don't know why qt sometimes adds these & signs to the ui304 system_commander.dialog_popup('Die Firewall wird gestoppt!')305 scriptfile = os.path.join(SCRIPTS_DIRECTORY, "exam-firewall.sh")306 startcommand = "exec %s stop &" % (scriptfile)307 os.system(startcommand)308 self.ui.testfirewall.setText("Firewall testen")309 for i in 
ipfields:310 palettedefault = i[0].palette()311 palettedefault.setColor(QPalette.Active, QPalette.Base, QColor(255, 255, 255))312 i[0].setPalette(palettedefault)313 elif self.ui.testfirewall.text() == "Firewall testen":314 ipstore = os.path.join(EXAMCONFIG_DIRECTORY, "EXAM-A-IPS.DB")315 openedexamfile = open(ipstore, 'w+') # erstelle die datei neu316 number = 0317 for i in ipfields:318 ip = i[0].text()319 port = i[1].text()320 if mutual_functions.checkIP(ip):321 thisexamfile = open(ipstore, 'a+') # anhängen322 number += 1323 if number != 1: # zeilenumbruch einfügen ausser vor erster zeile (keine leerzeilen in der datei erlaubt)324 thisexamfile.write("\n")325 thisexamfile.write("%s:%s" %(ip,port) )326 else:327 if ip != "":328 palettewarn = i[0].palette()329 palettewarn.setColor(i[0].backgroundRole(), QColor(200, 80, 80))330 # palettewarn.setColor(QPalette.Active, QPalette.Base, QColor(200, 80, 80))331 i[0].setPalette(palettewarn)332 system_commander.dialog_popup("Die Firewall wird aktiviert!")333 scriptfile = os.path.join(SCRIPTS_DIRECTORY, "exam-firewall.sh")334 startcommand = "exec %s start &" % (scriptfile)335 os.system(startcommand)336 self.ui.testfirewall.setText("Stoppe Firewall")337 def _onAutoabgabe(self):338 self._workingIndicator(True, 500)339 intervall = self.ui.aintervall.value()340 minute_intervall = intervall * 60 # minuten nicht sekunden341 if self.factory.lc.running:342 self.ui.autoabgabe.setIcon(QIcon(os.path.join(APP_DIRECTORY,'pixmaps/chronometer-off.png')))343 self.factory.lc.stop()344 self.log("<b>Auto-Submission deactivated </b>")345 return346 if intervall != 0:347 self.ui.autoabgabe.setIcon(QIcon(os.path.join(APP_DIRECTORY,'pixmaps/chronometer.png')))348 self.log("<b>Activated Auto-Submission every %s minutes </b>" % (str(intervall)))349 self.factory.lc.start(minute_intervall)350 else:351 self.log("Auto-Submission Intervall is set to 0 - Auto-Submission not active")352 def _onRemoveClient(self, client_id):353 self._workingIndicator(True, 
500)354 client_name = self.factory.server_to_client.kick_client(client_id)355 #if client_name:356 sip.delete(self.get_list_widget_by_client_id(client_id)) #remove client widget no matter if client still is connected or not357 # delete all ocurrances of this screenshotitem (the whole item with the according widget and its labels)358 self.log('Connection to client <b> %s </b> has been <b>removed</b>.' % client_name)359 def _disableClientScreenshot(self, client):360 self._workingIndicator(True, 500)361 client_name = client.clientName362 client_id = client.clientConnectionID363 item = self.get_list_widget_by_client_id(client_id)364 pixmap = QPixmap(os.path.join(APP_DIRECTORY,'pixmaps/nouserscreenshot.png'))365 try:366 item.picture.setPixmap(pixmap)367 item.info.setText('%s \ndisconnected' % client_name)368 item.disabled = True369 except:370 #item not found because first connection attempt371 return372 def log(self, msg):373 timestamp = '[%s]' % datetime.datetime.now().strftime("%H:%M:%S")374 self.ui.logwidget.append(timestamp + " " + str(msg))375 def createOrUpdateListItem(self, client, screenshot_file_path):376 """generates new listitem that displays the clientscreenshot"""377 existing_item = self.get_list_widget_by_client_name(client.clientName)378 if existing_item: # just update screenshot379 self._updateListItemScreenshot(existing_item, client, screenshot_file_path)380 else:381 self._addNewListItem(client, screenshot_file_path)382 def _addNewListItem(self, client, screenshot_file_path):383 item = QtWidgets.QListWidgetItem()384 item.setSizeHint(QtCore.QSize(140, 100));385 item.id = client.clientName # store clientName as itemID for later use (delete event)386 item.pID = client.clientConnectionID387 item.disabled = False388 pixmap = QPixmap(screenshot_file_path)389 pixmap = pixmap.scaled(QtCore.QSize(120,67))390 item.picture = QtWidgets.QLabel()391 item.picture.setPixmap(pixmap)392 item.picture.setAlignment(QtCore.Qt.AlignCenter)393 item.info = QtWidgets.QLabel('%s 
\n%s' % (client.clientName, client.clientConnectionID))394 item.info = QtWidgets.QLabel('%s' % (client.clientName))395 item.info.setAlignment(QtCore.Qt.AlignCenter)396 grid = QtWidgets.QGridLayout()397 grid.setSpacing(1)398 grid.addWidget(item.picture, 1, 0)399 grid.addWidget(item.info, 2, 0)400 widget = QtWidgets.QWidget()401 widget.setLayout(grid)402 widget.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)403 widget.customContextMenuRequested.connect(lambda: self._on_context_menu(item.pID, item.disabled))404 widget.mouseDoubleClickEvent = lambda event: self._onDoubleClick(item.pID, item.id, screenshot_file_path, item.disabled)405 self.ui.listWidget.addItem(item) # add the listitem to the listwidget406 self.ui.listWidget.setItemWidget(item, widget) # set the widget as the listitem's widget407 def _updateListItemScreenshot(self, existing_item, client, screenshot_file_path):408 try:409 self.factory.disconnected_list.remove(client.clientName) # if client reconnected remove from disconnected_list410 except:411 pass #changed return to pass otherwise the screenshot is not updated412 pixmap = QPixmap(screenshot_file_path)413 pixmap = pixmap.scaled(QtCore.QSize(120, 67))414 existing_item.picture.setPixmap(pixmap)415 existing_item.info.setText('%s' % (client.clientName))416 existing_item.pID = client.clientConnectionID # in case this is a reconnect - update clientConnectionID in order to address the correct connection417 existing_item.disabled = False418 try:419 if self.screenshotwindow.client_connection_id == existing_item.pID:420 self.screenshotwindow.oImage = QImage(screenshot_file_path)421 self.screenshotwindow.sImage = self.screenshotwindow.oImage.scaled(QtCore.QSize(1200, 675)) # resize Image to widgets size422 self.screenshotwindow.palette = QPalette()423 self.screenshotwindow.palette.setBrush(10, QBrush(self.screenshotwindow.sImage)) # 10 = Windowrole424 self.screenshotwindow.setPalette(self.screenshotwindow.palette)425 426 except:427 pass428 def 
_onDoubleClick(self, client_connection_id, client_name, screenshot_file_path, client_disabled):429 if client_disabled:430 print("item disabled")431 return432 screenshotfilename = "%s.jpg" % client_connection_id433 self.screenshotwindow = ScreenshotWindow(self, screenshotfilename, client_name, screenshot_file_path, client_connection_id)434 self.screenshotwindow.exec_()435 def _on_context_menu(self, client_connection_id, is_disabled):436 menu = QtWidgets.QMenu()437 action_1 = QtWidgets.QAction("Abgabe holen", menu, triggered=lambda: self._onAbgabe(client_connection_id))438 action_2 = QtWidgets.QAction("Screenshot updaten", menu, triggered=lambda: self._onScreenshots(client_connection_id))439 action_3 = QtWidgets.QAction("Datei senden", menu, triggered=lambda: self._onSendFile(client_connection_id))440 action_4 = QtWidgets.QAction("Exam starten", menu, triggered=lambda: self._on_start_exam(client_connection_id))441 action_5 = QtWidgets.QAction("Exam beenden", menu, triggered=lambda: self._on_exit_exam(client_connection_id))442 action_6 = QtWidgets.QAction("Verbindung trennen", menu,443 triggered=lambda: self._onRemoveClient(client_connection_id))444 menu.addActions([action_1, action_2, action_3, action_4, action_5, action_6])445 if is_disabled:446 action_1.setEnabled(False)447 action_2.setEnabled(False)448 action_3.setEnabled(False)449 action_4.setEnabled(False)450 action_5.setEnabled(False)451 action_6.setText("Widget entfernen")452 cursor = QCursor()453 menu.exec_(cursor.pos())454 return455 def get_list_widget_items(self):456 """457 Creates an iterable list of all widget elements aka student screenshots458 :return: list of widget items459 """460 items = []461 for index in range(self.ui.listWidget.count()):462 items.append(self.ui.listWidget.item(index))463 return items464 def get_list_widget_by_client_id(self, client_id):465 for widget in self.get_list_widget_items():466 if client_id == widget.pID:467 print("Found existing list widget for client connectionId %s" % 
client_id )468 return widget469 return False470 def get_list_widget_by_client_name(self, client_name):471 for widget in self.get_list_widget_items():472 if client_name == widget.id:473 print("Found existing list widget for client name %s" % client_name )474 return widget475 return False476 def get_existing_or_skeleton_list_widget(self, client_name):477 pass478 479 480 def newOnkeyPressEvent(self,e):481 if e.key() == QtCore.Qt.Key_Escape:482 print("close event triggered")483 self._onAbbrechen()484 def closeEvent(self, evnt):485 evnt.ignore()486 print("close event triggered")487 if not self.msg:488 self._onAbbrechen()489 def _onAbbrechen(self): # Exit button490 self.msg = QtWidgets.QMessageBox()491 self.msg.setIcon(QtWidgets.QMessageBox.Information)492 self.msg.setText("Wollen sie das Programm\nLiFE Exam Server \nbeenden?")493 494 self.msg.setWindowTitle("LiFE Exam")495 self.msg.setStandardButtons(QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)496 retval = self.msg.exec_() # 16384 = yes, 65536 = no497 498 if str(retval) == "16384":499 os.remove(SERVER_PIDFILE)500 self.ui.close()501 os._exit(0) # otherwise only the gui is closed and connections are kept alive502 else:503 self.msg = False504class ScreenshotWindow(QtWidgets.QDialog):505 def __init__(self, serverui, screenshot, clientname, screenshot_file_path, client_connection_id):506 QtWidgets.QDialog.__init__(self)507 self.setWindowIcon(QIcon(os.path.join(APP_DIRECTORY,'pixmaps/windowicon.png'))) # definiere icon für taskleiste508 self.screenshot = screenshot509 self.serverui = serverui510 self.screenshot_file_path = screenshot_file_path511 self.client_connection_id = client_connection_id512 text = "Screenshot - %s - %s" %(screenshot, clientname)513 self.setWindowTitle(text)514 self.setGeometry(100,100,1200,675)515 self.setFixedSize(1200, 675)516 oImage = QImage(screenshot_file_path)517 sImage = oImage.scaled(QtCore.QSize(1200,675)) # resize Image to widgets size518 palette = QPalette()519 palette.setBrush(10, 
QBrush(sImage)) # 10 = Windowrole520 self.setPalette(palette)521 button1 = QtWidgets.QPushButton('Screenshot archivieren', self)522 button1.move(1020, 580)523 button1.resize(180,40)524 button1.clicked.connect(self._archivescreenshot)525 button2 = QtWidgets.QPushButton('Abgabe holen', self)526 button2.move(1020, 480)527 button2.resize(180,40)528 button2.clicked.connect(lambda: serverui._onAbgabe(client_connection_id))529 button3 = QtWidgets.QPushButton('Screenshot updaten', self)530 button3.move(1020, 530)531 button3.resize(180,40)532 button3.clicked.connect(lambda: serverui._onScreenshots(client_connection_id))533 button4 = QtWidgets.QPushButton('Fenster schlieÃen', self)534 button4.move(1020, 430)535 button4.resize(180,40)536 button4.clicked.connect(self._onClose)537 def _onClose(self): # Exit button538 self.close()539 def _archivescreenshot(self):540 filedialog = QtWidgets.QFileDialog()541 filedialog.setDirectory(SHARE_DIRECTORY) # set default directory542 file_path = filedialog.getSaveFileName() # get filename543 file_path = file_path[0]544 if file_path:545 #os.rename(self.screenshot_file_path, file_path) #moves the file (its not available in src anymore)546 shutil.copyfile(self.screenshot_file_path,file_path)547 print("screensshot archived")548"""---------"""549""" """ 550""" TWISTED """551""" """ 552"""---------"""553class MyServerProtocol(basic.LineReceiver):554 """every new connection builds one MyServerProtocol object"""555 def __init__(self, factory):556 self.factory = factory # type: MyServerFactory557 self.clientName = ""558 self.file_handler = None559 self.line_data_list = ()560 self.refused = False561 self.clientConnectionID = ""562 self.filetransfer_fail_count = 0563 # twisted564 def connectionMade(self):565 self.factory.server_to_client.add_client(self)566 self.file_handler = None567 self.line_data_list = ()568 self.refused = False569 self.clientConnectionID = str(self.transport.client[1])570 self.transport.setTcpKeepAlive(1)571 
self.factory.window.log(572 'Connection from: %s (%d clients total)' % (573 self.transport.getPeer().host, len(self.factory.server_to_client.clients)))574 # twisted575 def connectionLost(self, reason):576 print("ConnectionLost")577 print(reason) # maybe give it another try if connection closed unclean? ping it ? send custom keepalive? or even a reconnect call?578 self.factory.server_to_client.remove_client(self)579 self.file_handler = None580 self.line_data_list = ()581 self.factory.rawmode = False; # we deactivate the rawmode ft block here in case the disconnect interrupted an ongoing filetransfer which would never send \r\n and therefore never unblock (worst case scenario: an other ft could be started during an ongoing ft - because notblocked - and lead to a corrupted (and therefore removed) ft)582 583 self.factory.window.log(584 'Connection from %s lost (%d clients left)' % (585 self.transport.getPeer().host, len(self.factory.server_to_client.clients)))586 if not self.refused:587 self.factory.window._disableClientScreenshot(self)588 self.factory.disconnected_list.append(self.clientName) #keep client name in disconnected_list589 else:590 try:591 self.factory.disconnected_list.remove(self.clientName) # this one is not coming back592 except:593 return594 595 # twisted596 def rawDataReceived(self, data):597 """ handle incoming byte data """598 filename = self.line_data_list[2]599 file_path = os.path.join(self.factory.files_path, filename)600 # self.factory.window.log('Receiving file chunk (%d KB)' % (len(data)/1024))601 if not self.file_handler:602 self.file_handler = open(file_path, 'wb')603 if data.endswith(b'\r\n'): # Last chunk604 data = data[:-2]605 self.file_handler.write(data)606 self.file_handler.close()607 self.file_handler = None608 self.setLineMode()609 self.factory.rawmode = False; #filetransfer finished "UNLOCK" fileopertions610 611 if mutual_functions.validate_file_md5_hash(file_path, self.line_data_list[3]): # everything ok.. 
file received612 self.factory.window.log('File %s has been successfully transferred' % (filename))613 self.filetransfer_fail_count = 0614 615 if self.line_data_list[1] == DataType.SCREENSHOT.value: # screenshot is received on initial connection616 screenshot_file_path = os.path.join(SERVERSCREENSHOT_DIRECTORY, filename)617 os.rename(file_path, screenshot_file_path) # move image to screenshot folder618 mutual_functions.fixFilePermissions(SERVERSCREENSHOT_DIRECTORY) # fix filepermission of transferred file619 self.factory.window.createOrUpdateListItem(self, screenshot_file_path) # make the clientscreenshot visible in the listWidget620 elif self.line_data_list[1] == DataType.ABGABE.value:621 extract_dir = os.path.join(SHARE_DIRECTORY, self.clientName, filename[622 :-4]) # extract to unzipDIR / clientName / foldername without .zip (cut last four letters #shutil.unpack_archive(file_path, extract_dir, 'tar') #python3 only but twisted RPC is not ported to python3 yet623 user_dir = os.path.join(SHARE_DIRECTORY, self.clientName)624 mutual_functions.checkIfFileExists(user_dir) ## checks if filename is taken and renames this file in order to make room for the userfolder625 with zipfile.ZipFile(file_path, "r") as zip_ref:626 zip_ref.extractall(extract_dir) 627 os.unlink(file_path) # delete zip file628 mutual_functions.fixFilePermissions(SHARE_DIRECTORY) # fix filepermission of transferred file629 else: # wrong file hash630 os.unlink(file_path)631 self.transport.write(b'File was successfully transferred but not saved, due to invalid MD5 hash\n')632 self.transport.write(Command.ENDMSG.tobytes() + b'\r\n')633 self.factory.window.log('File %s has been successfully transferred, but deleted due to invalid MD5 hash' % (filename))634 # request flie again if filerequest was ABGABE (we don't care about a missed screenshotupdate)635 if self.line_data_list[1] == DataType.ABGABE.value and self.filetransfer_fail_count <= 1:636 self.filetransfer_fail_count += 1637 
self.factory.window.log('Failed transfers: %s' %self.filetransfer_fail_count)638 self.factory.window._onAbgabe(self.clientConnectionID)639 else:640 self.filetransfer_fail_count = 0641 else:642 self.file_handler.write(data)643 def sendEncodedLine(self,line):644 # twisted645 self.sendLine(line.encode() )646 # twisted647 def lineReceived(self, line):648 """whenever the CLIENT sent something """649 line = line.decode() # we get bytes but need strings650 self.line_data_list = mutual_functions.clean_and_split_input(line)651 print("\nDEBUG: line received and decoded:\n%s\n" % self.line_data_list)652 self.line_dispatcher() #pass "self" as "client"653 654 655 def line_dispatcher(self):656 if len(self.line_data_list) == 0 or self.line_data_list == '':657 return658 """659 FILETRANSFER (ist immer getfile, der client kann derzeit noch keine files anfordern)660 command = self.line_data_list[0] 661 filetype = self.line_data_list[1] 662 filename = self.line_data_list[2] 663 filehash = self.line_data_list[3] 664 (( clientName = self.line_data_list[4] ))665 666 AUTH667 command = self.line_data_list[0] 668 id = self.line_data_list[1] 669 pincode = self.line_data_list[2] 670 671 """672 command = {673 Command.AUTH.value: self._checkclientAuth, 674 Command.FILETRANSFER.value: self._get_file_request,675 }676 line_handler = command.get(self.line_data_list[0], None)677 line_handler()678 def _get_file_request(self):679 """680 Puts server into raw mode to receive files681 """682 self.factory.window.log('Incoming File Transfer from Client <b>%s </b>' % (self.clientName))683 self.setRawMode() # this is a file - set to raw mode684 685 def _checkclientAuth(self):686 """687 searches for the newID in factory.clients and rejects the connection if found or wrong pincode688 :param newID: string689 :param pincode: int690 :return:691 """692 newID = self.line_data_list[1] 693 pincode = self.line_data_list[2]694 695 if newID in self.factory.server_to_client.clients.keys():696 print("this user already 
exists and is connected") #TEST keys contains numbers - newID is a name .. how does this work?697 self.refused = True698 self.sendEncodedLine(Command.REFUSED.value)699 self.transport.loseConnection()700 self.factory.window.log('Client Connection from %s has been refused. User already exists' % (newID))701 return702 elif int(pincode) != self.factory.pincode:703 print("wrong pincode")704 self.refused = True705 self.sendEncodedLine(Command.REFUSED.value)706 self.transport.loseConnection()707 self.factory.window.log('Client Connection from %s has been refused. Wrong pincode given' % (newID ))708 return709 else: # otherwise ad this unique id to the client protocol instance and request a screenshot710 print("pincode ok")711 self.clientName = newID712 self.factory.window.log('New Connection from <b>%s </b>' % (newID) )713 #transfer, send, screenshot, filename, hash, cleanabgabe714 line = "%s %s %s %s.jpg none none" % (Command.FILETRANSFER.value, Command.SEND.value, DataType.SCREENSHOT.value, self.transport.client[1])715 self.sendEncodedLine(line)716 return717class MyServerFactory(protocol.ServerFactory):718 def __init__(self, files_path, reactor):719 self.files_path = files_path720 self.reactor = reactor721 self.server_to_client = ServerToClient() # type: ServerToClient722 self.disconnected_list = []723 self.files = None724 self.clientslocked = False725 self.rawmode = False; #this is set to True the moment the server sends examconfig, sends file, sends printconf, requests abgabe, requests screenshot726 self.pincode = mutual_functions.generatePin(4)727 self.examid = "Exam-%s" % mutual_functions.generatePin(3)728 self.window = ServerUI(self) # type: ServerUI729 self.lc = LoopingCall(lambda: self.window._onAbgabe("all"))730 self.lcs = LoopingCall(lambda: self.window._onScreenshots("all"))731 732 intervall = self.window.ui.ssintervall.value()733 if intervall != 0:734 self.window.log("<b>Changed Screenshot Intervall to %s seconds </b>" % (str(intervall)))735 
self.lcs.start(intervall)736 else:737 self.window.log("<b>Screenshot Intervall is set to 0 - Screenshotupdate deactivated</b>")738 739 740 741 # _onAbgabe kann durch lc.start(intevall) im intervall ausgeführt werden742 #mutual_functions.checkFirewall(self.window.get_firewall_adress_list()) # deactivates all iptable rules if any743 #starting multicast server here in order to provide "factory" information via broadcast744 self.reactor.listenMulticast(8005, MultcastLifeServer(self), listenMultiple=True)745 """746 http://twistedmatrix.com/documents/12.1.0/api/twisted.internet.protocol.Factory.html#buildProtocol747 """748 def buildProtocol(self, addr):749 return MyServerProtocol(self)750 """751 wird bei einer eingehenden client connection aufgerufen - erstellt ein object der klasse MyServerProtocol für jede connection und übergibt self (die factory)752 """753class MultcastLifeServer(DatagramProtocol):754 def __init__(self, factory):755 self.factory = factory756 def startProtocol(self):757 """Called after protocol has started listening. 
"""758 self.transport.setTTL(5) # Set the TTL>1 so multicast will cross router hops:759 self.transport.joinGroup("228.0.0.5") # Join a specific multicast group:760 def datagramReceived(self, datagram, address):761 762 datagram = datagram.decode()763 764 if "CLIENT" in datagram:765 # Rather than replying to the group multicast address, we send the766 # reply directly (unicast) to the originating port:767 print("Datagram %s received from %s" % (repr(datagram), repr(address)) )768 serverinfo = self.factory.examid + " " + " ".join(self.factory.disconnected_list)769 message = "SERVER %s" % serverinfo770 self.transport.write(message.encode(), ("228.0.0.5", 8005))771 #self.transport.write("SERVER: Assimilate", address) #this is NOT WORKINC772if __name__ == '__main__':773 mutual_functions.prepareDirectories() # cleans everything and copies some scripts774 killscript = os.path.join(SCRIPTS_DIRECTORY, "terminate-exam-process.sh")775 os.system("%s %s" % (killscript, 'server')) # make sure only one client instance is running per client776 # time.sleep(1)777 mutual_functions.writePidFile()778 app = QtWidgets.QApplication(sys.argv)779 qt5reactor.install() # imported from file and needed for Qt to function properly in combination with twisted reactor780 from twisted.internet import reactor781 reactor.listenTCP(SERVER_PORT, MyServerFactory(SERVERFILES_DIRECTORY, reactor )) # start the server on SERVER_PORT782 # moved multicastserver starting sequence to factory783 #reactor.listenMulticast(8005, MultcastLifeServer(), listenMultiple=True)784 print ('Listening on port %d' % (SERVER_PORT))...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!