Best Python code snippet using robotframework-anywherelibrary
smit_deploy.py
Source:smit_deploy.py
...187 os.chdir('casetup')188 ca = casetup.deployca.DeployCA()189 if args.noexten:190 args.extensions = [' ']191 ca.init_config(package=args.package[0], CN=get_value(args.CN), C=get_value(args.C), O=get_value(args.O),192 L=get_value(args.L),193 ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),194 SELFSIGN='y', CAPATH=get_value(args.capath),195 WORKPATH=get_value(args.capath), SK=get_value(args.sk), CSR=get_value(args.csr),196 ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),197 CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),198 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),199 MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),200 OPENSSL_PATH=get_value(args.opensslpath))201 ca.setup(args.all)202 elif args.package[0] == 'p43':203 if args.new: # Create a new OCSP server.204 os.chdir('casetup')205 ocsp_deploy = casetup.deployca.DeployCA()206 if args.noexten:207 args.extensions = [' ']208 ocsp_deploy.init_config(package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),209 O=get_value(args.O), L=get_value(args.L),210 ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),211 SELFSIGN='n', CAPATH=get_value(args.capath),212 WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),213 ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),214 CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),215 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),216 OCSPPORT=get_value(args.ocspport),217 MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),218 OPENSSL_PATH=get_value(args.opensslpath))219 ocsp_deploy.ocsp_setup()220 os.chdir('../appca')221 else: # Run existing OCSP server.222 os.chdir('appca')223 app = appca.installca.AppCA()224 app.build()225 ocsp = appca.ocsp.OCSP()226 if not args.config or args.config[0] == '':227 args.config = ['ocspcnf']228 ocsp.init_config(config=args.config[0], OCSPSK=get_value(args.sk), OCSPCERT=get_value(args.certpath),229 CERTDB=get_value(args.certdb), CACERT=get_value(args.cacert),230 OCSPPORT=get_value(args.ocspport))231 ocsp.start()232 elif args.package[0] == 'p44':233 os.chdir('security')234 cert = security.certmngr.CertManager()235 # ca = casetup.deployca.DeployCA()236 if args.noexten:237 args.extensions = [' ']238 if not args.config or args.config[0] == '':239 args.config = ['certcnf']240 cert.init_config(config=args.config[0], package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),241 O=get_value(args.O), L=get_value(args.L),242 ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),243 SELFSIGN='n', CAPATH=get_value(args.capath),244 WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),245 ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),246 CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),247 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),248 MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),249 OPENSSL_PATH=get_value(args.opensslpath))250 cert.create_csr()251 elif args.package[0] == 'p45':252 os.chdir('security')253 cert = security.certmngr.CertManager()254 # ca = casetup.deployca.DeployCA()255 if args.noexten:256 args.extensions = [' ']257 if not args.config or args.config[0] == '':258 args.config = ['certcnf']259 cert.init_config(config=args.config[0], package=args.package[0], CN=get_value(args.CN),260 C=get_value(args.C), O=get_value(args.O), L=get_value(args.L),261 ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),262 SELFSIGN='n', CAPATH=get_value(args.capath),263 WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),264 ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),265 CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),266 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),267 MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),268 OPENSSL_PATH=get_value(args.opensslpath))269 cert.gen_sig()270 elif args.package[0] == 'p46':271 os.chdir('security')272 cert = security.certmngr.CertManager()273 # ca = casetup.deployca.DeployCA()274 if args.noexten:275 args.extensions = [' ']276 if not args.config or args.config[0] == '':277 args.config = ['certcnf']278 cert.init_config(config=args.config[0], package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),279 O=get_value(args.O), L=get_value(args.L),280 ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),281 SELFSIGN='n', CAPATH=get_value(args.capath),282 WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),283 ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),284 CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),285 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),286 MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),287 OPENSSL_PATH=get_value(args.opensslpath))288 cert.create_cert()289 elif args.package[0] == 'p5':290 if args.new: # create new private CA.291 os.chdir('casetup')292 cadeploy = casetup.deployca.DeployCA()293 if args.noexten:294 args.extensions = [' ']295 cadeploy.init_config(package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),296 O=get_value(args.O),297 L=get_value(args.L),298 ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),299 SELFSIGN='y', CAPATH=get_value(args.capath),300 WORKPATH=get_value(args.capath), SK=get_value(args.sk), CSR=get_value(args.csr),301 ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),302 CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),303 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),304 OCSPPORT=get_value(args.ocspport),305 MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),306 OPENSSL_PATH=get_value(args.opensslpath))307 cadeploy.setup(args.all)308 os.chdir('../appca')309 else:310 os.chdir('appca')311 ca = appca.ca.CA()312 if not args.config or args.config[0] == '':313 args.config = ['appcacnf']314 ca.init_config(config=args.config[0], OCSPSK=get_value(args.sk), OCSPCERT=get_value(args.certpath),315 CACERT=get_value(args.cacert), OCSPPORT=get_value(args.ocspport),316 CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp), IP=get_value(args.caip),317 PORT=get_value(args.caport), SIGCERT=get_value(args.sigcert), SIGCHAIN=get_value(args.sigchain),318 SELFSIGN='n', OPENSSL_PATH=get_value(args.opensslpath))319 ca.start()320 elif args.package[0] == 'p6':321 os.chdir('appserver')322 server = appserver.server.Server()323 if not args.config or args.config[0] == '':324 args.config = ['servercnf']325 server.init_config(config=args.config[0], CN=get_value(args.CN), C=get_value(args.C), O=get_value(args.O),326 L=get_value(args.L), ST=get_value(args.ST), OU=get_value(args.OU),327 emailAddress=get_value(args.email), SK=get_value(args.sk), CSR=get_value(args.csr),328 CERT=get_value(args.certpath), MSG=get_value(args.msg), CACERT=get_value(args.cacert),329 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), CAIP=get_value(args.caip),330 CAPORT=get_value(args.caport), SERVERIP=get_value(args.serverip),331 SERVERPORT=get_value(args.serverport), TYPE='server', CERT_REQS='CERT_REQUIRED')332 server.start(args.all)333 elif args.package[0] == 'p7':334 os.chdir('appclient')335 client = appclient.client.Client()336 if not args.config or args.config[0] == '':337 args.config = ['clientcnf']338 client.init_config(config=args.config[0], CN=get_value(args.CN), C=get_value(args.C), O=get_value(args.O),339 L=get_value(args.L), ST=get_value(args.ST), OU=get_value(args.OU),340 emailAddress=get_value(args.email), SK=get_value(args.sk), CSR=get_value(args.csr),341 CERT=get_value(args.certpath), MSG=get_value(args.msg), CACERT=get_value(args.cacert),342 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), CAIP=get_value(args.caip),343 CAPORT=get_value(args.caport), SERVERIP=get_value(args.serverip),344 SERVERPORT=get_value(args.serverport), TYPE='client', CERT_REQS='CERT_REQUIRED')345 client.start(args.all)346 elif args.package[0] == 'p81':347 os.chdir('testbed')348 if args.exp_config is not None:349 if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':350 print ('Error: you should spcified two configuration files for the testbed configuration.\n'351 'E.g.,: -exp_config client_config_file sink_config_file'352 '\nNote: order is important, you should specify client configuration then sink configuration.')353 return354 else:355 args.exp_config = ['client_expcnf', 'sink_expcnf']356 exp = testbed.setupexp.SetupExp()357 exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),358 CERT=get_value(args.certpath), CSR=get_value(args.csr),359 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),360 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),361 CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),362 SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),363 SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),364 DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),365 SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')366 exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),367 CERT=get_value(args.certpath), CSR=get_value(args.csr),368 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),369 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),370 CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),371 CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),372 DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),373 ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),374 PASSWORD=get_value(args.passwd), USER=get_value(args.user),375 SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),376 TYPE='server')377 exp.setup()378 elif args.package[0] == 'p82':379 os.chdir('testbed')380 if args.exp_config is not None:381 if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':382 print ('Error: you should spcified two configuration files for the testbed configuration. \n'383 'E.g.,: -exp_config client_config_file sink_config_file'384 '\nNote: order is important, you should specify client configuration then sink configuration.')385 return386 else:387 args.exp_config = ['client_expcnf', 'sink_expcnf']388 exp = testbed.setupexp.SetupExp()389 exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),390 CERT=get_value(args.certpath), CSR=get_value(args.csr),391 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),392 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),393 CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),394 SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),395 SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),396 DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),397 SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')398 exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),399 CERT=get_value(args.certpath), CSR=get_value(args.csr),400 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),401 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),402 CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),403 CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),404 DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),405 ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),406 PASSWORD=get_value(args.passwd), USER=get_value(args.user),407 SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),408 TYPE='server')409 exp.start_sink(dtls=args.dtls)410 elif args.package[0] == 'p83':411 os.chdir('testbed')412 if args.exp_config is not None:413 if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':414 print ('Error: you should spcified two configuration files for the testbed configuration. \n'415 'E.g.,: -exp_config client_config_file sink_config_file'416 '\nNote: order is important, you should specify client configuration then sink configuration.')417 return418 else:419 args.exp_config = ['client_expcnf', 'sink_expcnf']420 exp = testbed.setupexp.SetupExp()421 exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),422 CERT=get_value(args.certpath), CSR=get_value(args.csr),423 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),424 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),425 CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),426 SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),427 SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),428 DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),429 SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')430 exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),431 CERT=get_value(args.certpath), CSR=get_value(args.csr),432 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),433 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),434 CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),435 CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),436 DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),437 ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),438 PASSWORD=get_value(args.passwd), USER=get_value(args.user),439 SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),440 TYPE='server')441 exp.start_router()442 elif args.package[0] == 'p84': # start client443 os.chdir('testbed')444 if args.exp_config is not None:445 if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':446 print ('Error: you should spcified two configuration files for the testbed configuration.\n'447 'E.g.,: -exp_config client_config_file sink_config_file'448 '\nNote: order is important, you should specify client configuration then sink configuration.')449 return450 else:451 args.exp_config = ['client_expcnf', 'sink_expcnf']452 if args.inet is None:453 args.inet = ['6']454 elif args.inet[0] != '4' and args.inet[0] != '6':455 print ("Error: invalid parameter of \'inet\'. The value should be eigher 4 or 6")456 return457 exp = testbed.setupexp.SetupExp()458 exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),459 CERT=get_value(args.certpath), CSR=get_value(args.csr),460 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),461 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),462 CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),463 SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),464 SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),465 DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),466 SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')467 exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),468 CERT=get_value(args.certpath), CSR=get_value(args.csr),469 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),470 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),471 CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),472 CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),473 DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),474 ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),475 PASSWORD=get_value(args.passwd), USER=get_value(args.user),476 SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),477 TYPE='server')478 if args.update_client:479 exp.upload_to_clients(args.inet[0])480 if args.nostart is None or args.nostart != True:481 exp.start_clients()482 elif args.package[0] == 'p85': # collect data483 os.chdir('testbed')484 if args.exp_config is not None:485 if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':486 print ('Error: you should spcified two configuration files for the testbed configuration. \n'487 'E.g.,: -exp_config client_config_file sink_config_file'488 '\nNote: order is important, you should specify client configuration then sink configuration.')489 return490 else:491 args.exp_config = ['client_expcnf', 'sink_expcnf']492 if args.inet is None:493 args.inet = ['6']494 elif args.inet[0] != '4' and args.inet[0] != '6':495 print ("Error: invalid parameter of \'inet\'. The value should be eigher 4 or 6")496 return497 exp = testbed.setupexp.SetupExp()498 exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),499 CERT=get_value(args.certpath), CSR=get_value(args.csr),500 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),501 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),502 CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),503 SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),504 SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),505 DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),506 SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')507 exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),508 CERT=get_value(args.certpath), CSR=get_value(args.csr),509 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),510 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),511 CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),512 CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),513 DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),514 ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),515 PASSWORD=get_value(args.passwd), USER=get_value(args.user),516 SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),517 TYPE='server')518 exp.collect_data(args.inet[0])519 elif args.package[0] == 'p86': # analyze data520 os.chdir('testbed')521 if args.exp_config is not None:522 if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':523 print ('Error: you should spcified two configuration files for the testbed configuration. \n'524 'E.g.,: -exp_config client_config_file sink_config_file'525 '\nNote: order is important, you should specify client configuration then sink configuration.')526 return527 else:528 args.exp_config = ['client_expcnf', 'sink_expcnf']529 exp = testbed.setupexp.SetupExp()530 exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),531 CERT=get_value(args.certpath), CSR=get_value(args.csr),532 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),533 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),534 CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),535 SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),536 SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),537 DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),538 SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')539 exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),540 CERT=get_value(args.certpath), CSR=get_value(args.csr),541 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),542 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),543 CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),544 CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),545 DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),546 ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),547 PASSWORD=get_value(args.passwd), USER=get_value(args.user),548 SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),549 TYPE='server')550 exp.analyze()551 elif args.package[0] == 'p87': # stop experiment552 os.chdir('testbed')553 if args.exp_config is not None:554 if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':555 print ('Error: you should spcified two configuration files for the testbed configuration. \n'556 'E.g.,: -exp_config client_config_file sink_config_file'557 '\nNote: order is important, you should specify client configuration then sink configuration.')558 return559 else:560 args.exp_config = ['client_expcnf', 'sink_expcnf']561 if args.inet is not None:562 args.inet = ['6']563 elif args.inet[0] != '4' and args.inet[0] != '6':564 print ("Error: invalid parameter of \'inet\'. The value should be eigher 4 or 6")565 return566 exp = testbed.setupexp.SetupExp()567 exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),568 CERT=get_value(args.certpath), CSR=get_value(args.csr),569 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),570 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),571 CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),572 SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),573 SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),574 DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),575 SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')576 exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),577 CERT=get_value(args.certpath), CSR=get_value(args.csr),578 CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),579 SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),580 CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),581 CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),582 DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),583 ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),584 PASSWORD=get_value(args.passwd), USER=get_value(args.user),585 SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),586 TYPE='server')587 exp.stop(args.inet[0])588 else:589 print('Invalid argument.')590def get_value(inlist):591 if inlist:592 return inlist[0]593 else:594 return None595if __name__ == '__main__':...
SATEncodingTest.py
Source:SATEncodingTest.py
...18 m = Model(v1 == 3)19 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)20 s.solve()21 self.assertTrue(s.is_sat())22 self.assertEqual(v1.get_value(), 3)23 def testEq2(self):24 v1 = Variable(1, 5)25 v2 = Variable(3, 8)26 m = Model(v1 == v2)27 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)28 s.solve()29 self.assertTrue(s.is_sat())30 self.assertEqual(v1.get_value(), v2.get_value())31 def testEq3(self):32 v1 = Variable(1, 4)33 v2 = Variable(4, 6)34 m = Model(v1 == v2)35 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)36 s.solve()37 self.assertTrue(s.is_sat())38 self.assertEqual(v1.get_value(), v2.get_value())39 def testEqGap(self):40 v1 = Variable([1, 3, 5, 7])41 v2 = Variable(4, 8)42 m = Model(v1 == v2)43 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)44 s.solve()45 self.assertTrue(s.is_sat())46 self.assertEqual(v1.get_value(), v2.get_value())47 def testSmallDomainEq(self):48 v1 = Variable(2)49 m = Model(v1 == 1)50 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)51 s.solve()52 self.assertTrue(s.is_sat())53 self.assertEqual(v1.get_value(), 1)54 def testSmallDomainLt(self):55 v1 = Variable(2)56 m = Model(v1 < 1)57 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)58 s.solve()59 self.assertTrue(s.is_sat())60 self.assertLess(v1.get_value(), 1)61 def testSmallDomainLe(self):62 v1 = Variable(2)63 m = Model(v1 <= 1)64 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)65 s.solve()66 self.assertTrue(s.is_sat())67 self.assertLessEqual(v1.get_value(), 1)68 def testSmallDomainGt(self):69 v1 = Variable(2)70 m = Model(v1 > 0)71 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)72 s.solve()73 self.assertTrue(s.is_sat())74 self.assertEqual(v1.get_value(), 1)75 def testSmallDomainGe(self):76 v1 = Variable(2)77 m = Model(v1 >= 1)78 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)79 s.solve()80 self.assertTrue(s.is_sat())81 self.assertEqual(v1.get_value(), 1)82 # def testConstantDomain(self):83 # v1 = Variable(1, 4)84 # v2 = Variable([2])85 # m = Model(v1 == v2)86 # s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)87 # s.solve()88 # self.assertTrue(s.is_sat())89 # self.assertEqual(v1.get_value(), v2.get_value())90 def testOffsetDomain(self):91 v1 = Variable(1, 4)92 v2 = Variable(1, 5)93 m = Model(v1 + 3 == v2)94 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)95 s.solve()96 self.assertTrue(s.is_sat())97 self.assertEqual(v1.get_value() + 3, v2.get_value())98 def testNe1(self):99 v1 = Variable(1, 2)100 m = Model(v1 != 1)101 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)102 s.solve()103 self.assertTrue(s.is_sat())104 self.assertNotEqual(v1.get_value(), 1)105 def testNe2(self):106 v1 = Variable(1, 2)107 v2 = Variable(1, 2)108 m = Model(v1 != v2)109 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)110 s.solve()111 self.assertTrue(s.is_sat())112 self.assertNotEqual(v1.get_value(), v2.get_value())113 def testGt(self):114 v1 = Variable(1, 3)115 v2 = Variable(1, 3)116 m = Model(v1 > v2)117 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)118 s.solve()119 self.assertTrue(s.is_sat())120 self.assertGreater(v1.get_value(), v2.get_value())121 def testGe(self):122 v1 = Variable(1, 3)123 v2 = Variable(1, 3)124 m = Model(v1 >= v2)125 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)126 s.solve()127 self.assertTrue(s.is_sat())128 self.assertGreaterEqual(v1.get_value(), v2.get_value())129 def testLt(self):130 v1 = Variable(1, 3)131 v2 = Variable(1, 3)132 m = Model(v1 < v2)133 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)134 s.solve()135 self.assertTrue(s.is_sat())136 self.assertLess(v1.get_value(), v2.get_value())137 def testLe(self):138 v1 = Variable(1, 3)139 v2 = Variable(1, 3)140 m = Model(v1 <= v2)141 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)142 s.solve()143 self.assertTrue(s.is_sat())144 self.assertLessEqual(v1.get_value(), v2.get_value())145 def testLtGap(self):146 v1 = Variable([1, 3, 5])147 v2 = Variable([3])148 m = Model(v1 < v2)149 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)150 s.solve()151 self.assertTrue(s.is_sat())152 self.assertLess(v1.get_value(), v2.get_value())153 def testAddEqConstant(self):154 v1 = Variable(5)155 v2 = Variable(5)156 m = Model(v1 + v2 == 6)157 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)158 s.solve()159 self.assertTrue(s.is_sat())160 self.assertEqual(v1.get_value() + v2.get_value(), 6)161 def testAddEq(self):162 v1 = Variable(5)163 v2 = Variable(5)164 v3 = Variable(5)165 m = Model(v1 + v2 == v3)166 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)167 s.solve()168 self.assertTrue(s.is_sat())169 self.assertEqual(v1.get_value() + v2.get_value(), v3.get_value())170 def testAddGapsSat(self):171 v1 = Variable([0, 4, 8])172 v2 = Variable()173 v3 = Variable(10)174 m = Model(v1 + v2 == v3, v3 == 5)175 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)176 s.solve()177 self.assertTrue(s.is_sat())178 self.assertEqual(v1.get_value() + v2.get_value(), v3.get_value())179 self.assertEqual(v3.get_value(), 5)180 def testAddGapsUnsat(self):181 v1 = Variable([0, 4, 8])182 v2 = Variable()183 v3 = Variable(10)184 m = Model(v1 + v2 == v3, v3 == 3)185 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)186 s.solve()187 self.assertTrue(s.is_unsat())188 def testSubEq(self):189 v1 = Variable(5)190 v2 = Variable(5)191 v3 = Variable(5)192 m = Model(v1 - v2 == v3)193 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)194 s.solve()195 self.assertTrue(s.is_sat())196 self.assertEqual(v1.get_value() - v2.get_value(), v3.get_value())197 def testAddMultiple(self):198 v1 = Variable(5)199 v2 = Variable(5)200 v3 = Variable(5)201 v4 = Variable(5)202 v5 = Variable(5)203 m = Model(v1 + v2 + v3 + v4 == v5)204 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)205 s.solve()206 self.assertTrue(s.is_sat())207 self.assertEqual(v1.get_value() + v2.get_value() + v3.get_value() + v4.get_value(), v5.get_value())208 def testSumEq(self):209 v1 = Variable(5)210 v2 = Variable(5)211 v3 = Variable(5)212 m = Model(Sum([v1, v2]) == v3)213 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)214 s.solve()215 self.assertTrue(s.is_sat())216 self.assertEqual(v1.get_value() + v2.get_value(), v3.get_value())217 def testSumArray(self):218 vs = VarArray(4, 1, 5)219 v_sum = Variable(5)220 m = Model(Sum(vs) == v_sum)221 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)222 s.solve()223 self.assertTrue(s.is_sat())224 self.assertEqual(sum(v.get_value() for v in vs), v_sum.get_value())225 def testSumSingleCoef(self):226 v1 = Variable(5)227 m = Model(Sum([v1], [-2]) == -4)228 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)229 s.solve()230 self.assertTrue(s.is_sat())231 self.assertEqual(v1.get_value() * -2, -4)232 def testSumCoefEq(self):233 v1 = Variable(5)234 v2 = Variable(5)235 v3 = Variable(5)236 m = Model(Sum([v1, v2], [2, 2]) == v3)237 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)238 s.solve()239 self.assertTrue(s.is_sat())240 self.assertEqual(v1.get_value() * 2 + v2.get_value() * 2, v3.get_value())241 def testSumCoefNeg(self):242 v1 = Variable(5)243 v2 = Variable(5)244 m = Model(Sum([v1, v2], [-1, -1]) == -3)245 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)246 s.solve()247 self.assertTrue(s.is_sat())248 self.assertEqual(v1.get_value() * -1 + v2.get_value() * -1, -3)249 def testMulConstant(self):250 v1 = Variable(5)251 v2 = Variable(5)252 m = Model(v1 * 3 == v2)253 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)254 s.solve()255 self.assertTrue(s.is_sat())256 self.assertEqual(v1.get_value() * 3, v2.get_value())257 def testMulVars(self):258 v1 = Variable(5)259 v2 = Variable(5)260 m = Model(v1 * v2 == 3)261 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)262 s.solve()263 self.assertTrue(s.is_sat())264 self.assertEqual(v1.get_value() * v2.get_value(), 3)265 def testMulPredicate(self):266 v1 = Variable(5)267 v2 = Variable(5)268 v3 = Variable(5)269 m = Model(v1 * v2 == v3)270 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)271 s.solve()272 self.assertTrue(s.is_sat())273 self.assertEqual(v1.get_value() * v2.get_value(), v3.get_value())274 def testMulGapsSat(self):275 v1 = Variable([0, 4, 8])276 v2 = Variable()277 v3 = Variable(10)278 m = Model(v1 * v2 == v3, v3 == 4)279 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)280 s.solve()281 self.assertTrue(s.is_sat())282 self.assertEqual(v1.get_value() * v2.get_value(), v3.get_value())283 self.assertEqual(v3.get_value(), 4)284 def testMulGapsUnsat(self):285 v1 = Variable([0, 4, 8])286 v2 = Variable()287 v3 = Variable(10)288 m = Model(v1 * v2 == v3, v3 == 3)289 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)290 s.solve()291 self.assertTrue(s.is_unsat())292 # def testDivConstant(self):293 # v1 = Variable(5)294 # v2 = Variable(5)295 # m = Model(v1 / 3 == v2)296 # s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)297 # s.solve()298 # self.assertTrue(s.is_sat())299 # self.assertEqual(v1.get_value() / 3, v2.get_value())300 def testAllDiff(self):301 vs = VarArray(5, 1, 5)302 m = Model(AllDiff(vs))303 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)304 s.solve()305 values = [v.get_value() for v in vs]306 self.assertItemsEqual(range(1, 6), values)307 def testMaximise(self):308 v1 = Variable(5)309 v2 = Variable(5)310 m = Model(v1 < v2, Maximise(v1))311 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)312 s.solve()313 self.assertTrue(s.is_sat())314 self.assertEqual(v1.get_value(), 3)315 self.assertEqual(v2.get_value(), 4)316 def testMinimise(self):317 v1 = Variable(5)318 v2 = Variable(5)319 m = Model(v1 < v2, Minimise(v2))320 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)321 s.solve()322 self.assertTrue(s.is_sat())323 self.assertEqual(v1.get_value(), 0)324 self.assertEqual(v2.get_value(), 1)325 def testFormula(self):326 a, b, c = VarArray(3)327 m = Model(((a == True) | (b == False)),328 ((b == True) | (a == False)),329 (c == True))330 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)331 s.solve()332 self.assertTrue(s.is_sat())333 self.assertTrue(c.get_value())334 self.assertEqual(a.get_value(), b.get_value())335 def testFormula2(self):336 a, b, c = VarArray(3)337 m = Model(((a == True) | (b == False)) &338 ((b == True) | (a == False)) &339 (c == True))340 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)341 s.solve()342 self.assertTrue(s.is_sat())343 self.assertTrue(c.get_value())344 self.assertEqual(a.get_value(), b.get_value())345 # FIXME Encoding of Table constraint is not supported yet.346 # def testTable(self):347 # v1, v2 = VarArray(2, 1, 3)348 # t = Table([v1, v2], [[1, 1], [2, 2], [3, 3]])349 # m = Model(t)350 # s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)351 # s.solve()352 # self.assertTrue(s.is_sat())353 # self.assertNotEqual(v1.get_value(), v2.get_value())354 def testMax(self):355 v1 = Variable(5)356 v2 = Variable(5)357 m = Model(Max([v1, v2]) < 2)358 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)359 s.solve()360 self.assertTrue(s.is_sat())361 self.assertLess(v1.get_value(), 2)362 self.assertLess(v2.get_value(), 2)363 def testMaxNegVars(self):364 v1 = Variable(-5, -1)365 v2 = Variable(-5, -1)366 m = Model(Max([v1, v2]) < -2)367 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)368 s.solve()369 self.assertTrue(s.is_sat())370 self.assertLess(v1.get_value(), -2)371 self.assertLess(v2.get_value(), -2)372 # def testLargeDomain(self):373 # v1 = Variable(1000)374 # v2 = Variable(1000)375 # m = Model(v1 == v2)376 # s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)377 # s.solve()378 # self.assertTrue(s.is_sat())379 # def testAllDiffLargeDomain(self):380 # vs = VarArray(100, 1, 1000)381 # m = Model(AllDiff(vs))382 # s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)383 # s.solve()384 # self.assertTrue(s.is_sat())385 # ---------------- Absolute ----------------386 def testAbsEq(self):387 v1 = Variable(-4, -1)388 m = Model(Abs(v1) == 2)389 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)390 s.solve()391 self.assertTrue(s.is_sat())392 self.assertEqual(v1.get_value(), -2)393 def testAbsGt(self):394 v1 = Variable(-4, -1)395 m = Model(Abs(v1) > 2)396 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)397 s.solve()398 self.assertTrue(s.is_sat())399 self.assertLess(v1.get_value(), -2)400 def testAbsLt(self):401 v1 = Variable(-4, -1)402 m = Model(Abs(v1) < 2)403 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)404 s.solve()405 self.assertTrue(s.is_sat())406 self.assertGreater(v1.get_value(), -2)407 # ---------------- Reification of inequalities ----------------408 # Note that some of these tests require that the order encoding of the domain409 # is enabled. For now, this will just take a copy and set the bit to true,410 # but should find a better solution for parameterized test-cases in future.411 def testReifEqTrue(self):412 v1 = Variable(5)413 v2 = Variable()414 m = Model(v2 == (v1 == 3), v2 == 1)415 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)416 s.solve()417 self.assertTrue(s.is_sat())418 self.assertEqual(v1.get_value(), 3)419 self.assertEqual(v2.get_value(), 1)420 def testReifEqFalse(self):421 v1 = Variable(5)422 v2 = Variable()423 m = Model(v2 == (v1 == 3), v2 == 0)424 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)425 s.solve()426 self.assertTrue(s.is_sat())427 self.assertNotEqual(v1.get_value(), 3)428 self.assertEqual(v2.get_value(), 0)429 def testReifNeTrue(self):430 v1 = Variable(5)431 v2 = Variable()432 m = Model(v2 == (v1 != 3), v2 == 1)433 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)434 s.solve()435 self.assertTrue(s.is_sat())436 self.assertNotEqual(v1.get_value(), 3)437 self.assertEqual(v2.get_value(), 1)438 def testReifNeFalse(self):439 v1 = Variable(5)440 v2 = Variable()441 m = Model(v2 == (v1 != 3), v2 == 0)442 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)443 s.solve()444 self.assertTrue(s.is_sat())445 self.assertEqual(v1.get_value(), 3)446 self.assertEqual(v2.get_value(), 0)447 def testReifLeTrue(self):448 v1 = Variable(5)449 v2 = Variable()450 m = Model(v2 == (v1 <= 3), v2 == 1)451 e = copy(SATEncodingTest.encoding)452 e.order = True453 s = SATEncodingTest.solver(m, encoding=e)454 s.solve()455 self.assertTrue(s.is_sat())456 self.assertLessEqual(v1.get_value(), 3)457 self.assertEqual(v2.get_value(), 1)458 def testReifLeFalse(self):459 v1 = Variable(5)460 v2 = Variable()461 m = Model(v2 == (v1 <= 3), v2 == 0)462 e = copy(SATEncodingTest.encoding)463 e.order = True464 s = SATEncodingTest.solver(m, encoding=e)465 s.solve()466 self.assertTrue(s.is_sat())467 self.assertGreater(v1.get_value(), 3)468 self.assertEqual(v2.get_value(), 0)469 def testReifLtTrue(self):470 v1 = Variable(5)471 v2 = Variable()472 m = Model(v2 == (v1 < 3), v2 == 1)473 e = copy(SATEncodingTest.encoding)474 e.order = True475 s = SATEncodingTest.solver(m, encoding=e)476 s.solve()477 self.assertTrue(s.is_sat())478 self.assertLess(v1.get_value(), 3)479 self.assertEqual(v2.get_value(), 1)480 def testReifLtFalse(self):481 v1 = Variable(5)482 v2 = Variable()483 m = Model(v2 == (v1 < 3), v2 == 0)484 e = copy(SATEncodingTest.encoding)485 e.order = True486 s = SATEncodingTest.solver(m, encoding=e)487 s.solve()488 self.assertTrue(s.is_sat())489 self.assertGreaterEqual(v1.get_value(), 3)490 self.assertEqual(v2.get_value(), 0)491 def testReifGeTrue(self):492 v1 = Variable(5)493 v2 = Variable()494 m = Model(v2 == (v1 >= 3), v2 == 1)495 e = copy(SATEncodingTest.encoding)496 e.order = True497 s = SATEncodingTest.solver(m, encoding=e)498 s.solve()499 self.assertTrue(s.is_sat())500 self.assertGreaterEqual(v1.get_value(), 3)501 self.assertEqual(v2.get_value(), 1)502 def testReifGeFalse(self):503 v1 = Variable(5)504 v2 = Variable()505 m = Model(v2 == (v1 >= 3), v2 == 0)506 e = copy(SATEncodingTest.encoding)507 e.order = True508 s = SATEncodingTest.solver(m, encoding=e)509 s.solve()510 self.assertTrue(s.is_sat())511 self.assertLess(v1.get_value(), 3)512 self.assertEqual(v2.get_value(), 0)513 def testReifGtTrue(self):514 v1 = Variable(5)515 v2 = Variable()516 m = Model(v2 == (v1 > 3), v2 == 1)517 e = copy(SATEncodingTest.encoding)518 e.order = True519 s = SATEncodingTest.solver(m, encoding=e)520 s.solve()521 self.assertTrue(s.is_sat())522 self.assertGreater(v1.get_value(), 3)523 self.assertEqual(v2.get_value(), 1)524 def testReifGtFalse(self):525 v1 = Variable(5)526 v2 = Variable()527 m = Model(v2 == (v1 > 3), v2 == 0)528 e = copy(SATEncodingTest.encoding)529 e.order = True530 s = SATEncodingTest.solver(m, encoding=e)531 s.solve()532 self.assertTrue(s.is_sat())533 self.assertLessEqual(v1.get_value(), 3)534 self.assertEqual(v2.get_value(), 0)535 # ---------------- Reification of inequalities between two expressions ----------------536 def testReif2ExprEqTrue(self):537 v1 = Variable(5)538 v2 = Variable()539 v3 = Variable(5)540 m = Model(v2 == (v1 == v3 + 2), v2 == 1)541 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)542 s.solve()543 self.assertTrue(s.is_sat())544 self.assertEqual(v1.get_value(), v3.get_value() + 2)545 self.assertEqual(v2.get_value(), 1)546 def testReif2ExprEqFalse(self):547 v1 = Variable(5)548 v2 = Variable()549 v3 = Variable(5)550 m = Model(v2 == (v1 == v3 + 2), v2 == 0)551 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)552 s.solve()553 self.assertTrue(s.is_sat())554 self.assertNotEqual(v1.get_value(), v3.get_value() + 2)555 self.assertEqual(v2.get_value(), 0)556 def testReif2ExprNeTrue(self):557 v1 = Variable(5)558 v2 = Variable()559 v3 = Variable(5)560 m = Model(v2 == (v1 != v3 + 2), v2 == 1)561 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)562 s.solve()563 self.assertTrue(s.is_sat())564 self.assertNotEqual(v1.get_value(), v3.get_value() + 2)565 self.assertEqual(v2.get_value(), 1)566 def testReif2ExprNeFalse(self):567 v1 = Variable(5)568 v2 = Variable()569 v3 = Variable(5)570 m = Model(v2 == (v1 != v3 + 2), v2 == 0)571 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)572 s.solve()573 self.assertTrue(s.is_sat())574 self.assertEqual(v1.get_value(), v3.get_value() + 2)575 self.assertEqual(v2.get_value(), 0)576 def testReif2ExprLeTrue(self):577 v1 = Variable(5)578 v2 = Variable()579 v3 = Variable(5)580 m = Model(v2 == (v1 <= v3 + 2), v2 == 1)581 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)582 s.solve()583 self.assertTrue(s.is_sat())584 self.assertLessEqual(v1.get_value(), v3.get_value() + 2)585 self.assertEqual(v2.get_value(), 1)586 def testReif2ExprLeFalse(self):587 v1 = Variable(5)588 v2 = Variable()589 v3 = Variable(5)590 m = Model(v2 == (v1 <= v3 + 2), v2 == 0)591 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)592 s.solve()593 self.assertTrue(s.is_sat())594 self.assertGreater(v1.get_value(), v3.get_value() + 2)595 self.assertEqual(v2.get_value(), 0)596 def testReif2ExprLtTrue(self):597 v1 = Variable(5)598 v2 = Variable()599 v3 = Variable(5)600 m = Model(v2 == (v1 < v3 + 2), v2 == 1)601 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)602 s.solve()603 self.assertTrue(s.is_sat())604 self.assertLess(v1.get_value(), v3.get_value() + 2)605 self.assertEqual(v2.get_value(), 1)606 def testReif2ExprLtFalse(self):607 v1 = Variable(5)608 v2 = Variable()609 v3 = Variable(5)610 m = Model(v2 == (v1 < v3 + 2), v2 == 0)611 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)612 s.solve()613 self.assertTrue(s.is_sat())614 self.assertGreaterEqual(v1.get_value(), v3.get_value() + 2)615 self.assertEqual(v2.get_value(), 0)616 def testReif2ExprGeTrue(self):617 v1 = Variable(5)618 v2 = Variable()619 v3 = Variable(5)620 m = Model(v2 == (v1 >= v3 + 2), v2 == 1)621 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)622 s.solve()623 self.assertTrue(s.is_sat())624 self.assertGreaterEqual(v1.get_value(), v3.get_value() + 2)625 self.assertEqual(v2.get_value(), 1)626 def testReif2ExprGeFalse(self):627 v1 = Variable(5)628 v2 = Variable()629 v3 = Variable(5)630 m = Model(v2 == (v1 >= v3 + 2), v2 == 0)631 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)632 s.solve()633 self.assertTrue(s.is_sat())634 self.assertLess(v1.get_value(), v3.get_value() + 2)635 self.assertEqual(v2.get_value(), 0)636 def testReif2ExprGtTrue(self):637 v1 = Variable(5)638 v2 = Variable()639 v3 = Variable(5)640 m = Model(v2 == (v1 > v3 + 2), v2 == 1)641 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)642 s.solve()643 self.assertTrue(s.is_sat())644 self.assertGreater(v1.get_value(), v3.get_value() + 2)645 self.assertEqual(v2.get_value(), 1)646 def testReif2ExprGtFalse(self):647 v1 = Variable(5)648 v2 = Variable()649 v3 = Variable(5)650 m = Model(v2 == (v1 > v3 + 2), v2 == 0)651 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)652 s.solve()653 self.assertTrue(s.is_sat())654 self.assertLessEqual(v1.get_value(), v3.get_value() + 2)655 self.assertEqual(v2.get_value(), 0)656 def testReif2ExprMul(self):657 v1 = Variable(5)658 v2 = Variable()659 v3 = Variable(1, 5)660 v4 = Variable(1, 5)661 m = Model(v2 == (v1 == v3 * v4), v2 == 1)662 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)663 s.solve()664 self.assertTrue(s.is_sat())665 self.assertEqual(v1.get_value(), v3.get_value() * v4.get_value())666 self.assertEqual(v2.get_value(), 1)667 def testReif2ExprFactorDomain(self):668 v1 = Variable(5)669 v2 = Variable()670 v3 = Variable(5)671 m = Model(v2 == (v1 == v3 * 2), v2 == 1)672 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)673 s.solve()674 self.assertTrue(s.is_sat())675 self.assertEqual(v1.get_value(), v3.get_value() * 2)676 self.assertEqual(v2.get_value(), 1)677 # ---------------- Modulus ----------------678 def testModConstant(self):679 v1 = Variable(10)680 v2 = Variable(5)681 m = Model(v2 == (v1 % 3), v1 == 4)682 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)683 s.solve()684 self.assertTrue(s.is_sat())685 self.assertEqual(v1.get_value(), 4)686 self.assertEqual(v2.get_value(), 1)687 def testModNegConstant(self):688 v1 = Variable(-5, 5)689 v2 = Variable(-5, 5)690 m = Model(v2 == (v1 % 3), v1 == -5)691 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)692 s.solve()693 self.assertTrue(s.is_sat())694 self.assertEqual(v1.get_value(), -5)695 self.assertEqual(v2.get_value(), -2)696 def testModTwoVars(self):697 v1 = Variable(-10, 10)698 v2 = Variable(-5, 5)699 v3 = Variable(-5, 5)700 m = Model(v3 == (v1 % v2), v1 == -5, v2 == 3)701 s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)702 s.solve()703 self.assertTrue(s.is_sat())704 self.assertEqual(v1.get_value(), -5)705 self.assertEqual(v2.get_value(), 3)...
parse_patient_files_full.py
Source:parse_patient_files_full.py
...10class ParsePatientFiles:11 def __init__(self):12 # Open a file13 base_dir = os.path.dirname(os.path.abspath(__file__))14 def get_value(self, sheet, row, col, type='', nullable=True):15 val = sheet.cell_value(row, col)16 if val == '':17 if nullable:18 return None19 elif type == 'convert_x_to_boolean':20 return False21 else:22 print('val: ', val, ' | type:', type, ' | nullable:', nullable)23 print('ERROR')24 else:25 if type == int:26 return int(val)27 elif type == 'convert_float_to_date':28 return self.convert_float_to_date(val)29 elif type == 'convert_x_to_boolean':30 return self.convert_x_to_boolean(val)31 else:32 return val33 def convert_float_to_date(self, date):34 if date != '':35 serial = date36 seconds = (serial - 25569) * 86400.037 date = datetime.datetime.utcfromtimestamp(seconds)38 return date.strftime('%m/%d/%y')39 def convert_x_to_boolean(self, x):40 if x == 'X':41 return True42 else: 43 return False44 def get_patient(self, sheet):45 patient = {}46 patient['patient_id'] = self.get_value(sheet,2,4,int,False)47 patient['first_name'] = self.get_value(sheet,2,1,'',False)48 patient['last_name'] = self.get_value(sheet,2,2,'',False)49 patient['gender'] = self.get_value(sheet,2,10,'',False)50 patient['age'] = self.get_value(sheet,2,7,int,False)51 patient['relationship'] = self.get_value(sheet,4,4)52 patient['first_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')53 # print(OrderedDict(sorted(patient.items(), key=lambda t: t[0])))54 return patient55 def get_visit(self, sheet, patient):56 visit = {}57 visit['patient_id'] = int(patient['patient_id'])58 visit['visit_date'] = self.get_value(sheet,0,1,'convert_float_to_date',False)59 visit['next_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')60 visit['pcp_name'] = self.get_value(sheet,4,1) + ' ' + self.get_value(sheet,4,2)61 visit['case_status'] = self.get_value(sheet,6,1)62 visit['health_risk_assessment'] = self.get_value(sheet,6,4)63 visit['biometrics'] = self.get_value(sheet,6,7)64 visit['coach_initials'] = self.get_value(sheet,6,10)65 visit['comments'] = self.get_value(sheet,8,1)66 # print(OrderedDict(sorted(visit.items(), key=lambda t: t[0])))67 return visit68 def get_diagnosis(self, sheet, patient, visit):69 diagnosis = {}70 diagnosis['patient_id'] = int(patient['patient_id'])71 diagnosis['visit_date'] = visit['visit_date']72 diagnosis['case_status'] = visit['case_status']73 diagnosis['overweight'] = self.get_value(sheet,12,0,'convert_x_to_boolean',False)74 diagnosis['obese'] = self.get_value(sheet,13,0,'convert_x_to_boolean',False)75 diagnosis['hypertension'] = self.get_value(sheet,14,0,'convert_x_to_boolean',False)76 diagnosis['cad'] = self.get_value(sheet,15,0,'convert_x_to_boolean',False)77 diagnosis['chf'] = self.get_value(sheet,16,0,'convert_x_to_boolean',False)78 diagnosis['hyperlipidemia'] = self.get_value(sheet,17,0,'convert_x_to_boolean',False)79 diagnosis['prediabetes'] = self.get_value(sheet,18,0,'convert_x_to_boolean',False)80 diagnosis['diabetes'] = self.get_value(sheet,19,0,'convert_x_to_boolean',False)81 diagnosis['asthma'] = self.get_value(sheet,20,0,'convert_x_to_boolean',False)82 diagnosis['copd'] = self.get_value(sheet,21,0,'convert_x_to_boolean',False)83 diagnosis['depression'] = self.get_value(sheet,22,0,'convert_x_to_boolean',False)84 diagnosis['nicotine_use'] = self.get_value(sheet,23,0,'convert_x_to_boolean',False)85 # print(OrderedDict(sorted(diagnosis.items(), key=lambda t: t[0])))86 return diagnosis87 def get_weight_management(self, sheet, patient, visit):88 weight_management = {}89 weight_management['patient_id'] = int(patient['patient_id'])90 weight_management['visit_date'] = visit['visit_date']91 weight_management['height_measured'] = self.get_value(sheet,12,4,int)92 weight_management['height_baseline'] = self.get_value(sheet,12,5,int)93 weight_management['waist_measured'] = self.get_value(sheet,13,4,int)94 weight_management['waist_baseline'] = self.get_value(sheet,13,5,int)95 weight_management['waist_progress'] = self.get_value(sheet,13,6,int)96 weight_management['waist_goal'] = self.get_value(sheet,13,7)97 weight_management['waist_date_goal_achieved'] = self.get_value(sheet,13,8,'convert_float_to_date')98 weight_management['waist_progress_notes'] = self.get_value(sheet,13,9)99 weight_management['weight_measured'] = self.get_value(sheet,14,4,int)100 weight_management['weight_baseline'] = self.get_value(sheet,14,5,int)101 weight_management['weight_progress'] = self.get_value(sheet,14,6,int)102 weight_management['weight_goal'] = self.get_value(sheet,14,7)103 weight_management['weight_date_goal_achieved'] = self.get_value(sheet,14,8,'convert_float_to_date')104 weight_management['weight_progress_notes'] = self.get_value(sheet,4,9)105 weight_management['bmi_measured'] = self.get_value(sheet,15,4,int)106 weight_management['bmi_baseline'] = self.get_value(sheet,15,5,int)107 weight_management['bmi_progress'] = self.get_value(sheet,15,6,int)108 weight_management['bmi_date_goal_achieved'] = self.get_value(sheet,15,8,'convert_float_to_date')109 weight_management['bmi_progress_notes'] = self.get_value(sheet,15,9)110 # print(OrderedDict(sorted(weight_management.items(), key=lambda t: t[0])))111 return weight_management112 def get_blood_pressure_control(self, sheet, patient, visit):113 blood_pressure_control = {}114 blood_pressure_control['patient_id'] = int(patient['patient_id'])115 blood_pressure_control['visit_date'] = visit['visit_date']116 blood_pressure_control['systolic_measured'] = self.get_value(sheet,16,4,int)117 blood_pressure_control['systolic_baseline'] = self.get_value(sheet,16,5,int)118 blood_pressure_control['systolic_progress'] = self.get_value(sheet,16,6,int)119 blood_pressure_control['systolic_goal'] = self.get_value(sheet,16,7,int)120 blood_pressure_control['systolic_date_goal_achieved'] = self.get_value(sheet,16,8,'convert_float_to_date')121 blood_pressure_control['systolic_progress_notes'] = self.get_value(sheet,16,9)122 blood_pressure_control['diastolic_measured'] = self.get_value(sheet,17,4,int)123 blood_pressure_control['diastolic_baseline'] = self.get_value(sheet,17,5,int)124 blood_pressure_control['diastolic_progress'] = self.get_value(sheet,17,6,int)125 blood_pressure_control['diastolic_goal'] = self.get_value(sheet,17,7,int)126 blood_pressure_control['diastolic_date_goal_achieved'] = self.get_value(sheet,17,8,'convert_float_to_date')127 blood_pressure_control['diastolic_progress_notes'] = self.get_value(sheet,17,9)128 # print(OrderedDict(sorted(blood_pressure_control.items(), key=lambda t: t[0])))129 return blood_pressure_control130 def get_lipid_management(self, sheet, patient, visit):131 lipid_management = {}132 lipid_management['patient_id'] = int(patient['patient_id'])133 lipid_management['visit_date'] = visit['visit_date']134 lipid_management['tc_measured'] = self.get_value(sheet,18,4,int)135 lipid_management['tc_baseline'] = self.get_value(sheet,18,5,int)136 lipid_management['tc_progress'] = self.get_value(sheet,18,6,int)137 lipid_management['tc_date_goal_achieved'] = self.get_value(sheet,18,8,'convert_float_to_date')138 lipid_management['tc_progress_notes'] = self.get_value(sheet,18,9)139 lipid_management['ldl_measured'] = self.get_value(sheet,19,4,int)140 lipid_management['ldl_baseline'] = self.get_value(sheet,19,5,int)141 lipid_management['ldl_progress'] = self.get_value(sheet,19,6,int)142 lipid_management['ldl_goal'] = self.get_value(sheet,19,7,int)143 lipid_management['ldl_date_goal_achieved'] = self.get_value(sheet,19,8,'convert_float_to_date')144 lipid_management['ldl_progress_notes'] = self.get_value(sheet,19,9)145 lipid_management['hdl_measured'] = self.get_value(sheet,20,4,int)146 lipid_management['hdl_baseline'] = self.get_value(sheet,20,5,int)147 lipid_management['hdl_progress'] = self.get_value(sheet,20,6,int)148 lipid_management['hdl_goal'] = self.get_value(sheet,20,7,int)149 lipid_management['hdl_date_goal_achieved'] = self.get_value(sheet,20,8,'convert_float_to_date')150 lipid_management['hdl_progress_notes'] = self.get_value(sheet,20,9)151 lipid_management['tgs_measured'] = self.get_value(sheet,21,4,int)152 lipid_management['tgs_baseline'] = self.get_value(sheet,21,5,int)153 lipid_management['tgs_progress'] = self.get_value(sheet,21,6,int)154 lipid_management['tgs_date_goal_achieved'] = self.get_value(sheet,21,8,'convert_float_to_date')155 lipid_management['tgs_progress_notes'] = self.get_value(sheet,21,9)156 # print(OrderedDict(sorted(lipid_management.items(), key=lambda t: t[0])))157 return lipid_management158 def diabetes(self, sheet, patient, visit):159 diabetes = {}160 diabetes['patient_id'] = int(patient['patient_id'])161 diabetes['visit_date'] = visit['visit_date']162 diabetes['fbg_measured'] = self.get_value(sheet,22,4,int)163 diabetes['fbg_baseline'] = self.get_value(sheet,22,5,int)164 diabetes['fbg_progress'] = self.get_value(sheet,22,6,int)165 diabetes['fbg_date_goal_achieved'] = self.get_value(sheet,22,8,'convert_float_to_date')166 diabetes['fbg_progress_notes'] = self.get_value(sheet,22,9)167 diabetes['hb_measured'] = self.get_value(sheet,23,4,int)168 diabetes['hb_baseline'] = self.get_value(sheet,23,5,int)169 diabetes['hb_progress'] = self.get_value(sheet,23,6,int)170 diabetes['hb_date_goal_achieved'] = self.get_value(sheet,23,8,'convert_float_to_date')171 diabetes['hb_progress_notes'] = self.get_value(sheet,23,9)172 diabetes['retinal_measured'] = self.get_value(sheet,24,4)173 diabetes['retinal_baseline'] = self.get_value(sheet,24,5)174 diabetes['retinal_progress'] = self.get_value(sheet,24,6,int)175 diabetes['retinal_goal'] = self.get_value(sheet,24,7)176 diabetes['retinal_date_goal_achieved'] = self.get_value(sheet,24,8,'convert_float_to_date')177 diabetes['retinal_progress_notes'] = self.get_value(sheet,24,9)178 diabetes['renal_measured'] = self.get_value(sheet,25,4)179 diabetes['renal_baseline'] = self.get_value(sheet,25,5)180 diabetes['renal_progress'] = self.get_value(sheet,25,6,int)181 diabetes['renal_goal'] = self.get_value(sheet,25,7)182 diabetes['renal_date_goal_achieved'] = self.get_value(sheet,25,8,'convert_float_to_date')183 diabetes['renal_progress_notes'] = self.get_value(sheet,25,9)184 diabetes['foot_measured'] = self.get_value(sheet,26,4)185 diabetes['foot_baseline'] = self.get_value(sheet,26,5)186 diabetes['foot_progress'] = self.get_value(sheet,26,6,int)187 diabetes['foot_goal'] = self.get_value(sheet,26,7)188 diabetes['foot_date_goal_achieved'] = self.get_value(sheet,26,8,'convert_float_to_date')189 diabetes['foot_progress_notes'] = self.get_value(sheet,26,9)190 # print(OrderedDict(sorted(diabetes.items(), key=lambda t: t[0])))191 return diabetes192 def get_compliance(self, sheet, patient, visit):193 compliance = {}194 compliance['patient_id'] = int(patient['patient_id'])195 compliance['visit_date'] = visit['visit_date']196 compliance['meds_measured'] = self.get_value(sheet,27,4)197 compliance['meds_baseline'] = self.get_value(sheet,27,5)198 compliance['meds_progress'] = self.get_value(sheet,27,6,int)199 compliance['meds_date_goal_achieved'] = self.get_value(sheet,27,8,'convert_float_to_date')200 compliance['meds_progress_notes'] = self.get_value(sheet,27,9)201 compliance['diet_measured'] = self.get_value(sheet,28,4)202 compliance['diet_baseline'] = self.get_value(sheet,28,5)203 compliance['diet_progress'] = self.get_value(sheet,28,6,int)204 compliance['diet_date_goal_achieved'] = self.get_value(sheet,28,8,'convert_float_to_date')205 compliance['diet_progress_notes'] = self.get_value(sheet,28,9)206 compliance['exercise_measured'] = self.get_value(sheet,29,4)207 compliance['exercise_baseline'] = self.get_value(sheet,29,5)208 compliance['exercise_progress'] = self.get_value(sheet,29,6,int)209 compliance['exercise_date_goal_achieved'] = self.get_value(sheet,29,8,'convert_float_to_date')210 compliance['exercise_progress_notes'] = self.get_value(sheet,29,9)211 compliance['nicotine_measured'] = self.get_value(sheet,30,4)212 compliance['nicotine_baseline'] = self.get_value(sheet,30,5)213 compliance['nicotine_progress'] = self.get_value(sheet,30,6,int)214 compliance['nicotine_goal'] = self.get_value(sheet,30,7)215 compliance['nicotine_date_goal_achieved'] = self.get_value(sheet,30,8,'convert_float_to_date')216 compliance['nicotine_progress_notes'] = self.get_value(sheet,30,9)217 # print(OrderedDict(sorted(compliance.items(), key=lambda t: t[0])))218 return compliance219 220########################################################################################################################221 # initialize database and update/insert data222 def process_files(self, patientFiles_path):223 db = CreateDB('patients.db','patients_schema.sql')224 path = os.path.join(patientFiles_path, '*.xlsm')225 for fname in glob.glob(path):226 print("processing ", os.path.basename(fname), "...")227 workbook = xlrd.open_workbook(fname)228 worksheets = workbook.sheet_names()229 for worksheet_name in worksheets:230 print("worksheet: ", worksheet_name)...
parsePatientFiles_old.py
Source:parsePatientFiles_old.py
...12 # Open a file13 base_dir = os.path.dirname(os.path.abspath(__file__))14 # path = "/Users/alyssafreeman/Documents/git/kensa_python/carePlanDashboard/patientFiles/*.xlsm"15 # dirs = os.listdir(path)16 def get_value(self, sheet, row, col, type='', nullable=True):17 val = sheet.cell_value(row, col)18 if val == '':19 if nullable:20 return None21 else:22 print('val: ', val, ' | type:', type, ' | nullable:', nullable)23 print('ERROR')24 else:25 if type == int:26 return int(val)27 elif type == 'convert_float_to_date':28 return self.convert_float_to_date(val)29 elif type == 'convert_x_to_boolean':30 return self.convert_x_to_boolean31 else:32 return val33 def convert_float_to_date(self, date):34 if date != '':35 serial = date36 seconds = (serial - 25569) * 86400.037 date = datetime.datetime.utcfromtimestamp(seconds)38 return date.strftime('%m/%d/%y')39 def convert_x_to_boolean(self, x):40 if x == 'X':41 return True42 else: 43 return False44 def get_patient(self, sheet):45 patient = {}46 patient['patient_id'] = self.get_value(sheet,2,4,int,False)47 patient['first_name'] = self.get_value(sheet,2,1,'',False)48 patient['last_name'] = self.get_value(sheet,2,2,'',False)49 patient['gender'] = self.get_value(sheet,2,10,'',False)50 patient['age'] = self.get_value(sheet,2,7,int,False)51 patient['relationship'] = self.get_value(sheet,4,4)52 patient['first_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')53 db.insert_data('patient',patient)54 # print(OrderedDict(sorted(patient.items(), key=lambda t: t[0])))55 return sheet56 def get_visit(self, sheet):57 visit = {}58 visit['visit_date'] = self.get_value(sheet,0,1,'convert_float_to_date',False)59 visit['next_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')60 visit['pcp_name'] = self.get_value(sheet,4,1) + ' ' + self.get_value(sheet,4,2)61 visit['case_status'] = self.get_value(sheet,6,1)62 visit['health_risk_assessment'] = self.get_value(sheet,6,4)63 visit['biometrics'] = self.get_value(sheet,6,7)64 visit['coach_initials'] = self.get_value(sheet,6,10)65 visit['comments'] = self.get_value(sheet,8,1)66 # print(OrderedDict(sorted(visit.items(), key=lambda t: t[0])))67 return sheet68 def get_diagnosis(self, sheet):69 diagnosis = {}70 diagnosis['overweight'] = self.get_value(sheet,12,0,'convert_x_to_boolean')71 diagnosis['obese'] = self.get_value(sheet,13,0,'convert_x_to_boolean')72 diagnosis['hypertension'] = self.get_value(sheet,14,0,'convert_x_to_boolean')73 diagnosis['cad'] = self.get_value(sheet,15,0,'convert_x_to_boolean')74 diagnosis['chf'] = self.get_value(sheet,16,0,'convert_x_to_boolean')75 diagnosis['hyperlipidemia'] = self.get_value(sheet,17,0,'convert_x_to_boolean')76 diagnosis['prediabetes'] = self.get_value(sheet,18,0,'convert_x_to_boolean')77 diagnosis['diabetes'] = self.get_value(sheet,19,0,'convert_x_to_boolean')78 diagnosis['asthma'] = self.get_value(sheet,20,0,'convert_x_to_boolean')79 diagnosis['copd'] = self.get_value(sheet,21,0,'convert_x_to_boolean')80 diagnosis['depression'] = self.get_value(sheet,22,0,'convert_x_to_boolean')81 diagnosis['nicotine_use'] = self.get_value(sheet,23,0,'convert_x_to_boolean')82 # print(OrderedDict(sorted(diagnosis.items(), key=lambda t: t[0])))83 return sheet84 def get_weight_management(self, sheet):85 weight_management = {}86 weight_management['height_measured'] = self.get_value(sheet,12,4,int)87 weight_management['height_baseline'] = self.get_value(sheet,12,5,int)88 weight_management['waist_measured'] = self.get_value(sheet,13,4,int)89 weight_management['waist_baseline'] = self.get_value(sheet,13,5,int)90 weight_management['waist_progress'] = self.get_value(sheet,13,6,int)91 weight_management['waist_goal'] = self.get_value(sheet,13,7)92 weight_management['waist_date_goal_achieved'] = self.get_value(sheet,13,8,'convert_float_to_date')93 weight_management['waist_progress_notes'] = self.get_value(sheet,13,9)94 weight_management['weight_measured'] = self.get_value(sheet,14,4,int)95 weight_management['weight_baseline'] = self.get_value(sheet,14,5,int)96 weight_management['weight_progress'] = self.get_value(sheet,14,6,int)97 weight_management['weight_goal'] = self.get_value(sheet,14,7)98 weight_management['weight_date_goal_achieved'] = self.get_value(sheet,14,8,'convert_float_to_date')99 weight_management['weight_progress_notes'] = self.get_value(sheet,4,9)100 weight_management['bmi_measured'] = self.get_value(sheet,15,4,int)101 weight_management['bmi_baseline'] = self.get_value(sheet,15,5,int)102 weight_management['bmi_progress'] = self.get_value(sheet,15,6,int)103 weight_management['bmi_date_goal_achieved'] = self.get_value(sheet,15,8,'convert_float_to_date')104 weight_management['bmi_progress_notes'] = self.get_value(sheet,15,9)105 # print(OrderedDict(sorted(weight_management.items(), key=lambda t: t[0])))106 return sheet107 def get_blood_pressure_control(self, sheet):108 blood_pressure_control = {}109 blood_pressure_control['systolic_measured'] = self.get_value(sheet,16,4,int)110 blood_pressure_control['systolic_baseline'] = self.get_value(sheet,16,5,int)111 blood_pressure_control['systolic_progress'] = self.get_value(sheet,16,6,int)112 blood_pressure_control['systolic_goal'] = self.get_value(sheet,16,7,int)113 blood_pressure_control['systolic_date_goal_achieved'] = self.get_value(sheet,16,8,'convert_float_to_date')114 blood_pressure_control['systolic_progress_notes'] = self.get_value(sheet,16,9)115 blood_pressure_control['diastolic_measured'] = self.get_value(sheet,17,4,int)116 blood_pressure_control['diastolic_baseline'] = self.get_value(sheet,17,5,int)117 blood_pressure_control['diastolic_progress'] = self.get_value(sheet,17,6,int)118 blood_pressure_control['diastolic_goal'] = self.get_value(sheet,17,7,int)119 blood_pressure_control['diastolic_date_goal_achieved'] = self.get_value(sheet,17,8,'convert_float_to_date')120 blood_pressure_control['diastolic_progress_notes'] = self.get_value(sheet,17,9)121 # print(OrderedDict(sorted(blood_pressure_control.items(), key=lambda t: t[0])))122 return sheet123 def get_lipid_management(self, sheet):124 lipid_management = {}125 lipid_management['tc_measured'] = self.get_value(sheet,18,4,int)126 lipid_management['tc_baseline'] = self.get_value(sheet,18,5,int)127 lipid_management['tc_progress'] = self.get_value(sheet,18,6,int)128 lipid_management['tc_date_goal_achieved'] = self.get_value(sheet,18,8,'convert_float_to_date')129 lipid_management['tc_progress_notes'] = self.get_value(sheet,18,9)130 lipid_management['ldl_measured'] = self.get_value(sheet,19,4,int)131 lipid_management['ldl_baseline'] = self.get_value(sheet,19,5,int)132 lipid_management['ldl_progress'] = self.get_value(sheet,19,6,int)133 lipid_management['ldl_goal'] = self.get_value(sheet,19,7,int)134 lipid_management['ldl_date_goal_achieved'] = self.get_value(sheet,19,8,'convert_float_to_date')135 lipid_management['ldl_progress_notes'] = self.get_value(sheet,19,9)136 lipid_management['hdl_measured'] = self.get_value(sheet,20,4,int)137 lipid_management['hdl_baseline'] = self.get_value(sheet,20,5,int)138 lipid_management['hdl_progress'] = self.get_value(sheet,20,6,int)139 lipid_management['hdl_goal'] = self.get_value(sheet,20,7,int)140 lipid_management['hdl_date_goal_achieved'] = self.get_value(sheet,20,8,'convert_float_to_date')141 lipid_management['hdl_progress_notes'] = self.get_value(sheet,20,9)142 lipid_management['tgs_measured'] = self.get_value(sheet,21,4,int)143 lipid_management['tgs_baseline'] = self.get_value(sheet,21,5,int)144 lipid_management['tgs_progress'] = self.get_value(sheet,21,6,int)145 lipid_management['tgs_date_goal_achieved'] = self.get_value(sheet,21,8,'convert_float_to_date')146 lipid_management['tgs_progress_notes'] = self.get_value(sheet,21,9)147 # print(OrderedDict(sorted(lipid_management.items(), key=lambda t: t[0])))148 return sheet149 def get_fbg_measured(self, sheet):150 fbg_measured = {}151 fbg_measured['fbg_measured'] = self.get_value(sheet,22,4,int)152 fbg_measured['fbg_baseline'] = self.get_value(sheet,22,5,int)153 fbg_measured['fbg_progress'] = self.get_value(sheet,22,6,int)154 fbg_measured['fbg_date_goal_achieved'] = self.get_value(sheet,22,8,'convert_float_to_date')155 fbg_measured['fbg_progress_notes'] = self.get_value(sheet,22,9)156 fbg_measured['hb_measured'] = self.get_value(sheet,23,4,int)157 fbg_measured['hb_baseline'] = self.get_value(sheet,23,5,int)158 fbg_measured['hb_progress'] = self.get_value(sheet,23,6,int)159 fbg_measured['hb_date_goal_achieved'] = self.get_value(sheet,23,8,'convert_float_to_date')160 fbg_measured['hb_progress_notes'] = self.get_value(sheet,23,9)161 fbg_measured['retinal_measured'] = self.get_value(sheet,24,4)162 fbg_measured['retinal_baseline'] = self.get_value(sheet,24,5)163 fbg_measured['retinal_progress'] = self.get_value(sheet,24,6,int)164 fbg_measured['retinal_goal'] = self.get_value(sheet,24,7)165 fbg_measured['retinal_date_goal_achieved'] = self.get_value(sheet,24,8,'convert_float_to_date')166 fbg_measured['retinal_progress_notes'] = self.get_value(sheet,24,9)167 fbg_measured['renal_measured'] = self.get_value(sheet,25,4)168 fbg_measured['renal_baseline'] = self.get_value(sheet,25,5)169 fbg_measured['renal_progress'] = self.get_value(sheet,25,6,int)170 fbg_measured['renal_goal'] = self.get_value(sheet,25,7)171 fbg_measured['renal_date_goal_achieved'] = self.get_value(sheet,25,8,'convert_float_to_date')172 fbg_measured['renal_progress_notes'] = self.get_value(sheet,25,9)173 fbg_measured['foot_measured'] = self.get_value(sheet,26,4)174 fbg_measured['foot_baseline'] = self.get_value(sheet,26,5)175 fbg_measured['foot_progress'] = self.get_value(sheet,26,6,int)176 fbg_measured['foot_goal'] = self.get_value(sheet,26,7)177 fbg_measured['foot_date_goal_achieved'] = self.get_value(sheet,26,8,'convert_float_to_date')178 fbg_measured['foot_progress_notes'] = self.get_value(sheet,26,9)179 # print(OrderedDict(sorted(fbg_measured.items(), key=lambda t: t[0])))180 return sheet181 def get_compliance(self, sheet):182 compliance = {}183 compliance['meds_measured'] = self.get_value(sheet,27,4)184 compliance['meds_baseline'] = self.get_value(sheet,27,5)185 compliance['meds_progress'] = self.get_value(sheet,27,6,int)186 compliance['meds_date_goal_achieved'] = self.get_value(sheet,27,8,'convert_float_to_date')187 compliance['meds_progress_notes'] = self.get_value(sheet,27,9)188 compliance['diet_measured'] = self.get_value(sheet,28,4)189 compliance['diet_baseline'] = self.get_value(sheet,28,5)190 compliance['diet_progress'] = self.get_value(sheet,28,6,int)191 compliance['diet_date_goal_achieved'] = self.get_value(sheet,28,8,'convert_float_to_date')192 compliance['diet_progress_notes'] = self.get_value(sheet,28,9)193 compliance['exercise_measured'] = self.get_value(sheet,29,4)194 compliance['exercise_baseline'] = self.get_value(sheet,29,5)195 compliance['exercise_progress'] = self.get_value(sheet,29,6,int)196 compliance['exercise_date_goal_achieved'] = self.get_value(sheet,29,8,'convert_float_to_date')197 compliance['exercise_progress_notes'] = self.get_value(sheet,29,9)198 compliance['nicotine_measured'] = self.get_value(sheet,30,4)199 compliance['nicotine_baseline'] = self.get_value(sheet,30,5)200 compliance['nicotine_progress'] = self.get_value(sheet,30,6,int)201 compliance['nicotine_goal'] = self.get_value(sheet,30,7)202 compliance['nicotine_date_goal_achieved'] = self.get_value(sheet,30,8,'convert_float_to_date')203 compliance['nicotine_progress_notes'] = self.get_value(sheet,30,9)204 # print(OrderedDict(sorted(compliance.items(), key=lambda t: t[0])))205 return sheet206 207########################################################################################################################208# initialize database and update/insert data209db = DBInit('patient.db','patients_schema.sql')210pf = ParsePatientFiles()211# path = "/Users/alyssafreeman/Documents/git/kensa_python/carePlanDashboard/patientFiles/*.xlsm"212# path = os.path.join(db.base_dir, "/patientFiles/*.xlsm")213path = db.base_dir + "/patientFiles/*.xlsm"214for fname in glob.glob(path):215 print("processing ", os.path.basename(fname), "...")216 workbook = xlrd.open_workbook(fname)217 worksheets = workbook.sheet_names()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!