Best Python code snippet using testcontainers-python_python
ProcessLibrary.py
Source:ProcessLibrary.py
...19 return []20 def output_ready(self):21 return self.stages["DifferentialStage"].is_output_ready()22 def initialize_containers(self):23 sc = self.get_container('RAWIMAGE')24 sc.matrix = None25 sc.data_direction = EDataDirection.Output26 self.locals[sc.name] = sc27 def create_stages(self):28 x = [DifferentialStage(self, 'DifferentialStage')]29 self.add_stages(x)30 def on_completion(self):31 ds = self.stages['DifferentialStage']32 output = ds.output_containers['DIFFERENTIALIMAGE'].matrix # type: MatrixContainer33 self.targetImager.channel_images['diff'] = output34 ds.output_containers['MEANSQUAREDERROR'].data_direction = EDataDirection.Output35 ds.output_containers['DIFFERENTIALIMAGE'].data_direction = EDataDirection.Output36 ds.input_containers['RAWIMAGE'].data_direction = EDataDirection.Output37 self.completed = False38class IntrinsicsCalculatorProcess(IDEFProcess):39 def __init__(self, name, targetImager):40 IDEFProcess.__init__(self, name)41 self.targetImager = targetImager42 def get_required_containers(self):43 container_list = [IDEFStageBinding(EConnection.Input, EContainerType.MATRIXCONTAINER,44 EDataDirection.Input, "IMAGER")]45 return container_list46 def output_ready(self):47 return self.stages["DistortionCalculatorStage"].is_output_ready()48 def initialize_containers(self):49 sc = self.get_container('BOARDSIZE')50 sc.value = (9, 7)51 sc.data_direction = EDataDirection.Input52 self.locals[sc.name] = sc53 sc = self.get_container('IMAGESIZE')54 sc.value = self.targetImager.get_resolution()55 sc.data_direction = EDataDirection.Input56 self.locals[sc.name] = sc57 sc = self.get_container('MATCHCOUNT')58 sc.value = 459 sc.data_direction = EDataDirection.Input60 self.locals[sc.name] = sc61 sc = self.get_container('MATCHSEPARATION')62 sc.value = 263 sc.data_direction = EDataDirection.Input64 self.locals[sc.name] = sc65 sc = self.get_container('MATCHMOVE')66 sc.value = 12567 sc.data_direction = EDataDirection.Input68 self.locals[sc.name] = sc69 sc = 
self.get_container('RAWIMAGE')70 sc.matrix = None71 sc.data_direction = EDataDirection.Output72 self.locals[sc.name] = sc73 sc = self.get_container('CAMERAINTRINSICS')74 sc.data_direction = EDataDirection.Output75 self.locals[sc.name] = sc76 sc = self.get_container('REPROJECTIONERROR')77 sc.data_direction = EDataDirection.Output78 self.locals[sc.name] = sc79 # clear out the composite chessboard stream80 self.targetImager.set_image('compositechess', None, None)81 def create_stages(self):82 x = [CalibrationStage(self, 'CalibrationStage'), DistortionCalculatorStage(self, 'DistortionCalculatorStage')]83 self.add_stages(x)84 def on_completion(self):85 if Notifier.activeNotifier is not None:86 Notifier.activeNotifier.speak_message("completed")87 ds = self.stages['DistortionCalculatorStage']88 intrinsics = ds.output_containers['CAMERAINTRINSICS'].value # type: ScalarContainer89 reperr = ds.output_containers['REPROJECTIONERROR'] # type: ScalarContainer90 self.status_message("Reprojection error is {}".format(reperr.value))91 translated = {}92 for n, v in intrinsics.items():93 if isinstance(v, numpy.ndarray):94 translated[n] = IDEFProcess.serialize_matrix_to_json(v)95 else:96 translated[n] = v;97 translated['TIMESTAMP'] = datetime.datetime.now().isoformat()98 translated['ID'] = uuid.uuid4().hex99 imager = self.locals['IMAGER'].value100 translated['CONTROLLER'] = imager.controller.resource101 translated['CAMERAINDEX'] = imager.imager_address102 translated['MATCHCOUNT'] = self.locals['MATCHCOUNT'].value103 translated['MATCHSEPARATION'] = self.locals['MATCHSEPARATION'].value104 cfg = json.dumps(translated, ensure_ascii=False)105 self.targetImager.controller.publish_message(self.targetImager.imager_address, "intrinsics", cfg)106 filename = self.targetImager.calibration_filename()107 file = open(filename, "w")108 file.write(cfg)109 file.close()110 filename = "../CalibrationRecords/IntrinsicsHistory.txt"111 if os.path.isfile(filename):112 file = open(filename, "a")113 
file.write(",\n")114 else:115 file = open(filename, "w")116 file.write(cfg)117 file.close()118class StereoCalculatorProcess(IDEFProcess):119 def __init__(self, name, targetImagerLeft, targetImagerRight):120 IDEFProcess.__init__(self, name)121 self.targetImagerLeft = targetImagerLeft122 self.targetImagerRight = targetImagerRight123 def get_required_containers(self):124 container_list = [IDEFStageBinding(EConnection.Control, EContainerType.SCALARCONTAINER,125 EDataDirection.Input, "RECORDING")]126 return container_list127 def output_ready(self):128 return self.stages["StereoCalibrationCalculatorStage"].is_output_ready()129 def buildmat(self, dv):130 X = numpy.empty(shape=[dv[0], dv[1]])131 dvi = 2132 for i in range(dv[0]):133 for j in range(dv[1]):134 X[i, j] = dv[dvi]135 dvi += 1136 return X137 def initialize_containers(self):138 sc = self.get_container('RAWIMAGELEFT')139 sc.matrix = None140 sc.data_direction = EDataDirection.Output141 self.locals[sc.name] = sc142 sc = self.get_container('RAWIMAGERIGHT')143 sc.matrix = None144 sc.data_direction = EDataDirection.Output145 self.locals[sc.name] = sc146 left_calib = json.loads(self.targetImagerLeft.get_calibration())147 right_calib = json.loads(self.targetImagerRight.get_calibration())148 sc = self.get_container('CAMERAMATRIXLEFT')149 sc.matrix = self.buildmat(left_calib["CAMERAMATRIX"])150 sc.data_direction = EDataDirection.Output151 self.locals[sc.name] = sc152 sc = self.get_container('CAMERAMATRIXRIGHT')153 sc.matrix = self.buildmat(right_calib["CAMERAMATRIX"])154 sc.data_direction = EDataDirection.Output155 self.locals[sc.name] = sc156 sc = self.get_container('DISTORTIONCOEFFSLEFT')157 sc.matrix = self.buildmat(left_calib['DISTORTIONCOEFFICIENTS'])158 sc.data_direction = EDataDirection.Output159 self.locals[sc.name] = sc160 sc = self.get_container('DISTORTIONCOEFFSRIGHT')161 sc.matrix = self.buildmat(right_calib['DISTORTIONCOEFFICIENTS'])162 sc.data_direction = EDataDirection.Output163 self.locals[sc.name] = sc164 
sc = self.get_container('BOARDSIZE')165 sc.value = (9, 7)166 sc.data_direction = EDataDirection.Input167 self.locals[sc.name] = sc168 sc = self.get_container('IMAGESIZE')169 # todo This is assuming both imagers have equal resolution170 sc.value = self.targetImagerLeft.get_resolution()171 sc.data_direction = EDataDirection.Input172 self.locals[sc.name] = sc173 sc = self.get_container('MATCHCOUNT')174 sc.value = 20175 sc.data_direction = EDataDirection.Input176 self.locals[sc.name] = sc177 sc = self.get_container('MATCHSEPARATION')178 sc.value = 0179 sc.data_direction = EDataDirection.Input180 self.locals[sc.name] = sc181 sc = self.get_container('STEREOROTATION')182 sc.data_direction = EDataDirection.Output183 self.locals[sc.name] = sc184 sc = self.get_container('STEREOTRANSLATION')185 sc.data_direction = EDataDirection.Output186 self.locals[sc.name] = sc187 sc = self.get_container('ESSENTIAL')188 sc.data_direction = EDataDirection.Output189 self.locals[sc.name] = sc190 sc = self.get_container('FUNDAMENTAL')191 sc.data_direction = EDataDirection.Output192 self.locals[sc.name] = sc193 def create_stages(self):194 x = [StereoCalibrationStage(self, 'StereoCalibrationStage'),195 StereoCalibratorCalculatorStage(self, 'StereoCalibratorCalculatorStage')]196 self.add_stages(x)197 def on_completion(self):198 if Notifier.activeNotifier is not None:199 Notifier.activeNotifier.speak_message("completed")200 ds = self.stages['StereoCalibratorCalculatorStage']201 dd = {}202 dd['Q'] = str(ds.output_containers['REPROJECTIONERROR'].value)203 dd['R'] = StereoTrainingSet.flatten_matrix(204 ds.output_containers['STEREOROTATION'].matrix) # type: MatrixContainer205 dd['T'] = StereoTrainingSet.flatten_matrix(206 ds.output_containers['STEREOTRANSLATION'].matrix) # type: MatrixContainer207 dd['E'] = StereoTrainingSet.flatten_matrix(ds.output_containers['ESSENTIAL'].matrix) # type: MatrixContainer208 dd['F'] = StereoTrainingSet.flatten_matrix(ds.output_containers['FUNDAMENTAL'].matrix) # type: 
MatrixContainer209 dd['CML'] = StereoTrainingSet.flatten_matrix(ds.input_containers['CAMERAMATRIXLEFT'].matrix)210 dd['CMR'] = StereoTrainingSet.flatten_matrix(ds.input_containers['CAMERAMATRIXRIGHT'].matrix)211 dd['DCL'] = StereoTrainingSet.flatten_matrix(ds.input_containers['DISTORTIONCOEFFSLEFT'].matrix)212 dd['DCR'] = StereoTrainingSet.flatten_matrix(ds.input_containers['DISTORTIONCOEFFSRIGHT'].matrix)213 self.status_message("RMS Stereo error is {}".format(dd['Q']))214 serialized = json.dumps(dd, ensure_ascii=False)215 self.targetImagerLeft.controller.publish_message(self.targetImagerLeft.imager_address, "stereoextrinsics",216 serialized)217 filename = self.targetImagerLeft.stereo_filename()218 file = open(filename, "w")219 file.write(serialized)220 file.close()221 # save the recorded session data222 if self.get_container('RECORDING').value:223 dd['CALIBRATIONIMAGEHISTORY'] = ds.environment_containers['CALIBRATIONIMAGEHISTORY'].value224 dd['CONTROLLER'] = 'unknownctlr'225 dd['timestamp'] = datetime.datetime.now().isoformat()226 path = StereoCalculatorProcess.stereo_sessionfolder()227 num_files = sum(os.path.isfile(os.path.join(path, f)) for f in os.listdir(path))228 filename = "../CalibrationRecords/StereoSessions/session.{}.json".format(num_files + 1)229 with open(filename, 'w') as f:230 json.dump(dd, f, ensure_ascii=True)231class ChessboardCaptureProcess(IDEFProcess):232 def __init__(self, name, targetImagerLeft, targetImagerRight):233 IDEFProcess.__init__(self, name)234 self.targetImagerLeft = targetImagerLeft235 self.targetImagerRight = targetImagerRight236 def get_required_containers(self):237 container_list = []238 return container_list239 def output_ready(self):240 return self.stages["ChessboardCaptureStage"].is_output_ready()241 def buildmat(self, dv):242 X = numpy.empty(shape=[dv[0], dv[1]])243 dvi = 2244 for i in range(dv[0]):245 for j in range(dv[1]):246 X[i, j] = dv[dvi]247 dvi += 1248 return X249 def initialize_containers(self):250 sc = 
self.get_container('RAWIMAGELEFT')251 sc.matrix = None252 sc.data_direction = EDataDirection.Output253 self.locals[sc.name] = sc254 sc = self.get_container('RAWIMAGERIGHT')255 sc.matrix = None256 sc.data_direction = EDataDirection.Output257 self.locals[sc.name] = sc258 sc = self.get_container('LEFTIMAGER')259 sc.value = self.targetImagerLeft260 sc.data_direction = EDataDirection.Input261 self.locals[sc.name] = sc262 sc = self.get_container('RIGHTIMAGER')263 sc.value = self.targetImagerRight264 sc.data_direction = EDataDirection.Input265 self.locals[sc.name] = sc266 sc = self.get_container('BOARDSIZE')267 sc.value = (9, 7)268 sc.data_direction = EDataDirection.Input269 self.locals[sc.name] = sc270 def create_stages(self):271 x = [ImageInjector(self, 'ImageInjector', 'raw'),272 ChessboardCaptureStage(self, 'ChessboardCaptureStage')]273 self.add_stages(x)274 def on_completion(self):275 if Notifier.activeNotifier is not None:276 Notifier.activeNotifier.speak_message("completed")277 ds = self.stages['ChessboardCaptureStage']278class CalibratedStereoCalculatorProcess(IDEFProcess):279 def __init__(self, name, targetImagerLeft, targetImagerRight):280 IDEFProcess.__init__(self, name)281 self.targetImagerLeft = targetImagerLeft282 self.targetImagerRight = targetImagerRight283 def get_required_containers(self):284 container_list = [IDEFStageBinding(EConnection.Control, EContainerType.SCALARCONTAINER,285 EDataDirection.Input, "RECORDING")]286 return container_list287 def output_ready(self):288 return self.stages["StereoCalibrationCalculatorStage"].is_output_ready()289 def buildmat(self, dv):290 X = numpy.empty(shape=[dv[0], dv[1]])291 dvi = 2292 for i in range(dv[0]):293 for j in range(dv[1]):294 X[i, j] = dv[dvi]295 dvi += 1296 return X297 def initialize_containers(self):298 sc = self.get_container('RAWIMAGELEFT')299 sc.matrix = None300 sc.data_direction = EDataDirection.Output301 self.locals[sc.name] = sc302 sc = self.get_container('RAWIMAGERIGHT')303 sc.matrix = None304 
sc.data_direction = EDataDirection.Output305 self.locals[sc.name] = sc306 left_calib = json.loads(self.targetImagerLeft.get_calibration())307 right_calib = json.loads(self.targetImagerRight.get_calibration())308 sc = self.get_container('CAMERAMATRIXLEFT')309 sc.matrix = self.buildmat(left_calib["CAMERAMATRIX"])310 sc.data_direction = EDataDirection.Output311 self.locals[sc.name] = sc312 sc = self.get_container('CAMERAMATRIXRIGHT')313 sc.matrix = self.buildmat(right_calib["CAMERAMATRIX"])314 sc.data_direction = EDataDirection.Output315 self.locals[sc.name] = sc316 sc = self.get_container('DISTORTIONCOEFFSLEFT')317 sc.matrix = self.buildmat(left_calib['DISTORTIONCOEFFICIENTS'])318 sc.data_direction = EDataDirection.Output319 self.locals[sc.name] = sc320 sc = self.get_container('DISTORTIONCOEFFSRIGHT')321 sc.matrix = self.buildmat(right_calib['DISTORTIONCOEFFICIENTS'])322 sc.data_direction = EDataDirection.Output323 self.locals[sc.name] = sc324 sc = self.get_container('BOARDSIZE')325 sc.value = (9, 7)326 sc.data_direction = EDataDirection.Input327 self.locals[sc.name] = sc328 sc = self.get_container('IMAGESIZE')329 # todo This is assuming both imagers have equal resolution330 sc.value = self.targetImagerLeft.get_resolution()331 sc.data_direction = EDataDirection.Input332 self.locals[sc.name] = sc333 sc = self.get_container('NUMPAIRS')334 sc.value = 10335 sc.data_direction = EDataDirection.Input336 self.locals[sc.name] = sc337 sc = self.get_container('MATCHCOUNT')338 sc.value = 999339 sc.data_direction = EDataDirection.Input340 self.locals[sc.name] = sc341 sc = self.get_container('MATCHSEPARATION')342 sc.value = 0343 sc.data_direction = EDataDirection.Input344 self.locals[sc.name] = sc345 sc = self.get_container('STEREOROTATION')346 sc.data_direction = EDataDirection.Output347 self.locals[sc.name] = sc348 sc = self.get_container('STEREOTRANSLATION')349 sc.data_direction = EDataDirection.Output350 self.locals[sc.name] = sc351 sc = self.get_container('ESSENTIAL')352 
sc.data_direction = EDataDirection.Output353 self.locals[sc.name] = sc354 sc = self.get_container('FUNDAMENTAL')355 sc.data_direction = EDataDirection.Output356 self.locals[sc.name] = sc357 def create_stages(self):358 x = [RandomSolutionSearchStage(self, 'TestImageIngestorStage'),359 StereoCalibrationStage(self, 'StereoCalibrationStage'),360 StereoCalibratorCalculatorStage(self, 'StereoCalibratorCalculatorStage')]361 self.add_stages(x)362 def on_completion(self):363 if Notifier.activeNotifier is not None:364 Notifier.activeNotifier.speak_message("completed")365 ds = self.stages['StereoCalibratorCalculatorStage']366 dd = {}367 dd['Q'] = str(ds.output_containers['REPROJECTIONERROR'].value)368 dd['R'] = StereoTrainingSet.flatten_matrix(369 ds.output_containers['STEREOROTATION'].matrix) # type: MatrixContainer370 dd['T'] = StereoTrainingSet.flatten_matrix(371 ds.output_containers['STEREOTRANSLATION'].matrix) # type: MatrixContainer372 dd['E'] = StereoTrainingSet.flatten_matrix(ds.output_containers['ESSENTIAL'].matrix) # type: MatrixContainer373 dd['F'] = StereoTrainingSet.flatten_matrix(ds.output_containers['FUNDAMENTAL'].matrix) # type: MatrixContainer374 dd['CML'] = StereoTrainingSet.flatten_matrix(ds.input_containers['CAMERAMATRIXLEFT'].matrix)375 dd['CMR'] = StereoTrainingSet.flatten_matrix(ds.input_containers['CAMERAMATRIXRIGHT'].matrix)376 dd['DCL'] = StereoTrainingSet.flatten_matrix(ds.input_containers['DISTORTIONCOEFFSLEFT'].matrix)377 dd['DCR'] = StereoTrainingSet.flatten_matrix(ds.input_containers['DISTORTIONCOEFFSRIGHT'].matrix)378 self.status_message("RMS Stereo error is {}".format(dd['Q']))379 serialized = json.dumps(dd, ensure_ascii=False)380 self.targetImagerLeft.controller.publish_message(self.targetImagerLeft.imager_address, "stereoextrinsics",381 serialized)382 filename = self.targetImagerLeft.stereo_filename()383 file = open(filename, "w")384 file.write(serialized)385 file.close()386 # save the recorded session data387 if 
self.get_container('RECORDING').value:388 dd['CALIBRATIONIMAGEHISTORY'] = ds.environment_containers['CALIBRATIONIMAGEHISTORY'].value389 dd['CONTROLLER'] = 'unknownctlr'390 dd['timestamp'] = datetime.datetime.now().isoformat()391 path = StereoCalculatorProcess.stereo_sessionfolder()392 num_files = sum(os.path.isfile(os.path.join(path, f)) for f in os.listdir(path))393 filename = "../CalibrationRecords/StereoSessions/session.{}.json".format(num_files + 1)394 with open(filename, 'w') as f:395 json.dump(dd, f, ensure_ascii=True)396class CalibratedIntrinsicsCalculatorProcess(IDEFProcess):397 def __init__(self, name, isLeftCamera, targetImagerLeft, targetImagerRight):398 IDEFProcess.__init__(self, name)399 self.cameraCode = 'L' if isLeftCamera else 'R'400 self.targetImagerLeft = targetImagerLeft401 self.targetImagerRight = targetImagerRight402 self.live_image_transfer = False403 def get_required_containers(self):404 container_list = [IDEFStageBinding(EConnection.Control, EContainerType.SCALARCONTAINER,405 EDataDirection.Input, "RECORDING")]406 return container_list407 def output_ready(self):408 return self.stages["StereoCalibrationCalculatorStage"].is_output_ready()409 def buildmat(self, dv):410 X = numpy.empty(shape=[dv[0], dv[1]])411 dvi = 2412 for i in range(dv[0]):413 for j in range(dv[1]):414 X[i, j] = dv[dvi]415 dvi += 1416 return X417 def initialize_containers(self):418 sc = self.get_container('INGESTMODE')419 sc.value = self.cameraCode420 sc.data_direction = EDataDirection.Input421 self.locals[sc.name] = sc422 sc = self.get_container('RAWIMAGE')423 sc.matrix = None424 sc.data_direction = EDataDirection.Output425 self.locals[sc.name] = sc426 sc = self.get_container('BOARDSIZE')427 sc.value = (9, 7)428 sc.data_direction = EDataDirection.Input429 self.locals[sc.name] = sc430 sc = self.get_container('IMAGESIZE')431 # todo This is assuming both imagers have equal resolution432 sc.value = self.targetImagerLeft.get_resolution()433 sc.data_direction = 
EDataDirection.Input434 self.locals[sc.name] = sc435 numsamples = 24436 sc = self.get_container('NUMPAIRS')437 sc.value = numsamples438 sc.data_direction = EDataDirection.Input439 self.locals[sc.name] = sc440 sc = self.get_container('MATCHCOUNT')441 sc.value = numsamples442 sc.data_direction = EDataDirection.Input443 self.locals[sc.name] = sc444 sc = self.get_container('MATCHSEPARATION')445 sc.value = 0446 sc.data_direction = EDataDirection.Input447 self.locals[sc.name] = sc448 sc = self.get_container('IMAGER')449 sc.value = self.targetImagerLeft450 sc.data_direction = EDataDirection.Input451 self.locals[sc.name] = sc452 sc = self.get_container('MATCHMOVE')453 sc.value = 0454 sc.data_direction = EDataDirection.Input455 self.locals[sc.name] = sc456 sc = self.get_container('REPROJECTIONERROR')457 sc.data_direction = EDataDirection.Output458 self.locals[sc.name] = sc459 self.targetImagerLeft.set_image('compositechess', None, None)460 def create_stages(self):461 x = [RandomSolutionSearchStage(self, 'RandomSolutionSearchStage'),462 StoredImageInterfaceStage(self, 'StoredImageInterfaceStage'),463 CalibrationStage(self, 'CalibrationStage'),464 DistortionCalculatorStage(self, 'DistortionCalculatorStage')]465 self.add_stages(x)466 def on_completion(self):467 if Notifier.activeNotifier is not None:468 Notifier.activeNotifier.speak_message("completed")469 ds = self.stages['DistortionCalculatorStage']470 intrinsics = ds.output_containers['CAMERAINTRINSICS'].value # type: ScalarContainer471 reperr = ds.output_containers['REPROJECTIONERROR'] # type: ScalarContainer472 self.status_message("Reprojection error is {}".format(reperr.value))473 translated = {}474 for n, v in intrinsics.items():475 if isinstance(v, numpy.ndarray):476 translated[n] = IDEFProcess.serialize_matrix_to_json(v)477 else:478 translated[n] = v;479 translated['TIMESTAMP'] = datetime.datetime.now().isoformat()480 translated['ID'] = uuid.uuid4().hex481 imager = self.locals['IMAGER'].value482 
translated['CONTROLLER'] = imager.controller.resource483 translated['CAMERAINDEX'] = imager.imager_address484 translated['MATCHCOUNT'] = self.locals['MATCHCOUNT'].value485 translated['MATCHSEPARATION'] = self.locals['MATCHSEPARATION'].value486 cfg = json.dumps(translated, ensure_ascii=False)487 sc = self.get_container('IMAGER').value488 sc.controller.publish_message(sc.imager_address, "intrinsics", cfg)489 filename = sc.calibration_filename()490 file = open(filename, "w")491 file.write(cfg)492 file.close()493 filename = "../CalibrationRecords/IntrinsicsHistory.txt"494 if os.path.isfile(filename):495 file = open(filename, "a")496 file.write(",\n")497 else:498 file = open(filename, "w")499 file.write(cfg)500 file.close()501class GeneticIntrinsicsCalculatorProcess(IDEFProcess):502 def __init__(self, name, isLeftCamera, targetImagerLeft, targetImagerRight):503 IDEFProcess.__init__(self, name)504 self.cameraCode = 'L' if isLeftCamera else 'R'505 self.targetImagerLeft = targetImagerLeft506 self.targetImagerRight = targetImagerRight507 self.live_image_transfer = False508 def get_required_containers(self):509 container_list = [IDEFStageBinding(EConnection.Control, EContainerType.SCALARCONTAINER,510 EDataDirection.Input, "RECORDING")]511 return container_list512 def output_ready(self):513 return self.stages["StereoCalibrationCalculatorStage"].is_output_ready()514 def buildmat(self, dv):515 X = numpy.empty(shape=[dv[0], dv[1]])516 dvi = 2517 for i in range(dv[0]):518 for j in range(dv[1]):519 X[i, j] = dv[dvi]520 dvi += 1521 return X522 def initialize_containers(self):523 sc = self.get_container('INGESTMODE')524 sc.value = self.cameraCode525 sc.data_direction = EDataDirection.Input526 self.locals[sc.name] = sc527 sc = self.get_container('POPULATIONLIMIT')528 sc.value = 100529 sc.data_direction = EDataDirection.Input530 self.locals[sc.name] = sc531 if StereoTrainingSet.data_ready(True):532 sts = StereoTrainingSet(True)533 sts.load_training_results()534 td = 
sts.extract_population(self.get_container('POPULATIONLIMIT').value)535 sc = self.get_container('POPULATIONSEED')536 sc.value = td537 sc.data_direction = EDataDirection.Input538 self.locals[sc.name] = sc539 sc = self.get_container('RAWIMAGE')540 sc.matrix = None541 sc.data_direction = EDataDirection.Output542 self.locals[sc.name] = sc543 sc = self.get_container('BOARDSIZE')544 sc.value = (9, 7)545 sc.data_direction = EDataDirection.Input546 self.locals[sc.name] = sc547 sc = self.get_container('IMAGESIZE')548 # todo This is assuming both imagers have equal resolution549 sc.value = self.targetImagerLeft.get_resolution()550 sc.data_direction = EDataDirection.Input551 self.locals[sc.name] = sc552 numsamples = 24553 sc = self.get_container('NUMPAIRS')554 sc.value = numsamples555 sc.data_direction = EDataDirection.Input556 self.locals[sc.name] = sc557 sc = self.get_container('MATCHCOUNT')558 sc.value = numsamples559 sc.data_direction = EDataDirection.Input560 self.locals[sc.name] = sc561 sc = self.get_container('MATCHSEPARATION')562 sc.value = 0563 sc.data_direction = EDataDirection.Input564 self.locals[sc.name] = sc565 sc = self.get_container('IMAGER')566 sc.value = self.targetImagerLeft567 sc.data_direction = EDataDirection.Input568 self.locals[sc.name] = sc569 sc = self.get_container('MATCHMOVE')570 sc.value = 0571 sc.data_direction = EDataDirection.Input572 self.locals[sc.name] = sc573 sc = self.get_container('REPROJECTIONERROR')574 sc.data_direction = EDataDirection.Output575 self.locals[sc.name] = sc576 # load initialization data if it's avaiable577 self.targetImagerLeft.set_image('compositechess', None, None)578 def create_stages(self):579 x = [GeneticSolutionSearchStage(self, 'GeneticSolutionSearchStage'),580 StoredImageInterfaceStage(self, 'StoredImageInterfaceStage'),581 CalibrationStage(self, 'CalibrationStage'),582 DistortionCalculatorStage(self, 'DistortionCalculatorStage')]583 self.add_stages(x)584 def on_completion(self):585 if Notifier.activeNotifier is 
not None:586 Notifier.activeNotifier.speak_message("completed")587 ds = self.stages['DistortionCalculatorStage']588 intrinsics = ds.output_containers['CAMERAINTRINSICS'].value # type: ScalarContainer589 reperr = ds.output_containers['REPROJECTIONERROR'] # type: ScalarContainer590 self.status_message("Reprojection error is {}".format(reperr.value))591 translated = {}592 for n, v in intrinsics.items():593 if isinstance(v, numpy.ndarray):594 translated[n] = IDEFProcess.serialize_matrix_to_json(v)595 else:596 translated[n] = v;597 translated['TIMESTAMP'] = datetime.datetime.now().isoformat()598 translated['ID'] = uuid.uuid4().hex599 imager = self.locals['IMAGER'].value600 translated['CONTROLLER'] = imager.controller.resource601 translated['CAMERAINDEX'] = imager.imager_address602 translated['MATCHCOUNT'] = self.locals['MATCHCOUNT'].value603 translated['MATCHSEPARATION'] = self.locals['MATCHSEPARATION'].value604 cfg = json.dumps(translated, ensure_ascii=False)605 sc = self.get_container('IMAGER').value606 sc.controller.publish_message(sc.imager_address, "intrinsics", cfg)607 filename = sc.calibration_filename()608 file = open(filename, "w")609 file.write(cfg)610 file.close()611 filename = "../CalibrationRecords/IntrinsicsHistory.txt"612 if os.path.isfile(filename):613 file = open(filename, "a")614 file.write(",\n")615 else:616 file = open(filename, "w")617 file.write(cfg)618 file.close()619class StereoIntrinsicsCalculatorProcess(IDEFProcess):620 def __init__(self, name, targetImagerLeft, targetImagerRight):621 IDEFProcess.__init__(self, name)622 self.targetImagerLeft = targetImagerLeft623 self.targetImagerRight = targetImagerRight624 def get_required_containers(self):625 container_list = [IDEFStageBinding(EConnection.Control, EContainerType.SCALARCONTAINER,626 EDataDirection.Input, "RECORDING")]627 return container_list628 def output_ready(self):629 return self.stages["StereoCalibrationCalculatorStage"].is_output_ready()630 def buildmat(self, dv):631 X = 
numpy.empty(shape=[dv[0], dv[1]])632 dvi = 2633 for i in range(dv[0]):634 for j in range(dv[1]):635 X[i, j] = dv[dvi]636 dvi += 1637 return X638 def initialize_containers(self):639 if StereoTrainingSet.data_ready(False):640 sts = StereoTrainingSet(True)641 sts.load_training_results()642 td = sts.extract_population(250)643 sc = self.get_container('STEREOTRAININGSET')644 sc.value = td645 sc.data_direction = EDataDirection.Input646 self.locals[sc.name] = sc647 else:648 raise ValueError('no stereo data available')649 sc = self.get_container('BOARDSIZE')650 sc.value = (9, 7)651 sc.data_direction = EDataDirection.Input652 self.locals[sc.name] = sc653 sc = self.get_container('IMAGESIZE')654 # todo This is assuming both imagers have equal resolution655 sc.value = self.targetImagerLeft.get_resolution()656 sc.data_direction = EDataDirection.Input657 self.locals[sc.name] = sc658 numsamples = 24659 sc = self.get_container('NUMPAIRS')660 sc.value = numsamples661 sc.data_direction = EDataDirection.Input662 self.locals[sc.name] = sc663 sc = self.get_container('MATCHCOUNT')664 sc.value = numsamples665 sc.data_direction = EDataDirection.Input666 self.locals[sc.name] = sc667 sc = self.get_container('MATCHSEPARATION')668 sc.value = 0669 sc.data_direction = EDataDirection.Input670 self.locals[sc.name] = sc671 sc = self.get_container('LEFTIMAGER')672 sc.value = self.targetImagerLeft673 sc.data_direction = EDataDirection.Input674 self.locals[sc.name] = sc675 sc = self.get_container('RIGHTIMAGER')676 sc.value = self.targetImagerRight677 sc.data_direction = EDataDirection.Input678 self.locals[sc.name] = sc679 sc = self.get_container('MATCHMOVE')680 sc.value = 0681 sc.data_direction = EDataDirection.Input682 self.locals[sc.name] = sc683 sc = self.get_container('REPROJECTIONERROR')684 sc.data_direction = EDataDirection.Output685 self.locals[sc.name] = sc686 sc = self.get_container('RAWIMAGELEFT')687 sc.data_direction = EDataDirection.Output688 self.locals[sc.name] = sc689 sc = 
self.get_container('RAWIMAGERIGHT')690 sc.data_direction = EDataDirection.Output691 self.locals[sc.name] = sc692 # load initialization data if it's avaiable693 self.targetImagerLeft.set_image('compositechess', None, None)694 def create_stages(self):695 x = [StereoSolutionSearchStage(self, 'StereoSolutionSearchStage'),696 StereoCalibrationStage(self, 'StereoCalibrationStage'),697 StereoCalibratorCalculatorStage(self, 'StereoCalibratorCalculatorStage')]698 self.add_stages(x)699 def on_completion(self):700 if Notifier.activeNotifier is not None:701 Notifier.activeNotifier.speak_message("completed")702 ds = self.stages['DistortionCalculatorStage']703 intrinsics = ds.output_containers['CAMERAINTRINSICS'].value # type: ScalarContainer704 reperr = ds.output_containers['REPROJECTIONERROR'] # type: ScalarContainer705 self.status_message("Reprojection error is {}".format(reperr.value))706 translated = {}707 for n, v in intrinsics.items():708 if isinstance(v, numpy.ndarray):709 translated[n] = IDEFProcess.serialize_matrix_to_json(v)710 else:711 translated[n] = v;712 translated['TIMESTAMP'] = datetime.datetime.now().isoformat()713 translated['ID'] = uuid.uuid4().hex714 imager = self.locals['IMAGER'].value715 translated['CONTROLLER'] = imager.controller.resource716 translated['CAMERAINDEX'] = imager.imager_address717 translated['MATCHCOUNT'] = self.locals['MATCHCOUNT'].value718 translated['MATCHSEPARATION'] = self.locals['MATCHSEPARATION'].value719 cfg = json.dumps(translated, ensure_ascii=False)720 sc = self.get_container('IMAGER').value721 sc.controller.publish_message(sc.imager_address, "intrinsics", cfg)722 filename = sc.calibration_filename()723 file = open(filename, "w")724 file.write(cfg)725 file.close()726 filename = "../CalibrationRecords/IntrinsicsHistory.txt"727 if os.path.isfile(filename):728 file = open(filename, "a")729 file.write(",\n")730 else:731 file = open(filename, "w")732 file.write(cfg)...
divisions.py
Source:divisions.py
...13def home():14 return redirect('/web/divisions')15@divisions_blueprint.route('//divisions', methods=['GET'])16def division_overview():17 x = requests.get(f'{get_container("matches")}/divisions')18 return render_template("divisions.html", divisions=x.json()["data"])19@divisions_blueprint.route('//login', methods=['GET', 'POST'])20def login():21 if request.method == 'POST':22 x = requests.post(f'{get_container("users")}/users/authenticate', data=request.form)23 if x.status_code == 200:24 user = x.json()['data']25 redirect_url = "/web/team_admin" if user["type"] == "user" else "/web/admin/divisions"26 resp = make_response(redirect(redirect_url))27 resp.set_cookie('team', str(user["team_id"]))28 resp.set_cookie('user_type', str(user["type"]))29 flash("Login succesful")30 return resp31 else:32 flash("Invalid credentials")33 return render_template("login.html")34@divisions_blueprint.route('//logout', methods=['GET', 'POST'])35def logout():36 resp = make_response(redirect('/web/divisions'))37 resp.set_cookie("team", '', expires=0)38 resp.set_cookie("user_type", '', expires=0)39 return resp40@divisions_blueprint.route('//team_admin', methods=['GET', 'POST'])41def team_admin():42 if request.cookies.get("user_type") not in ["user", "admin", "superadmin"]:43 return render_template("no_permission.html")44 team = request.cookies.get("team")45 if request.method == 'POST':46 # TODO update club info47 x = requests.put(f'{get_container("matches")}/matches/{request.form.get("id")}', data={"goals_home": request.form.get("goals_home"), "goals_away": request.form.get("goals_away")})48 if x.status_code == 200:49 flash("Score updated succesfuly")50 else:51 flash(f"Something went wrong ({x.status_code})")52 if team:53 x = requests.get(f'{get_container("matches")}/matches/home/{team}')54 club = requests.get(f'{get_container("clubs")}/teams/{team}').json()["data"]["club_id"]55 club_info = requests.get(f'{get_container("clubs")}/clubs/{club}').json()["data"]56 # return 
jsonify(x.status_code)57 x = x.json()["data"]58 return render_template("team_admin.html", matches=x, obj=club_info)59 else:60 return render_template("no_permission.html")61@divisions_blueprint.route('//teams', methods=['GET'])62def teams_overview():63 x = requests.get(f'{get_container("clubs")}/teams')64 # return jsonify(x.json())65 return render_template("teams.html", teams=x.json()["data"])66@divisions_blueprint.route('//teams/<team_id>', methods=['GET'])67def specific_team(team_id):68 x = requests.get(f'{get_container("clubs")}/teams/{team_id}').json()["data"]69 matches = requests.get(f'{get_container("matches")}/matches/recent/{team_id}')70 # return jsonify(matches)71 matches = matches.json()["data"]72 return render_template("team.html", team=x, matches=matches)73@divisions_blueprint.route('//divisions/<division_id>', methods=['GET'])74def specific_division(division_id):75 x = requests.get(f'{get_container("matches")}/divisions/{division_id}')76 team = request.args.get("team")77 if team:78 fixtures = requests.get(f'{get_container("matches")}/divisions/{division_id}/fixtures/{team}').json()79 else:80 fixtures = requests.get(f'{get_container("matches")}/divisions/{division_id}/fixtures').json()81 stats = requests.get(f'{get_container("matches")}/divisions/{division_id}/stats').json()["data"]82 league_table = requests.get(f'{get_container("matches")}/divisions/{division_id}/league_table').json()["data"]83 # return jsonify(x.json(), 200)84 return render_template("division.html", division=x.json()["data"]["name"], fixtures=fixtures["data"], stats=stats, league=league_table)85def get_weather(address, city, date):86 geolocator = Nominatim(user_agent="[PlaceHolder]", scheme='http')87 try:88 location = geolocator.geocode(f"{address}, {city}, Belgium")89 except GeocoderTimedOut:90 flash("The geolocator is timing out! 
please try again for weather")91 return {"temp": None, "hum": None, "report": None}92 # Credits groot deel van deze code: Grepper93 BASE_URL = "https://api.openweathermap.org/data/2.5/weather?"94 CITY = "Hyderabad"95 from project.api.api_key import API_KEY96 # upadting the URL97 URL = f'{BASE_URL}lat={location.latitude}&lon={location.longitude}&appid={API_KEY}&day={date.day}&month={date.month}&year={date.year}' # q={CITY},BE98 response = requests.get(URL)99 if response.status_code == 200:100 # getting data in the json format101 data = response.json()102 main = data['main']103 temperature = main['temp']104 humidity = main['humidity']105 report = data['weather']106 return {"temp": temperature, "hum": humidity, "report": report[0]['description']}107 else:108 # showing the error message109 # print("Error in the HTTP request")110 return {"temp": None, "hum": None, "report": None}111@divisions_blueprint.route('//matches/<fixture_number>', methods=['GET'])112def fixture_detail(fixture_number):113 x = requests.get(f'{get_container("matches")}/matches/{fixture_number}').json()["data"]114 home = x["home"]115 away = x["away"]116 date_ = x["date"]117 time = x["time"]118 referee = x["referee_id"]119 stats = None120 weather = None121 results_1 = None122 results_2 = None123 match_date = datetime.strptime(date_, "%Y-%m-%d")124 if match_date.date() >= datetime.now().date():125 stats = requests.get(f'{get_container("matches")}/matches/stats/{home}/vs/{away}').json()["data"]126 results_1 = ""127 for match in stats["team1"]["last"]:128 if match["goals_home"] is None or match["goals_away"] is None:129 results_1 += "?"130 continue131 if match["home"] == home:132 if match["goals_home"] > match["goals_away"]:133 results_1 += "W"134 elif match["goals_home"] == match["goals_away"]:135 results_1 += "T"136 elif match["goals_home"] < match["goals_away"]:137 results_1 += "L"138 elif match["away"] == home:139 if match["goals_home"] > match["goals_away"]:140 results_1 += "L"141 elif 
match["goals_home"] == match["goals_away"]:142 results_1 += "T"143 elif match["goals_home"] < match["goals_away"]:144 results_1 += "W"145 results_2 = ""146 for match in stats["team2"]["last"]:147 if match["goals_home"] is None or match["goals_away"] is None:148 results_2 += "?"149 continue150 if match["home"] == away:151 if match["goals_home"] > match["goals_away"]:152 results_2 += "W"153 elif match["goals_home"] == match["goals_away"]:154 results_2 += "T"155 elif match["goals_home"] < match["goals_away"]:156 results_2 += "L"157 elif match["away"] == away:158 if match["goals_home"] > match["goals_away"]:159 results_2 += "L"160 elif match["goals_home"] == match["goals_away"]:161 results_2 += "T"162 elif match["goals_home"] < match["goals_away"]:163 results_2 += "W"164 if match_date.date() < datetime.now().date() + timedelta(days=7):165 # find the city166 stamnr = requests.get(f'{get_container("clubs")}/teams/{home}').json()["data"]["club_id"]167 club = requests.get(f'{get_container("clubs")}/clubs/{stamnr}').json()["data"]168 address = club["address"]169 city = club["city"]170 weather = get_weather(address, city, match_date)171 return render_template("match.html", home=home, away=away, date=date_, time=time, referee=referee, stats=stats, results_1=results_1, results_2=results_2, weather=weather)172@divisions_blueprint.route('//admin/matches', methods=['GET', 'POST'])173def admin_matches():174 if request.cookies.get("user_type") not in ["admin", "superadmin"]:175 return render_template("no_permission.html")176 team = request.cookies.get("team")177 if request.method == 'POST':178 if request.form.get("delete") == "yes":179 x = requests.delete(f'{get_container("matches")}/matches/{request.form.get("id")}')180 if x.status_code == 200:181 flash("Deleted succesfuly")182 else:183 flash(f"Something went wrong ({x.status_code})")184 elif request.form.get("add") == "yes":185 values = request.form.to_dict()186 del values["add"]187 x = 
requests.post(f'{get_container("matches")}/matches', data=values)188 if x.status_code == 201:189 flash("Added succesfuly")190 else:191 flash(f"Something went wrong ({x.status_code})")192 else:193 # return jsonify(request.form)194 x = requests.put(f'{get_container("matches")}/matches/{request.form.get("id")}', data=request.form)195 if x.status_code == 200:196 flash("Updated succesfuly")197 else:198 flash(f"Something went wrong ({x.status_code}). Be sure the referee isn't double booked and all data entered is in the correct format.")199 if team:200 matchweek = request.args.get("week")201 if matchweek:202 x = requests.get(f'{get_container("matches")}/matches/week/{matchweek}')203 else:204 x = requests.get(f'{get_container("matches")}/matches')205 # return jsonify(x.json())206 x = x.json()["data"]207 return render_template("admin_matches.html", matches=x, obj_attr=["division_id", "matchweek", "date", "time", "home", "away"])208 else:209 return render_template("no_permission.html")210@divisions_blueprint.route('//admin/divisions', methods=['GET', 'POST'])211def admin_divisions():212 if request.cookies.get("user_type") not in ["admin", "superadmin"]:213 return render_template("no_permission.html")214 team = request.cookies.get("team")215 if request.method == 'POST':216 if request.form.get("delete") == "yes":217 x = requests.delete(f'{get_container("matches")}/divisions/{request.form.get("id")}')218 if x.status_code == 200:219 flash("Deleted succesfuly")220 else:221 flash(f"Something went wrong ({x.status_code})")222 elif request.form.get("add") == "yes":223 values = request.form.to_dict()224 del values["add"]225 x = requests.post(f'{get_container("matches")}/divisions', data=values)226 if x.status_code == 201:227 flash("Added succesfuly")228 else:229 flash(f"Something went wrong ({x.status_code})")230 else:231 # return jsonify(request.form)232 x = requests.put(f'{get_container("matches")}/divisions/{request.form.get("id")}', data=request.form)233 if x.status_code == 
200:234 flash("Updated succesfuly")235 else:236 flash(f"Something went wrong ({x.status_code})")237 if team:238 x = requests.get(f'{get_container("matches")}/divisions')239 # return jsonify(x.json())240 x = x.json()["data"]241 return render_template("admin_divisions.html", objects=x, obj_attr=["name"])242 else:243 return render_template("no_permission.html")244@divisions_blueprint.route('//admin/referees', methods=['GET', 'POST'])245def admin_referees():246 if request.cookies.get("user_type") not in ["admin", "superadmin"]:247 return render_template("no_permission.html")248 team = request.cookies.get("team")249 if request.method == 'POST':250 if request.form.get("delete") == "yes":251 x = requests.delete(f'{get_container("matches")}/referees/{request.form.get("id")}')252 if x.status_code == 200:253 flash("Deleted succesfuly")254 else:255 flash(f"Something went wrong ({x.status_code})")256 elif request.form.get("add") == "yes":257 values = request.form.to_dict()258 del values["add"]259 x = requests.post(f'{get_container("matches")}/referees', data=values)260 if x.status_code == 201:261 flash("Added succesfuly")262 else:263 flash(f"Something went wrong ({x.status_code})")264 else:265 # return jsonify(request.form)266 x = requests.put(f'{get_container("matches")}/referees/{request.form.get("id")}', data=request.form)267 if x.status_code == 200:268 flash("Updated succesfuly")269 else:270 flash(f"Something went wrong ({x.status_code})")271 if team:272 x = requests.get(f'{get_container("matches")}/referees')273 # return jsonify(x.json())274 x = x.json()["data"]275 return render_template("admin_referees.html", objects=x, obj_attr=["firstname", "lastname", "birthday"])276 else:277 return render_template("no_permission.html")278@divisions_blueprint.route('//admin/clubs', methods=['GET', 'POST'])279def admin_clubs():280 if request.cookies.get("user_type") not in ["admin", "superadmin"]:281 return render_template("no_permission.html")282 team = request.cookies.get("team")283 
if request.method == 'POST':284 if request.form.get("delete") == "yes":285 x = requests.delete(f'{get_container("clubs")}/clubs/{request.form.get("stam_number")}')286 if x.status_code == 200:287 flash("Deleted succesfuly")288 else:289 flash(f"Something went wrong ({x.status_code})")290 elif request.form.get("add") == "yes":291 values = request.form.to_dict()292 del values["add"]293 x = requests.post(f'{get_container("clubs")}/clubs', data=values)294 if x.status_code == 201:295 flash("Added succesfuly")296 else:297 flash(f"Something went wrong ({x.status_code})")298 else:299 # return jsonify(request.form)300 x = requests.put(f'{get_container("clubs")}/clubs/{request.form.get("stam_number")}', data=request.form)301 if x.status_code == 200:302 flash("Updated succesfuly")303 else:304 flash(f"Something went wrong ({x.status_code})")305 if team:306 x = requests.get(f'{get_container("clubs")}/clubs')307 # return jsonify(x.json())308 x = x.json()["data"]309 return render_template("admin_clubs.html", objects=x, obj_attr=["name", "address", "zip", "city"])310 else:311 return render_template("no_permission.html")312@divisions_blueprint.route('//admin/teams', methods=['GET', 'POST'])313def admin_teams():314 if request.cookies.get("user_type") not in ["admin", "superadmin"]:315 return render_template("no_permission.html")316 team = request.cookies.get("team")317 if request.method == 'POST':318 if request.form.get("delete") == "yes":319 x = requests.delete(f'{get_container("clubs")}/teams/{request.form.get("id")}')320 if x.status_code == 200:321 flash("Deleted succesfuly")322 else:323 flash(f"Something went wrong ({x.status_code})")324 elif request.form.get("add") == "yes":325 values = request.form.to_dict()326 del values["add"]327 x = requests.post(f'{get_container("clubs")}/teams', data=values)328 if x.status_code == 201:329 flash("Added succesfuly")330 else:331 flash(f"Something went wrong ({x.status_code})")332 else:333 # return jsonify(request.form)334 x = 
requests.put(f'{get_container("clubs")}/teams/{request.form.get("id")}', data=request.form)335 if x.status_code == 200:336 flash("Updated succesfuly")337 else:338 flash(f"Something went wrong ({x.status_code})")339 if team:340 x = requests.get(f'{get_container("clubs")}/teams')341 # return jsonify(x.json())342 x = x.json()["data"]343 return render_template("admin_teams.html", objects=x, obj_attr=["club_id"])344 else:345 return render_template("no_permission.html")346@divisions_blueprint.route('//admin/users', methods=['GET', 'POST'])347def admin_users():348 if request.cookies.get("user_type") not in ["admin", "superadmin"]:349 return render_template("no_permission.html")350 team = request.cookies.get("team")351 if request.method == 'POST':352 if request.form.get("delete") == "yes":353 x = requests.delete(f'{get_container("users")}/users/{request.form.get("id")}')354 if x.status_code == 200:355 flash("Deleted succesfuly")356 else:357 flash(f"Something went wrong ({x.status_code})")358 elif request.form.get("add") == "yes":359 values = request.form.to_dict()360 del values["add"]361 x = requests.post(f'{get_container("users")}/users', data=values)362 if x.status_code == 201:363 flash("Added succesfuly")364 else:365 flash(f"Something went wrong ({x.status_code})")366 else:367 # return jsonify(request.form)368 x = requests.put(f'{get_container("clubs")}/teams/{request.form.get("id")}', data=request.form)369 if x.status_code == 200:370 flash("Updated succesfuly")371 else:372 flash(f"Something went wrong ({x.status_code})")373 if team:374 x = requests.get(f'{get_container("users")}/users')375 # return jsonify(x.json())376 x = x.json()["data"]377 return render_template("admin_users.html", objects=x, obj_attr=["username"], superadmin=request.cookies.get("user_type")=="superadmin")378 else:...
test_rackspace_uploader.py
Source:test_rackspace_uploader.py
...72 u.init_app(self.flask_app)73 file = FileStorage(filename='test.jpg')74 err_msg = "Upload file should return True"75 assert u.upload_file(file, container='user_3') is True, err_msg76 calls = [call.get_container('user_3'),77 call.get_container().get_object('test.jpg')]78 mycf.assert_has_calls(calls, any_order=True)79 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',80 return_value=True)81 @patch('pybossa.uploader.rackspace.pyrax.utils.get_checksum',82 return_value="1234abcd")83 def test_rackspace_uploader_upload_correct_purgin_first_file(self, mock, mock2):84 """Test RACKSPACE UPLOADER upload file purging first file works."""85 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:86 mycf.upload_file.return_value=True87 mycf.get_object.side_effect = True88 u = RackspaceUploader()89 u.init_app(self.flask_app)90 file = FileStorage(filename='test.jpg')91 err_msg = "Upload file should return True"92 assert u.upload_file(file, container='user_3') is True, err_msg93 calls = [call.get_container('user_3'),94 call.get_container().get_object().delete(),95 call.get_container().get_object('test.jpg')]96 print mycf.mock_calls97 mycf.assert_has_calls(calls, any_order=True)98 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',99 return_value=True)100 @patch('pybossa.uploader.rackspace.pyrax.utils.get_checksum',101 return_value="1234abcd")102 def test_rackspace_uploader_upload_file_fails(self, mock, mock2):103 """Test RACKSPACE UPLOADER upload file fail works."""104 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:105 from pyrax.exceptions import UploadFailed106 mycf.upload_file.side_effect = UploadFailed107 u = RackspaceUploader()108 u.init_app(self.flask_app)109 file = FileStorage(filename='test.jpg')110 err_msg = "Upload file should return False"111 assert u.upload_file(file, container='user_3') is False, err_msg112 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',113 return_value=True)114 
@patch('pybossa.uploader.rackspace.pyrax.utils.get_checksum',115 return_value="1234abcd")116 def test_rackspace_uploader_upload_file_object_fails(self, mock, mock2):117 """Test RACKSPACE UPLOADER upload file object fail works."""118 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:119 from pyrax.exceptions import NoSuchObject120 container = MagicMock()121 container.get_object.side_effect = NoSuchObject122 mycf.get_container.return_value = container123 u = RackspaceUploader()124 u.init_app(self.flask_app)125 file = FileStorage(filename='test.jpg')126 err_msg = "Upload file should return True"127 assert u.upload_file(file, container='user_3') is True, err_msg128 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',129 return_value=True)130 @patch('pybossa.uploader.rackspace.pyrax.utils.get_checksum',131 return_value="1234abcd")132 def test_rackspace_uploader_upload_wrong_file(self, mock, mock2):133 """Test RACKSPACE UPLOADER upload wrong file extension works."""134 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:135 mycf.upload_file.return_value = True136 u = RackspaceUploader()137 u.init_app(self.flask_app)138 file = FileStorage(filename='test.docs')139 err_msg = "Upload file should return False"140 res = u.upload_file(file, container='user_3')141 assert res is False, err_msg142 @with_context143 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',144 return_value=True)145 @patch('pybossa.uploader.rackspace.url_for', return_value='/static/img/placeholder.user.png')146 def test_rackspace_uploader_lookup_url(self, mock1, mock2):147 """Test RACKSPACE UPLOADER lookup returns a valid link."""148 uri = 'https://rackspace.com'149 filename = 'test.jpg'150 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:151 cdn_enabled_mock = PropertyMock(return_value=True)152 type(fake_container).cdn_enabled = cdn_enabled_mock153 mycf.get_container.return_value = fake_container154 u = RackspaceUploader()155 
u.init_app(self.flask_app)156 res = u._lookup_url('rackspace', {'filename': filename,157 'container': 'user_3'})158 expected_url = "%s/%s" % (uri, filename)159 err_msg = "We should get the following URL: %s" % expected_url160 assert res == expected_url, err_msg161 @with_context162 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',163 return_value=True)164 @patch('pybossa.uploader.rackspace.url_for', return_value='/static/img/placeholder.user.png')165 def test_rackspace_uploader_lookup_url_enable_cdn(self, mock1, mock2):166 """Test RACKSPACE UPLOADER lookup enables CDN for non enabled CDN."""167 filename = 'test.jpg'168 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:169 cdn_enabled_mock = PropertyMock(return_value=False)170 type(fake_container).cdn_enabled = cdn_enabled_mock171 mycf.get_container.return_value = fake_container172 u = RackspaceUploader()173 u.init_app(self.flask_app)174 res = u._lookup_url('rackspace', {'filename': filename,175 'container': 'user_3'})176 url = 'https://rackspace.com/test.jpg'177 err_msg = "We should get the %s but we got %s " % (url, res)178 assert res == url, err_msg179 calls = [call.make_public()]180 fake_container.assert_has_calls(calls, any_order=True)181 @with_context182 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',183 return_value=True)184 def test_rackspace_uploader_lookup_url_returns_failover_url(self, mock):185 """Test RACKSPACE UPLOADER lookup returns failover_url for user avatar."""186 filename = 'test_avatar.jpg'187 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:188 cdn_enabled_mock = PropertyMock(return_value=False)189 type(fake_container).cdn_enabled = cdn_enabled_mock190 mycf.get_container.return_value = fake_container191 fake_container.make_public.side_effect = NoSuchObject192 u = RackspaceUploader()193 u.init_app(self.flask_app)194 res = u._lookup_url('rackspace', {'filename': filename,195 'container': 'user_3'})196 failover_url = 
'http://localhost/static/img/placeholder.user.png'197 err_msg = "We should get the %s but we got %s " % (failover_url, res)198 assert res == failover_url, err_msg199 @with_context200 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',201 return_value=True)202 def test_rackspace_uploader_lookup_url_returns_failover_url_project(self, mock):203 """Test RACKSPACE UPLOADER lookup returns failover_url for project avatar."""204 filename = 'project_32.jpg'205 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:206 cdn_enabled_mock = PropertyMock(return_value=False)207 type(fake_container).cdn_enabled = cdn_enabled_mock208 mycf.get_container.return_value = fake_container209 fake_container.make_public.side_effect = NoSuchObject210 u = RackspaceUploader()211 u.init_app(self.flask_app)212 res = u._lookup_url('rackspace', {'filename': filename,213 'container': 'user_3'})214 failover_url = 'http://localhost/static/img/placeholder.project.png'215 err_msg = "We should get the %s but we got %s " % (failover_url, res)216 assert res == failover_url, err_msg217 @with_context218 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',219 return_value=True)220 def test_rackspace_uploader_lookup_url_returns_failover_url_project_backwards_compat(self, mock):221 """Test RACKSPACE UPLOADER lookup returns failover_url for project222 avatar for old project avatars named with 'app'."""223 filename = 'app_32.jpg'224 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:225 cdn_enabled_mock = PropertyMock(return_value=False)226 type(fake_container).cdn_enabled = cdn_enabled_mock227 mycf.get_container.return_value = fake_container228 fake_container.make_public.side_effect = NoSuchObject229 u = RackspaceUploader()230 u.init_app(self.flask_app)231 res = u._lookup_url('rackspace', {'filename': filename,232 'container': 'user_3'})233 failover_url = 'http://localhost/static/img/placeholder.project.png'234 err_msg = "We should get the %s but we got %s " % 
(failover_url, res)235 assert res == failover_url, err_msg236 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',237 return_value=True)238 def test_rackspace_uploader_get_container(self, mock1):239 """Test RACKSPACE UPLOADER get_container method works."""240 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:241 cdn_enabled_mock = PropertyMock(return_value=False)242 type(fake_container).cdn_enabled = cdn_enabled_mock243 mycf.get_container.side_effect = NoSuchContainer244 calls = [call.get_container('user_3'),245 call.create_container('user_3'),246 call.make_container_public('user_3')247 ]248 u = RackspaceUploader()249 u.init_app(self.flask_app)250 assert u.get_container('user_3')251 mycf.assert_has_calls(calls, any_order=True)252 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',253 return_value=True)254 def test_rackspace_uploader_delete(self, mock1):255 """Test RACKSPACE UPLOADER delete method works."""256 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:257 calls = [call.get_container('container'),258 call.get_container().get_object('file'),259 call.get_container().get_object().delete()260 ]261 u = RackspaceUploader()262 u.init_app(self.flask_app)263 err_msg = "It should return True"264 assert u.delete_file('file', 'container') is True, err_msg265 mycf.assert_has_calls(calls, any_order=True)266 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',267 return_value=True)268 def test_rackspace_uploader_delete_fails(self, mock1):269 """Test RACKSPACE UPLOADER delete fails method works."""270 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:271 container = MagicMock()272 container.get_object.side_effect = NoSuchObject273 mycf.get_container.return_value = container274 calls = [call.get_container('container')]275 u = RackspaceUploader()276 u.init_app(self.flask_app)277 err_msg = "It should return False"278 assert u.delete_file('file', 'container') is False, err_msg279 mycf.assert_has_calls(calls, 
any_order=True)280 @patch('pybossa.uploader.rackspace.pyrax.set_credentials',281 return_value=True)282 def test_file_exists_for_missing_file(self, credentials):283 """Test RACKSPACE UPLOADER file_exists returns False if the file does not exist"""284 with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:285 u = RackspaceUploader()286 u.init_app(self.flask_app)287 container = MagicMock()288 container.get_object.side_effect = NoSuchObject...
find-the-bug--returning-the-container.py
Source:find-the-bug--returning-the-container.py
# Lookup table mapping a product name to the container it is sold in.
# Built once at import time instead of on every call.
# "Cerials" is the historical (misspelled) key and is kept so existing callers
# keep working; "Cereals" is the correctly spelled alias.
_CONTAINERS = {
    "Bread": "bag",
    "Milk": "bottle",
    "Beer": "bottle",
    "Eggs": "carton",
    "Cerials": "box",
    "Cereals": "box",
    "Candy": "plastic",
    "Cheese": None,
}


def get_container(product):
    """Return the container type used for *product*.

    Args:
        product: Product name; lookups are case-sensitive (e.g. ``"Bread"``).

    Returns:
        The container name as a string, or ``None`` for products that are
        listed without a container (``"Cheese"``).

    Raises:
        KeyError: If *product* is not a known product.
    """
    return _CONTAINERS[product]


# Example calls (results are discarded; the expected value is in the comment).
get_container("Bread")  # "bag"
get_container("Milk")  # "bottle"
get_container("Beer")  # "bottle"
get_container("Eggs")  # "carton"
get_container("Candy")  # "plastic"
# ... (remainder of the snippet is elided in the source page)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!