Best Python code snippet using selene_python
network_ShillInitScripts.py
Source:network_ShillInitScripts.py
...125 """ Restart vital system services at the end of the test. """126 utils.system('start shill', ignore_status=True)127 if self.recover_duts_stopped:128 utils.system('start recover_duts', ignore_status=True)129 def assure(self, must_be_true, assertion_name):130 """ Perform a named assertion.131 @param must_be_true boolean parameter that must be true.132 @param assertion_name string name of this assertion.133 """134 if not must_be_true:135 raise error.TestFail('%s: Assertion failed: %s' %136 (self.test_name, assertion_name))137 def assure_path_owner(self, path, owner):138 """ Assert that |path| is owned by |owner|.139 @param path string pathname to test.140 @param owner string user name that should own |path|.141 """142 self.assure(pwd.getpwuid(os.stat(path).st_uid)[0] == owner,143 'Path %s is owned by %s' % (path, owner))144 def assure_path_group(self, path, group):145 """ Assert that |path| is owned by |group|.146 @param path string pathname to test.147 @param group string group name that should own |path|.148 """149 self.assure(grp.getgrgid(os.stat(path).st_gid)[0] == group,150 'Path %s is group-owned by %s' % (path, group))151 def assure_exists(self, path, path_friendly_name):152 """ Assert that |path| exists.153 @param path string pathname to test.154 @param path_friendly_name string user-parsable description of |path|.155 """156 self.assure(os.path.exists(path), '%s exists' % path_friendly_name)157 def assure_is_dir(self, path, path_friendly_name):158 """ Assert that |path| is a directory.159 @param path string pathname to test.160 @param path_friendly_name string user-parsable description of |path|.161 """162 self.assure_exists(path, path_friendly_name)163 self.assure(stat.S_ISDIR(os.lstat(path).st_mode),164 '%s is a directory' % path_friendly_name)165 def assure_is_link(self, path, path_friendly_name):166 """ Assert that |path| is a symbolic link.167 @param path string pathname to test.168 @param path_friendly_name string user-parsable description of 
|path|.169 """170 self.assure_exists(path, path_friendly_name)171 self.assure(stat.S_ISLNK(os.lstat(path).st_mode),172 '%s is a symbolic link' % path_friendly_name)173 def assure_is_link_to(self, path, pointee, path_friendly_name):174 """ Assert that |path| is a symbolic link to |pointee|.175 @param path string pathname to test.176 @param pointee string pathname that |path| should point to.177 @param path_friendly_name string user-parsable description of |path|.178 """179 self.assure_is_link(path, path_friendly_name)180 self.assure(os.readlink(path) == pointee,181 '%s is a symbolic link to %s' %182 (path_friendly_name, pointee))183 def assure_method_calls(self, expected_method_calls, assertion_name):184 """ Assert that |expected_method_calls| were executed on mock_flimflam.185 @param expected_method_calls list of string-tuple pairs of method186 name + tuple of arguments.187 @param assertion_name string name to assign to the assertion.188 """189 method_calls = self.mock_flimflam.get_method_calls()190 if len(expected_method_calls) != len(method_calls):191 self.assure(False, '%s: method call count does not match' %192 assertion_name)193 for expected, actual in zip(expected_method_calls, method_calls):194 self.assure(actual.method == expected[0],195 '%s: method %s matches expected %s' %196 (assertion_name, actual.method, expected[0]))197 self.assure(actual.argument == expected[1],198 '%s: argument %s matches expected %s' %199 (assertion_name, actual.argument, expected[1]))200 def create_file_with_contents(self, filename, contents):201 """ Create a file named |filename| that contains |contents|.202 @param filename string name of file.203 @param contents string contents of file.204 """205 with open(filename, 'w') as f:206 f.write(contents)207 def touch(self, filename):208 """ Create an empty file named |filename|.209 @param filename string name of file.210 """211 self.create_file_with_contents(filename, '')212 def create_new_shill_user_profile(self, contents):213 """ 
Create a fake new user profile with |contents|.214 @param contents string contents of the new user profile.215 """216 os.mkdir(self.new_shill_user_profile_dir)217 self.create_file_with_contents(self.new_shill_user_profile, contents)218 def create_old_shill_user_profile(self, contents):219 """ Create a fake old-style user profile with |contents|.220 @param contents string contents of the old user profile.221 """222 os.mkdir(self.old_shill_user_profile_dir)223 self.create_file_with_contents(self.old_shill_user_profile, contents)224 def create_flimflam_user_profile(self, contents):225 """ Create a legacy flimflam user profile with |contents|.226 @param contents string contents of the flimflam user profile.227 """228 os.mkdir(self.flimflam_user_profile_dir)229 self.create_file_with_contents(self.flimflam_user_profile, contents)230 def file_contents(self, filename):231 """ Returns the contents of |filename|.232 @param filename string name of file to read.233 """234 with open(filename) as f:235 return f.read()236 def find_pid(self, process_name):237 """ Returns the process id of |process_name|.238 @param process_name string name of process to search for.239 """240 return utils.system_output('pgrep %s' % process_name,241 ignore_status=True).split('\n')[0]242 def get_commandline(self):243 """ Returns the command line of the current shill executable. """244 pid = self.find_pid('shill')245 return file('/proc/%s/cmdline' % pid).read().split('\0')246 def run_once(self):247 """ Main test loop. """248 try:249 self.start_test()250 except:251 self.restart_system_processes()252 raise253 try:254 self.run_tests([255 self.test_start_shill,256 self.test_start_logged_in,257 self.test_start_port_flimflam_profile])258 # The tests above run a real instance of shill, whereas the tests259 # below rely on a mock instance of shill. 
We must take care not260 # to run the mock at the same time as a real shill instance.261 self.start_mock_flimflam()262 self.run_tests([263 self.test_login,264 self.test_login_guest,265 self.test_login_profile_exists,266 self.test_login_old_shill_profile,267 self.test_login_invalid_old_shill_profile,268 self.test_login_ignore_old_shill_profile,269 self.test_login_flimflam_profile,270 self.test_login_ignore_flimflam_profile,271 self.test_login_prefer_old_shill_profile,272 self.test_login_multi_profile,273 self.test_logout])274 finally:275 # Stop any shill instances started during testing.276 self.stop_shill()277 self.end_test()278 def run_tests(self, tests):279 """ Executes each of the test subparts in sequence.280 @param tests list of methods to run.281 """282 for test in tests:283 self.test_name = test.__name__284 test()285 self.stop_shill()286 self.erase_state()287 def test_start_shill(self):288 """ Test all created pathnames during shill startup.289 Also ensure the push argument is not provided by default.290 """291 self.touch('/home/chronos/.disable_shill')292 self.start_shill()293 self.assure_is_dir('/var/run/shill', 'Shill run directory')294 self.assure_is_dir('/var/lib/dhcpcd', 'dhcpcd lib directory')295 self.assure_path_owner('/var/lib/dhcpcd', 'dhcp')296 self.assure_path_group('/var/lib/dhcpcd', 'dhcp')297 self.assure_is_dir('/var/run/dhcpcd', 'dhcpcd run directory')298 self.assure_path_owner('/var/run/dhcpcd', 'dhcp')299 self.assure_path_group('/var/run/dhcpcd', 'dhcp')300 self.assure(not os.path.exists('/home/chronos/.disable_shill'),301 'Shill disable file does not exist')302 self.assure('--push=~chronos/shill' not in self.get_commandline(),303 'Shill command line does not contain push argument')304 def test_start_logged_in(self):305 """ Tests starting up shill while a user is already logged in.306 The "--push" argument should not be added even though shill is started307 while a user is logged in.308 """309 os.mkdir('/var/run/shill')310 
os.mkdir('/var/run/shill/user_profiles')311 self.create_new_shill_user_profile('')312 os.symlink(self.new_shill_user_profile_dir,313 '/var/run/shill/user_profiles/chronos')314 self.touch('/var/run/state/logged-in')315 self.start_shill()316 self.assure('--push=~chronos/shill' not in self.get_commandline(),317 'Shill command line does not contain push argument')318 os.unlink('/var/run/state/logged-in')319 def test_start_port_flimflam_profile(self):320 """ Test that we can port a flimflam profile to a new shill profile.321 Startup should move an old flimflam profile into place if a shill322 profile does not already exist.323 """324 os.mkdir('/var/cache/flimflam')325 flimflam_profile = '/var/cache/flimflam/default.profile'326 self.create_file_with_contents(flimflam_profile, self.magic_header)327 shill_profile = '/var/cache/shill/default.profile'328 self.start_shill()329 self.assure(not os.path.exists(flimflam_profile),330 'Flimflam profile no longer exists')331 self.assure(os.path.exists(shill_profile),332 'Shill profile exists')333 self.assure(self.magic_header in self.file_contents(shill_profile),334 'Shill default profile contains our magic header')335 def test_start_ignore_flimflam_profile(self):336 """ Test that we ignore a flimflam profile if a new profile exists.337 Startup should ignore an old flimflam profile if a shill profile338 already exists.339 """340 os.mkdir('/var/cache/flimflam')341 os.mkdir('/var/cache/shill')342 flimflam_profile = '/var/cache/flimflam/default.profile'343 self.create_file_with_contents(flimflam_profile, self.magic_header)344 shill_profile = '/var/cache/shill/default.profile'345 self.touch(shill_profile)346 self.start_shill()347 self.assure(os.path.exists(flimflam_profile),348 'Flimflam profile still exists')349 self.assure(self.magic_header not in self.file_contents(shill_profile),350 'Shill default profile does not contain our magic header')351 def test_login(self):352 """ Test the login process.353 Login should create a profile 
directory, then create and push354 a user profile, given no previous state.355 """356 os.mkdir('/var/run/shill')357 self.login()358 self.assure(not os.path.exists(self.flimflam_user_profile),359 'Flimflam user profile does not exist')360 self.assure(not os.path.exists(self.old_shill_user_profile),361 'Old shill user profile does not exist')362 self.assure(not os.path.exists(self.new_shill_user_profile),363 'New shill user profile does not exist')364 # The DBus "CreateProfile" method should have been handled365 # by our mock_flimflam instance, so the profile directory366 # should not have actually been created.367 self.assure_is_dir(self.new_shill_user_profile_dir,368 'New shill user profile directory')369 self.assure_is_dir('/var/run/shill/user_profiles',370 'Shill profile root')371 self.assure_is_link_to('/var/run/shill/user_profiles/chronos',372 self.new_shill_user_profile_dir,373 'Shill profile link')374 self.assure_is_dir(self.user_cryptohome_log_dir,375 'shill user log directory')376 self.assure_is_link_to('/var/run/shill/log',377 self.user_cryptohome_log_dir,378 'Shill logs link')379 self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],380 [ 'InsertUserProfile',381 ('~chronos/shill', self.fake_user_hash) ]],382 'CreateProfile and InsertUserProfile '383 'are called')384 def test_login_guest(self):385 """ Tests the guest login process.386 Login should create a temporary profile directory in /var/run,387 instead of using one within the root directory for normal users.388 """389 os.mkdir('/var/run/shill')390 self.login_guest()391 self.assure(not os.path.exists(self.flimflam_user_profile),392 'Flimflam user profile does not exist')393 self.assure(not os.path.exists(self.old_shill_user_profile),394 'Old shill user profile does not exist')395 self.assure(not os.path.exists(self.new_shill_user_profile),396 'New shill user profile does not exist')397 self.assure(not os.path.exists(self.new_shill_user_profile_dir),398 'New shill user profile directory')399 
self.assure_is_dir(self.guest_shill_user_profile_dir,400 'shill guest user profile directory')401 self.assure_is_dir('/var/run/shill/user_profiles',402 'Shill profile root')403 self.assure_is_link_to('/var/run/shill/user_profiles/chronos',404 self.guest_shill_user_profile_dir,405 'Shill profile link')406 self.assure_is_dir(self.guest_shill_user_log_dir,407 'shill guest user log directory')408 self.assure_is_link_to('/var/run/shill/log',409 self.guest_shill_user_log_dir,410 'Shill logs link')411 self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],412 [ 'InsertUserProfile',413 ('~chronos/shill', '') ]],414 'CreateProfile and InsertUserProfile '415 'are called')416 def test_login_profile_exists(self):417 """ Test logging in a user whose profile already exists.418 Login script should only push (and not create) the user profile419 if a user profile already exists.420 """421 os.mkdir('/var/run/shill')422 os.mkdir(self.new_shill_user_profile_dir)423 self.touch(self.new_shill_user_profile)424 self.login()425 self.assure_method_calls([[ 'InsertUserProfile',426 ('~chronos/shill', self.fake_user_hash) ]],427 'Only InsertUserProfile is called')428 def test_login_old_shill_profile(self):429 """ Test logging in a user with an old-style shill profile.430 Login script should move an old shill user profile into place431 if a new one does not exist.432 """433 os.mkdir('/var/run/shill')434 self.create_old_shill_user_profile(self.magic_header)435 self.login()436 self.assure(not os.path.exists(self.old_shill_user_profile),437 'Old shill user profile no longer exists')438 self.assure(not os.path.exists(self.old_shill_user_profile_dir),439 'Old shill user profile directory no longer exists')440 self.assure_exists(self.new_shill_user_profile,441 'New shill profile')442 self.assure(self.magic_header in443 self.file_contents(self.new_shill_user_profile),444 'Shill user profile contains our magic header')445 self.assure_method_calls([[ 'InsertUserProfile',446 ('~chronos/shill', 
self.fake_user_hash) ]],447 'Only InsertUserProfile is called')448 def make_symlink(self, path):449 """ Create a symbolic link named |path|.450 @param path string pathname of the symbolic link.451 """452 os.symlink('/etc/hosts', path)453 def make_special_file(self, path):454 """ Create a special file named |path|.455 @param path string pathname of the special file.456 """457 os.mknod(path, stat.S_IFIFO)458 def make_bad_owner(self, path):459 """ Create a regular file with a strange ownership.460 @param path string pathname of the file.461 """462 self.touch(path)463 os.lchown(path, 1000, 1000)464 def test_login_invalid_old_shill_profile(self):465 """ Test logging in with an invalid old-style shill profile.466 Login script should ignore non-regular files or files not owned467 by the correct user. The original file should be removed.468 """469 os.mkdir('/var/run/shill')470 for file_creation_method in (self.make_symlink,471 self.make_special_file,472 os.mkdir,473 self.make_bad_owner):474 os.mkdir(self.old_shill_user_profile_dir)475 file_creation_method(self.old_shill_user_profile)476 self.login()477 self.assure(not os.path.exists(self.old_shill_user_profile),478 'Old shill user profile no longer exists')479 self.assure(not os.path.exists(self.old_shill_user_profile_dir),480 'Old shill user profile directory no longer exists')481 self.assure(not os.path.exists(self.new_shill_user_profile),482 'New shill profile was not created')483 self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],484 [ 'InsertUserProfile',485 ('~chronos/shill',486 self.fake_user_hash) ]],487 'CreateProfile and InsertUserProfile '488 'are called')489 os.unlink('/var/run/shill/user_profiles/chronos')490 def test_login_ignore_old_shill_profile(self):491 """ Test logging in with both an old and new profile present.492 Login script should ignore an old shill user profile if a new one493 exists.494 """495 os.mkdir('/var/run/shill')496 self.create_new_shill_user_profile('')497 
self.create_old_shill_user_profile(self.magic_header)498 self.login()499 self.assure(os.path.exists(self.old_shill_user_profile),500 'Old shill user profile still exists')501 self.assure_exists(self.new_shill_user_profile,502 'New shill profile')503 self.assure(self.magic_header not in504 self.file_contents(self.new_shill_user_profile),505 'Shill user profile does not contain our magic header')506 self.assure_method_calls([[ 'InsertUserProfile',507 ('~chronos/shill', self.fake_user_hash) ]],508 'Only InsertUserProfile is called')509 def test_login_flimflam_profile(self):510 """ Test logging in with an old flimflam profile.511 Login script should move a flimflam user profile into place512 if a shill one does not exist.513 """514 os.mkdir('/var/run/shill')515 self.create_flimflam_user_profile(self.magic_header)516 self.login()517 self.assure(not os.path.exists(self.flimflam_user_profile),518 'Flimflam user profile no longer exists')519 self.assure(not os.path.exists(self.flimflam_user_profile_dir),520 'Flimflam user profile directory no longer exists')521 self.assure_exists(self.new_shill_user_profile,522 'New shill profile')523 self.assure(self.magic_header in524 self.file_contents(self.new_shill_user_profile),525 'Shill user profile contains our magic header')526 self.assure_method_calls([[ 'InsertUserProfile',527 ('~chronos/shill', self.fake_user_hash) ]],528 'Only InsertUserProfile is called')529 def test_login_ignore_flimflam_profile(self):530 """ Test logging in with both a flimflam profile and a new profile.531 Login script should ignore an old flimflam user profile if a new532 one exists.533 """534 os.mkdir('/var/run/shill')535 self.create_flimflam_user_profile(self.magic_header)536 self.create_new_shill_user_profile('')537 self.login()538 self.assure_exists(self.new_shill_user_profile,539 'New shill profile')540 self.assure(self.magic_header not in541 self.file_contents(self.new_shill_user_profile),542 'Shill user profile does not contain our magic 
header')543 self.assure_method_calls([[ 'InsertUserProfile',544 ('~chronos/shill', self.fake_user_hash) ]],545 'Only InsertUserProfile is called')546 def test_login_prefer_old_shill_profile(self):547 """ Test logging in with both a flimflam and old-style shill profile.548 Login script should use the old shill user profile in preference549 to a flimflam user profile if the new user profile does not550 exist.551 """552 os.mkdir('/var/run/shill')553 self.create_flimflam_user_profile('')554 self.create_old_shill_user_profile(self.magic_header)555 self.login()556 self.assure(not os.path.exists(self.flimflam_user_profile),557 'Flimflam user profile was removed')558 self.assure(not os.path.exists(self.old_shill_user_profile),559 'Old shill user profile no longer exists')560 self.assure_exists(self.new_shill_user_profile,561 'New shill profile')562 self.assure(self.magic_header in563 self.file_contents(self.new_shill_user_profile),564 'Shill user profile contains our magic header')565 self.assure_method_calls([[ 'InsertUserProfile',566 ('~chronos/shill', self.fake_user_hash) ]],567 'Only InsertUserProfile is called')568 def test_login_multi_profile(self):569 """ Test signalling shill about multiple logged-in users.570 Login script should not create multiple profiles in parallel571 if called more than once without an intervening logout. 
Only572 the initial user profile should be created.573 """574 os.mkdir('/var/run/shill')575 self.create_new_shill_user_profile('')576 # First logged-in user should create a profile (tested above).577 self.login()578 # Clear the mock method-call queue.579 self.mock_flimflam.get_method_calls()580 for attempt in range(5):581 self.login()582 self.assure_method_calls([], 'No more profiles are added to shill')583 profile_links = os.listdir('/var/run/shill/user_profiles')584 self.assure(len(profile_links) == 1, 'Only one profile exists')585 self.assure(profile_links[0] == 'chronos',586 'The profile link is for the chronos user')587 self.assure_is_link_to('/var/run/shill/log',588 self.user_cryptohome_log_dir,589 'Shill log link for chronos')590 def test_logout(self):591 """ Test the logout process. """592 os.makedirs('/var/run/shill/user_profiles')593 os.makedirs(self.guest_shill_user_profile_dir)594 os.makedirs(self.guest_shill_user_log_dir)595 self.touch('/var/run/state/logged-in')596 self.logout()597 self.assure(not os.path.exists('/var/run/shill/user_profiles'),598 'User profile directory was removed')599 self.assure(not os.path.exists(self.guest_shill_user_profile_dir),600 'Guest user profile directory was removed')601 self.assure(not os.path.exists(self.guest_shill_user_log_dir),602 'Guest user log directory was removed')603 self.assure_method_calls([[ 'PopAllUserProfiles', '' ]],...
scoring_function.py
Source:scoring_function.py
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import Image
from matplotlib import pyplot as plt
from datetime import datetime
from datetime import timedelta
import pandas as pd
import seaborn as sns
from math import ceil,floor
from math import fabs
import time
import datetime
import copy


def scoring(df, params, params_poids_score):
    """Add fraud-scoring criteria columns to a Spark DataFrame of claims.

    NOTE(review): this excerpt is truncated right after the final ``drop``
    call, so the function's return statement (if any) is not visible here.
    The nesting of a few statements was reconstructed from whitespace-
    collapsed text; spots where that matters are flagged inline.

    Args:
        df: Spark DataFrame. The code reads the columns numutipre, numconuti,
            datsursin, libgarele, codgarele, nom_assure, prenom_assure,
            date_naiss_assure, date_effet, dtdermod and nb_assure.
        params: mapping read for the keys 'delai_periodicite',
            'delai_eff_con', 'delai_mod_con' and 'nb_assure'.
        params_poids_score: mapping read for the keys 'crit_periodicite_p_4',
            'crit_delai_eff_con', 'crit_delai_modcon' and 'crit_nb_assure'.
    """

    def udf_contrat(list_of_sin):
        """Describe the other contracts held by the insured of the current claim.

        Args:
            list_of_sin: sequence of 9-item records (reference, day, month,
                year, garantie, numconuti, nom, prenom, date_naiss); the
                first record is the claim being scored.

        Returns:
            ``[crit_contrat, inf]`` where ``inf`` is the explanatory text.
        """
        # NOTE(review): crit_contrat is never raised above 0 although the
        # message below announces 'Score + 1' -- confirm the intended value.
        crit_contrat = 0
        inf = ''
        # None or empty input: nothing to analyse (also avoids indexing [0]
        # on an empty list).
        if not list_of_sin:
            return [crit_contrat, inf]

        # Rebuild each raw record as a dict with a real datetime.
        records = [
            {'reference': ref[0],
             'date': datetime.datetime(int(ref[3]), int(ref[2]), int(ref[1])),
             'garantie': ref[4],
             'numconuti': ref[5],
             'nom_assure': ref[6],
             'prenom_assure': ref[7],
             'date_naiss_assure': ref[8]}
            for ref in list_of_sin
        ]

        # The claim currently being scored.
        current = records[0]
        # Drop every copy of the current claim, then the claims dated after it.
        others = [r for r in records if r != current]
        others = [r for r in others if r['date'] <= current['date']]

        # Drop claims on the same contract for the same insured person.
        identity_keys = ('numconuti', 'nom_assure', 'prenom_assure',
                         'date_naiss_assure')
        others = [r for r in others
                  if any(r[k] != current[k] for k in identity_keys)]

        # Keep only the first claim seen for each remaining contract number.
        seen_contracts = set()
        distinct = []
        for r in others:
            if r['numconuti'] not in seen_contracts:
                seen_contracts.add(r['numconuti'])
                distinct.append(r)

        if distinct:
            parts = ['Score + 1 : Plusieurs contrat pour l assuré concerné' + '\n']
            for r in distinct:
                parts.append(' Contrat : ' + str(r['numconuti'])
                             + ' | Rattaché au sinistre : ' + str(r['reference'])
                             + ' - Garantie : ' + str(r['garantie'])
                             + ' - Le : ' + r['date'].strftime('%d-%m-%Y') + '\n')
            # NOTE(review): the source's indentation is lost; the trailing
            # blank line is assumed to belong to this branch -- confirm.
            parts.append('\n')
            inf = ''.join(parts)

        return [crit_contrat, inf]

    def udf_periodicite(list_of_sin):
        """Flag earlier claims that fall near an anniversary of the current one.

        Args:
            list_of_sin: sequence of records (reference, day, month, year,
                garantie, ...); the first record is the claim being scored.

        Returns:
            ``[crit_period, inf, crit_period_1_4]``: number of matching
            claims, explanatory text, and whether at least one matched.
        """
        # None or empty input: nothing to analyse (also avoids indexing [0]
        # on an empty list).
        if not list_of_sin:
            return [0, '', False]

        # Rebuild each raw record as a dict with a real datetime.
        records = [
            {'reference': ref[0],
             'date': datetime.datetime(int(ref[3]), int(ref[2]), int(ref[1])),
             'garantie': ref[4]}
            for ref in list_of_sin
        ]

        # The claim currently being scored.
        current = records[0]
        # Drop every copy of the current claim, then the claims dated after it.
        others = [r for r in records if r != current]
        others = [r for r in others if r['date'] <= current['date']]

        # Apply the periodicity fraud criterion.
        delai = params['delai_periodicite']
        matches = []
        for r in others:
            gap = fabs((r['date'] - current['date']).days) - 1
            offset = gap % 365 - 1
            # At least a year apart, and within `delai` of a whole number of
            # years (counted from either side of the anniversary).
            if gap >= 365 and (offset <= delai or fabs(offset - 365) <= delai):
                matches.append(r)

        # Most recent first; sorted() is stable, so equal dates keep their
        # original relative order.
        matches = sorted(matches, key=lambda r: r['date'], reverse=True)

        parts = []
        if matches:
            parts.append('\n')
            parts.append('Score + ' + str(params_poids_score['crit_periodicite_p_4'])
                         + ' : Déclaration sinistre proche' + '\n')
            parts.append(' année antérieure / postérieure' + '\n')
        for r in matches:
            parts.append(' Sinistre : ' + str(r['reference']) + ' | '
                         + str(r['garantie']) + ' | '
                         + r['date'].strftime('%d-%m-%Y') + '\n')
        parts.append('\n')

        crit_period = len(matches)
        crit_period_1_4 = bool(matches)
        return [crit_period, ''.join(parts), crit_period_1_4]

    df_bis = df
    insured_keys = ['nom_assure', 'prenom_assure', 'date_naiss_assure']

    # 'yyyy' is the calendar year; the original 'YYYY' is the week-based year,
    # which is wrong around New Year.
    detail_general = [
        lit('Information Générales:'), lit('\n'), lit('\n'),
        lit('Numéro de sinistre : '), col("numutipre"), lit('\n'),
        lit('Numéro de contrat : '), col("numconuti"), lit('\n'),
        lit('Date de survenance : '), date_format(col("datsursin"), "dd-MM-yyyy"), lit('\n'),
        lit('Type de sinistre : '), col("libgarele"), lit('\n'),
        lit('Nom assure : '), col("nom_assure"), lit('\n'),
        lit('Prenom assure : '), col("prenom_assure"), lit('\n'),
        lit('\n'),
    ]

    df_bis = df_bis.withColumn('detail_general', concat(*detail_general))

    # Periodicity
    df_bis = df_bis.withColumn('day_dtsurv', dayofmonth('datsursin'))
    df_bis = df_bis.withColumn('month_dtsurv', month('datsursin'))
    df_bis = df_bis.withColumn('year_dtsurv', year('datsursin'))

    df_bis = df_bis.withColumn(
        'numutipre_datsursin',
        array([df_bis.numutipre, df_bis.day_dtsurv, df_bis.month_dtsurv,
               df_bis.year_dtsurv, df_bis.codgarele]))

    df_periodicite = df_bis.groupBy(*insured_keys).agg(
        collect_set('numutipre_datsursin').alias('sin_assure_'))
    df_periodicite = df_periodicite.withColumn("nb_sin_assure", size("sin_assure_"))
    df_bis = df_bis.join(df_periodicite, insured_keys, 'left')

    # Put the row's own claim first so the UDF can treat index 0 as current.
    df_bis = df_bis.withColumn('numutipre_datsursin_init',
                               array([df_bis.numutipre_datsursin]))
    df_bis = df_bis.withColumn(
        "sin_assure",
        concat(col("numutipre_datsursin_init"), col("sin_assure_")))

    # NOTE(review): both UDFs are declared ArrayType(StringType()) but return
    # ints/bools next to strings -- confirm how Spark converts those values.
    periodicite = udf(udf_periodicite, ArrayType(StringType()))
    df_bis = df_bis.withColumn('crit_period_', periodicite(col('sin_assure')))

    df_bis = df_bis.withColumn('crit_period', col('crit_period_')[0])
    df_bis = df_bis.withColumn('crit_period_detail', col('crit_period_')[1])
    df_bis = df_bis.withColumn('crit_period_periodicite', col('crit_period_')[2])

    df_bis = df_bis.drop('crit_period_', 'sin_assure')

    # Multiple contracts
    df_bis = df_bis.withColumn(
        'numutipre_datsursin_numconuti',
        array([df_bis.numutipre, df_bis.day_dtsurv, df_bis.month_dtsurv,
               df_bis.year_dtsurv, df_bis.codgarele,
               df_bis.numconuti, df_bis.nom_assure, df_bis.prenom_assure,
               df_bis.date_naiss_assure]))

    df_fraud_2 = df_bis.groupBy(*insured_keys).agg(
        collect_set('numutipre_datsursin_numconuti').alias('fraud_2'))
    df_bis = df_bis.join(df_fraud_2, insured_keys, 'left')
    df_bis = df_bis.withColumn('numutipre_datsursin_numconuti_init',
                               array([df_bis.numutipre_datsursin_numconuti]))
    df_bis = df_bis.withColumn(
        "fraud_contrat",
        concat(col("numutipre_datsursin_numconuti_init"), col("fraud_2")))

    fraud_contrat_udf = udf(udf_contrat, ArrayType(StringType()))
    df_bis = df_bis.withColumn('crit_contrat_', fraud_contrat_udf(col('fraud_contrat')))

    df_bis = df_bis.withColumn('crit_contrat', col('crit_contrat_')[0])
    df_bis = df_bis.withColumn('crit_contrat_detail', col('crit_contrat_')[1])

    df_mul_ctr = df_bis.groupBy(*insured_keys).agg(
        collect_set('numconuti').alias('ctr_assure'))
    df_mul_ctr = df_mul_ctr.withColumn("nb_ctr_assure", size("ctr_assure"))

    df_bis = df_bis.join(df_mul_ctr, insured_keys, 'left')
    df_bis = df_bis.withColumn('crit_multiplicite_contrat', col('nb_ctr_assure'))

    # Delay between contract effective/modification date and the claim date.
    # NOTE(review): datediff(end, start) returns end - start, so these values
    # are negative when the contract date precedes the claim -- confirm the
    # argument order is intended.
    df_bis = df_bis.withColumn('delai_effcon_sinistre', datediff("date_effet", "datsursin"))
    df_bis = df_bis.withColumn('delai_modcon_sinistre', datediff("dtdermod", "datsursin"))

    cond_dteffcon = when(col('delai_effcon_sinistre') < params['delai_eff_con'],
                         lit(params_poids_score['crit_delai_eff_con'])).otherwise(lit(0))
    cond_dtmodcon = when(col('delai_modcon_sinistre') < params['delai_mod_con'],
                         lit(params_poids_score['crit_delai_modcon'])).otherwise(lit(0))

    df_bis = df_bis.withColumn('crit_effcon', cond_dteffcon)
    df_bis = df_bis.withColumn('crit_modcon', cond_dtmodcon)

    # Number of insured persons
    cond_nb_assure = when(col('nb_assure') < params['nb_assure'],
                          lit(params_poids_score['crit_nb_assure'])).otherwise(lit(0))
    df_bis = df_bis.withColumn('crit_nb_assure', cond_nb_assure)

    # Final score
    df_bis = df_bis.withColumn(
        'score',
        col('crit_multiplicite_contrat') + col('crit_period') + col('crit_contrat'))

    df_bis = df_bis.withColumn(
        'score' + '_detail',
        concat(col('detail_general'), col('crit_contrat' + '_detail'),
               col('crit_period' + '_detail')))

    # A missing comma in the original fused 'numutipre_datsursin_init' and
    # 'detail_general' into one bogus column name; each name is now listed once.
    df_bis = df_bis.drop(
        'ctr_assure', 'day_dtsurv', 'month_dtsurv', 'year_dtsurv',
        'numutipre_datsursin', 'sin_assure_', 'numutipre_datsursin_init',
        'detail_general', 'crit_multiplicite_contrat' + '_detail',
        'crit_contrat' + '_detail', 'crit_period' + '_detail',
        'numutipre_datsursin_numconuti', 'numutipre_datsursin_numconuti_init',
        'fraud_contrat', 'fraud_2', 'crit_contrat_', 'nb_ctr_assure',
        'nb_sin_assure', 'crit_contrat')

    # ... (the remainder of scoring() is truncated in this excerpt)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!